Libevent

Libevent多线程,锁,条件变量(下)

Posted by Liangjf on April 14, 2019

测试锁可能出现的问题

Libevent通过一些变量记录锁的使用情况,当检查到这些锁的错误使用时,就调用abort,退出运行。

evthread_enable_lock_debuging,这个函数就是为了设置使能检测lock的了,但是实际代码是:

//misspelled version for backward compatibility
void evthread_enable_lock_debuging(void)
{
    evthread_enable_lock_debugging();
}

对真正的接口封装了一下,一开始不明所以,后来,发现是debugging这个单词出错了,为了兼容老版本代码,就直接封装了真正的接口。

看下如何设置调试锁的:

void evthread_enable_lock_debugging(void)
{
    struct evthread_lock_callbacks cbs = {
        EVTHREAD_LOCK_API_VERSION,
        EVTHREAD_LOCKTYPE_RECURSIVE,
        debug_lock_alloc,
        debug_lock_free,
        debug_lock_lock,
        debug_lock_unlock
    };
    if (evthread_lock_debugging_enabled_)
        return;

    //把当前自定义的锁操作复制到_original_lock_fns结构体变量
    memcpy(&original_lock_fns_, &evthread_lock_fns_,
        sizeof(struct evthread_lock_callbacks));

    //将当前的自定义锁操作设置成调试锁操作。调试锁操作函数内部
    //还是使用_original_lock_fns的锁操作函数
    memcpy(&evthread_lock_fns_, &cbs,
        sizeof(struct evthread_lock_callbacks));

    memcpy(&original_cond_fns_, &evthread_cond_fns_,
        sizeof(struct evthread_condition_callbacks));
    evthread_cond_fns_.wait_condition = debug_cond_wait;
    
    //设置下标志位
    evthread_lock_debugging_enabled_ = 1;

    event_global_setup_locks_(0);
}

debug递归锁

struct debug_lock {
    unsigned signature;    
    unsigned locktype;       //锁的类型
    unsigned long held_by;      //被哪个线程占有
    /* XXXX if we ever use read-write locks, we will need a separate
     * lock to protect count. */
    int count;      //计算锁使用次数
    void *lock;     //锁类型
};


static int debug_lock_lock(unsigned mode, void* lock_)
{
	struct debug_lock *lock = lock_;
	int res = 0;

	//检测锁的类型
	if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
		EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
	else
		EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
	if (original_lock_fns_.lock)
		res = original_lock_fns_.lock(mode, lock->lock);
	if (!res) {
	//调用锁成功就记录,因为成功返回0
		evthread_debug_lock_mark_locked(mode, lock);
	}
	return res;
}

这里也只是检测一下锁的类型,然后调用锁,如果失败就记录下情况。

static void evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock * lock)
{
    EVUTIL_ASSERT(DEBUG_LOCK_SIG == lock->signature);
    ++lock->count;  //记录锁次数
    
    //如果不是递归锁,断言一下,判断只能调用一次锁
    if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE))
        EVUTIL_ASSERT(lock->count == 1);
    if (evthread_id_fn_) {
        //获取当前线程id
        unsigned long me;
        me = evthread_id_fn_();
        if (lock->count > 1)
            EVUTIL_ASSERT(lock->held_by == me);
        lock->held_by = me;     //修改锁的占有线程id
    }
}

主要是测试一个锁类型(如pthread_mutex_t)同时被加锁的次数

如果是非递归锁,那么将不允许多次加锁。对于锁的实现没有bug的话,如果是非递归锁,那么会在第二次锁住同一个锁时,卡死在debug_lock_lock 函数的original_lock_fns.lock上(即发生了死锁)。此时evthread_debug_lock_mark_locked是不会被调用的。

但是,对于一个有bug的锁实现,那么就有可能发生这种情况。即对于非递归锁,其还是可以多次锁住同一个锁,并且不会发生死锁。此时,evthread_debug_lock_mark_locked函数将会被执行,在这个函数内部将会检测这种情况。Libevent的锁调试(检测)就是调试(检测)这种有bug的锁实现。

debug递归锁

static int debug_lock_unlock(unsigned mode, void* lock_)
{
    struct debug_lock *lock = lock_;
    int res = 0;
    //检测,并记录
    evthread_debug_lock_mark_unlocked(mode, lock);
    if (original_lock_fns_.unlock)
        //真正的解锁
        res = original_lock_fns_.unlock(mode, lock->lock);
    return res;
}

主要是先检测,记录对应的锁的信息,看下面。最后才是解锁操作。

static void evthread_debug_lock_mark_unlocked(unsigned mode, struct debug_lock* lock)
{
	EVUTIL_ASSERT(DEBUG_LOCK_SIG == lock->signature);
	//检测锁的类型
	if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
		EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
	else
		EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
	if (evthread_id_fn_) {
		unsigned long me;
		//锁,只能由对应加锁的线程解锁
		me = evthread_id_fn_();
		EVUTIL_ASSERT(lock->held_by == me);
		if (lock->count == 1)
			lock->held_by = 0;		//清楚占有锁变量
	}
	--lock->count;		//减掉解锁次数
	EVUTIL_ASSERT(lock->count >= 0);	//断言下是否已经解锁完毕
}

主要是检测解锁和解锁是否是同一个线程来操作的,就是在检测是否使用有误啊。

定制线程锁、条件变量

int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs)
{
    //获取当前的锁结构
    struct evthread_lock_callbacks *target = evthread_get_lock_callbacks();

    //看下是否打开了调试锁模式
#ifndef EVENT__DISABLE_DEBUG_MODE
    if (event_debug_mode_on_) {
        //检测是否是在所有操作前先初始化自定义锁操作
        if (event_debug_created_threadable_ctx_) {
            event_errx(1, "evthread initialization must be called BEFORE anything else!");
        }
    }
#endif

    if (!cbs) {
        if (target->alloc)
            event_warnx("Trying to disable lock functions after "
                "they have been set up will probaby not work.");
        memset(target, 0, sizeof(evthread_lock_fns_));
        return 0;
    }
    if (target->alloc) {
        //判断是否已经设置过了,设置过重复设置是允许的,但是设定了一部分,是不允许的
        /* Uh oh; we already had locking callbacks set up.*/
        if (target->lock_api_version == cbs->lock_api_version &&
            target->supported_locktypes == cbs->supported_locktypes &&
            target->alloc == cbs->alloc &&
            target->free == cbs->free &&
            target->lock == cbs->lock &&
            target->unlock == cbs->unlock) {
            /* no change -- allow this. */
            return 0;
        }
        event_warnx("Can't change lock callbacks once they have been "
            "initialized.");
        return -1;
    }
    //把自定义锁函数复制到当前锁结构
    if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) {
        memcpy(target, cbs, sizeof(evthread_lock_fns_));
        return event_global_setup_locks_(1);
    } else {
        return -1;
    }
}
  • 主要是检测自定义锁函数是否全都设置,如果没有完全都自定义是不允许的。
  • 在非调试情况下,实际使用的是_evthread_lock_fns变量的线程锁函数指针成员。在调试情况下实际使用的是_original_lock_fns变量的。
  • 也看出,当 cbs 为NULL的时候,就是形参为NULL时,就是没有设置自定义锁函数,就是不要线程锁的功能。

锁的使用(加锁和解锁)

int event_add(struct event* ev, const struct timeval* tv)
{
    int res;

    //是否已经初始化base
    if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
        event_warnx("%s: event has no event_base set.", __func__);
        return -1;
    }

    //加锁
    EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

    //把事件加入事件集
    res = event_add_nolock_(ev, tv, 0);

    //解锁
    EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

    return (res);
}

加锁和解锁都是通过宏来实现的。

 // Lock an event_base, if it is set up for locking.  Acquires the lock
//   in the base structure whose field is named 'lockvar'. 
#define EVBASE_ACQUIRE_LOCK(base, lockvar) do {				\
        EVLOCK_LOCK((base)->lockvar, 0);			\
    } while (0)

//Acquire a lock. 
#define EVLOCK_LOCK(lockvar,mode)					\
    do {								\
        if (lockvar)						\
            evthread_lock_fns_.lock(mode, lockvar);		\
    } while (0)

利用 do { } while (0) 来实现宏的定义,因为这样会简洁。可以看到只在第二个宏时才判断 _evthread_lock_fns.lock 是否为NULL。

在非Windows系统上会把EVBASE_ACQUIRE_LOCK宏定义成第一种情况。但在Linux上调用event_add时,即使_evthread_lock_fns.lock为NULL也没有出现段错误

因为 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);的th_base_lock在申请base的时候已经分配了 EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0); 。

struct event_base* event_base_new_with_config(const struct event_config* cfg)
{
    struct event_base *base;

    //之所以不用mm_malloc是因为mm_malloc并不会清零该内存区域。
    //而这个函数是会清零申请到的内存区域。这相当于给base初始化
    if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
        event_warn("%s: calloc", __func__);
        return NULL;
    }

        …….

    #ifndef _EVENT_DISABLE_THREAD_SUPPORT

        //对于th_base_lock变量,目前的值为NULL.
       //EVTHREAD_LOCKING_ENABLED宏是测试_evthread_lock_fns.lock
       //是否不为NULL
        if (EVTHREAD_LOCKING_ENABLED() &&
            (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
            int r;
            EVTHREAD_ALLOC_LOCK(base->th_base_lock, //申请锁变量
                EVTHREAD_LOCKTYPE_RECURSIVE);
        }
    #endif

    …..
    return (base);
}

从这里可以看到,如果_evthread_lock_fns.lock为NULL,那么th_base_lock成员肯定为NULL,那么后面就不会调用_evthread_lock_fns.lock()函数。从而避过段错误。在Libevent中,像EVBASE_ACQUIRE_LOCK这个宏是专门给event_base用的。