测试锁可能出现的问题
Libevent通过一些变量记录锁的使用情况,当检查到这些锁的错误使用时,就调用abort,退出运行。
evthread_enable_lock_debuging
,这个函数就是为了设置使能检测lock的了,但是实际代码是:
//misspelled version for backward compatibility
void evthread_enable_lock_debuging(void)
{
evthread_enable_lock_debugging();
}
对真正的接口封装了一下,一开始不明所以,后来,发现是debugging
这个单词出错了,为了兼容老版本代码,就直接封装了真正的接口。
看下如何设置调试锁的:
void evthread_enable_lock_debugging(void)
{
struct evthread_lock_callbacks cbs = {
EVTHREAD_LOCK_API_VERSION,
EVTHREAD_LOCKTYPE_RECURSIVE,
debug_lock_alloc,
debug_lock_free,
debug_lock_lock,
debug_lock_unlock
};
if (evthread_lock_debugging_enabled_)
return;
//把当前自定义的锁操作复制到_original_lock_fns结构体变量
memcpy(&original_lock_fns_, &evthread_lock_fns_,
sizeof(struct evthread_lock_callbacks));
//将当前的自定义锁操作设置成调试锁操作。调试锁操作函数内部
//还是使用_original_lock_fns的锁操作函数
memcpy(&evthread_lock_fns_, &cbs,
sizeof(struct evthread_lock_callbacks));
memcpy(&original_cond_fns_, &evthread_cond_fns_,
sizeof(struct evthread_condition_callbacks));
evthread_cond_fns_.wait_condition = debug_cond_wait;
//设置下标志位
evthread_lock_debugging_enabled_ = 1;
event_global_setup_locks_(0);
}
debug递归锁
struct debug_lock {
unsigned signature;
unsigned locktype; //锁的类型
unsigned long held_by; //被哪个线程占有
/* XXXX if we ever use read-write locks, we will need a separate
* lock to protect count. */
int count; //计算锁使用次数
void *lock; //锁类型
};
static int debug_lock_lock(unsigned mode, void* lock_)
{
struct debug_lock *lock = lock_;
int res = 0;
//检测锁的类型
if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
else
EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
if (original_lock_fns_.lock)
res = original_lock_fns_.lock(mode, lock->lock);
if (!res) {
//调用锁成功就记录,因为成功返回0
evthread_debug_lock_mark_locked(mode, lock);
}
return res;
}
这里也只是检测一下锁的类型,然后调用锁,如果失败就记录下情况。
static void evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock * lock)
{
EVUTIL_ASSERT(DEBUG_LOCK_SIG == lock->signature);
++lock->count; //记录锁次数
//如果不是递归锁,断言一下,判断只能调用一次锁
if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE))
EVUTIL_ASSERT(lock->count == 1);
if (evthread_id_fn_) {
//获取当前线程id
unsigned long me;
me = evthread_id_fn_();
if (lock->count > 1)
EVUTIL_ASSERT(lock->held_by == me);
lock->held_by = me; //修改锁的占有线程id
}
}
主要是测试一个锁类型(如pthread_mutex_t)同时被加锁的次数。
如果是非递归锁,那么将不允许多次加锁。对于锁的实现没有bug的话,如果是非递归锁,那么会在第二次锁住同一个锁时,卡死在debug_lock_lock 函数的original_lock_fns.lock上(即发生了死锁)。此时evthread_debug_lock_mark_locked是不会被调用的。
但是,对于一个有bug的锁实现,那么就有可能发生这种情况。即对于非递归锁,其还是可以多次锁住同一个锁,并且不会发生死锁。此时,evthread_debug_lock_mark_locked函数将会被执行,在这个函数内部将会检测这种情况。Libevent的锁调试(检测)就是调试(检测)这种有bug的锁实现。
debug递归锁
static int debug_lock_unlock(unsigned mode, void* lock_)
{
struct debug_lock *lock = lock_;
int res = 0;
//检测,并记录
evthread_debug_lock_mark_unlocked(mode, lock);
if (original_lock_fns_.unlock)
//真正的解锁
res = original_lock_fns_.unlock(mode, lock->lock);
return res;
}
主要是先检测,记录对应的锁的信息,看下面。最后才是解锁操作。
static void evthread_debug_lock_mark_unlocked(unsigned mode, struct debug_lock* lock)
{
EVUTIL_ASSERT(DEBUG_LOCK_SIG == lock->signature);
//检测锁的类型
if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
else
EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
if (evthread_id_fn_) {
unsigned long me;
//锁,只能由对应加锁的线程解锁
me = evthread_id_fn_();
EVUTIL_ASSERT(lock->held_by == me);
if (lock->count == 1)
lock->held_by = 0; //清楚占有锁变量
}
--lock->count; //减掉解锁次数
EVUTIL_ASSERT(lock->count >= 0); //断言下是否已经解锁完毕
}
主要是检测解锁和解锁是否是同一个线程来操作的,就是在检测是否使用有误啊。
定制线程锁、条件变量
int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs)
{
//获取当前的锁结构
struct evthread_lock_callbacks *target = evthread_get_lock_callbacks();
//看下是否打开了调试锁模式
#ifndef EVENT__DISABLE_DEBUG_MODE
if (event_debug_mode_on_) {
//检测是否是在所有操作前先初始化自定义锁操作
if (event_debug_created_threadable_ctx_) {
event_errx(1, "evthread initialization must be called BEFORE anything else!");
}
}
#endif
if (!cbs) {
if (target->alloc)
event_warnx("Trying to disable lock functions after "
"they have been set up will probaby not work.");
memset(target, 0, sizeof(evthread_lock_fns_));
return 0;
}
if (target->alloc) {
//判断是否已经设置过了,设置过重复设置是允许的,但是设定了一部分,是不允许的
/* Uh oh; we already had locking callbacks set up.*/
if (target->lock_api_version == cbs->lock_api_version &&
target->supported_locktypes == cbs->supported_locktypes &&
target->alloc == cbs->alloc &&
target->free == cbs->free &&
target->lock == cbs->lock &&
target->unlock == cbs->unlock) {
/* no change -- allow this. */
return 0;
}
event_warnx("Can't change lock callbacks once they have been "
"initialized.");
return -1;
}
//把自定义锁函数复制到当前锁结构
if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) {
memcpy(target, cbs, sizeof(evthread_lock_fns_));
return event_global_setup_locks_(1);
} else {
return -1;
}
}
- 主要是检测自定义锁函数是否全都设置,如果没有完全都自定义是不允许的。
- 在非调试情况下,实际使用的是_evthread_lock_fns变量的线程锁函数指针成员。在调试情况下实际使用的是_original_lock_fns变量的。
- 也看出,当 cbs 为NULL的时候,就是形参为NULL时,就是没有设置自定义锁函数,就是不要线程锁的功能。
锁的使用(加锁和解锁)
int event_add(struct event* ev, const struct timeval* tv)
{
int res;
//是否已经初始化base
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}
//加锁
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
//把事件加入事件集
res = event_add_nolock_(ev, tv, 0);
//解锁
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}
加锁和解锁都是通过宏来实现的。
// Lock an event_base, if it is set up for locking. Acquires the lock
// in the base structure whose field is named 'lockvar'.
#define EVBASE_ACQUIRE_LOCK(base, lockvar) do { \
EVLOCK_LOCK((base)->lockvar, 0); \
} while (0)
//Acquire a lock.
#define EVLOCK_LOCK(lockvar,mode) \
do { \
if (lockvar) \
evthread_lock_fns_.lock(mode, lockvar); \
} while (0)
利用 do { } while (0) 来实现宏的定义,因为这样会简洁。可以看到只在第二个宏时才判断
_evthread_lock_fns.lock
是否为NULL。
在非Windows系统上会把EVBASE_ACQUIRE_LOCK宏定义成第一种情况。但在Linux上调用event_add时,即使_evthread_lock_fns.lock为NULL也没有出现段错误
因为 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);的th_base_lock在申请base的时候已经分配了 EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0); 。
struct event_base* event_base_new_with_config(const struct event_config* cfg)
{
struct event_base *base;
//之所以不用mm_malloc是因为mm_malloc并不会清零该内存区域。
//而这个函数是会清零申请到的内存区域。这相当于给base初始化
if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
event_warn("%s: calloc", __func__);
return NULL;
}
…….
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
//对于th_base_lock变量,目前的值为NULL.
//EVTHREAD_LOCKING_ENABLED宏是测试_evthread_lock_fns.lock
//是否不为NULL
if (EVTHREAD_LOCKING_ENABLED() &&
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock, //申请锁变量
EVTHREAD_LOCKTYPE_RECURSIVE);
}
#endif
…..
return (base);
}
从这里可以看到,如果_evthread_lock_fns.lock为NULL,那么th_base_lock成员肯定为NULL,那么后面就不会调用_evthread_lock_fns.lock()函数。从而避过段错误。在Libevent中,像EVBASE_ACQUIRE_LOCK这个宏是专门给event_base用的。