记一个多线程使用libevent的问题 – adinosaur

记一个多线程使用libevent的问题 – adinosaur

前段时间运用libevent体系库引起了东西游玩服务性的引擎,这时遭遇战的成绩经过被记载到群众中去。。

我选择将逻辑和体系划分为设计服务性的上的线状物。,线状物间相应队列。但这有东西成绩。:

当东西逻辑线状物想驾驶发送东西记录包时,体系线状物也可以在观望形势后再作决议体系IO的体系大声喊上被闭塞。。即使缺乏特别纠正,此刻,音讯使成群将保留在起缓冲作用的人中。,直到下东西体系线状物从挂起的体系大声喊使恢复原状(COMPAR)。于是,当逻辑线状物发送音讯包时,bufferevent_write基本的使行动起来机制。,让体系线状物从体系大声喊使恢复原状,譬如ePube和句柄。

本着对libevent的api不熟悉,后头,我本身引起了这时效能。。引起事实上决不复杂。,但怀念我的心。:只排简略而基本的的信号。,确保尽量少的bug。。直到后头,我和我的同事议论过(P),才看见同样libevent是有对此做维持的,但极小的是怎样做的。,贴壁纸中缺乏极小的阐明。,因而我的同事无法阐明报告。。本着这种情况,我决议,把libevent中与此相干的源码粗略的过一遍,为了了解上面几件事。:

(1)与线状物观望形势后再作决议的使行动起来事变相干的API是什么?,若何运用呢?

(2)在这些API在身后做了什么?

相干API,及用法

后头我有变化多的的概念。,libevent相干的api很简略而且除非东西:

/*call event_use_pthreads() if use posix 线状物。 
evthread_use_windows_threads();
struct event_base* ev_base = event_base_new();

笔者基本的小心的是效能。evthread_use_windows_threads大声喊必须做的事设定初值。event_base从前。在此以后,缺乏基本的做别的事实。,逻辑线状物在给予。bufferevent_write的时分,libevent就会自发地使行动起来体系线状物的事变丰满的,并给予发送记录。。

API在身后的逻辑

先着手evthread_use_windows_threads这时功能做了什么?

int evthread_use_windows_threads(void) {
  ...  
  evthread_set_lock_callbacks(&cbs);
  evthread_set_id_callback(evthread_win32_get_id);
  evthread_set_condition_callbacks(&cond_cbs);
  return 0;            
}

经过呼叫evthread_use_windows_threads,笔者先行设置了稍微回调功能。,包含设置了libevent获取线状物id的回调功能evthread_id_fn_

检查设定初值事变丰满的的效能。event_base_new做了什么:

// event.c
struct event_base * 
event_base_new(void) {
    ...
    base = event_base_new_with_config(cfg);
}

struct event_base * 
event_base_new_with_config(const struct event_config CFG) {
    ...
#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (EVTHREAD_LOCKING_ENABLED() &&
	    (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
		int r;
		EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
		EVTHREAD_ALLOC_COND(base->current_event_cond);
		r = evthread_make_base_notifiable(base);
		if (r<0) {
			event_warnx("%s: Unable to make base notifiable.", __func__);
			event_base_free(base);
			return NULL;
		}
	}
#endif
    ...
}

int
evthread_make_base_notifiable(struct event_base 根底) {
	int r;
	if (!base)
		return -1;

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	r = evthread_make_base_notifiable_nolock_(base);
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	return r;
}

static int
evthread_make_base_notifiable_nolock_(struct event_base 根底) {
    ...
    if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) {
	notify = evthread_notify_base_default;
	cb = evthread_notify_drain_default;
    } else {
	return -1;
    }

    base->th_notify_fn = notify;
}

它被大声喊如次:

event_base_new–>event_base_new_with_config–>evthread_make_base_notifiable–>evthread_make_base_notifiable_nolock_

至死经过evutil_make_internal_pipe_功能创办了两个相互衔接的socket(windows事实下,用这时来模仿管道):

/* Internal function: Set FD〔0〕 and FD〔1〕 to a pair of fds such that writes on
 * FD〔1〕 get read from FD〔0〕.  Make both fds nonblocking and close-on-exec.
 * Return 0 on success, -1 on 走慢。
 */
int
evutil_make_internal_pipe_(evutil_socket_t FD〔2〕
{
	/*
	  Making the second socket nonblocking is a bit subtle, given that we
	  ignore any EAGAIN returns when writing to it, and you don''t 习惯法地
	  do that for a nonblocking 插座。 But if the kernel gives us EAGAIN,
	  then there''s no need to add any more data to the buffer, since
	  the main thread is already either about to wake up and drain it,
	  or woken up and in the process of draining it.
	*/

#if defined(EVENT__HAVE_PIPE2)
	if (pipe2(fd, O_NONBLOCK|O_CLOEXEC) == 0)
		return 0;
#endif
#if defined(EVENT__HAVE_PIPE)
	if (管) == 0) {
		if (evutil_fast_socket_nonblocking(FD〔0〕) < 0 ||
		    evutil_fast_socket_nonblocking(FD〔1〕) < 0 ||
		    evutil_fast_socket_closeonexec(FD〔0〕) < 0 ||
		    evutil_fast_socket_closeonexec(FD〔1〕) < 0) {
			close(FD〔0〕);
			close(FD〔1〕);
			FD〔0〕 = FD〔1〕 = -1;
			return -1;
		}
		return 0;
	} else {
		event_warn("%s: pipe", __func__);
	}
#endif

#ifdef _WIN32
#define LOCAL_SOCKETPAIR_AF AF_INET
#else
#define LOCAL_SOCKETPAIR_AF AF_UNIX
#endif
	if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, fd) == 0) {
		if (evutil_fast_socket_nonblocking(FD〔0〕) < 0 ||
		    evutil_fast_socket_nonblocking(FD〔1〕) < 0 ||
		    evutil_fast_socket_closeonexec(FD〔0〕) < 0 ||
		    evutil_fast_socket_closeonexec(FD〔1〕) < 0) {
			evutil_closesocket(FD〔0〕);
			evutil_closesocket(FD〔1〕);
			FD〔0〕 = FD〔1〕 = -1;
			return -1;
		}
		return 0;
	}
	FD〔0〕 = FD〔1〕 = -1;
	return -1;
}

以后,再次启动使行动起来伪造。notify功能(evthread_notify_base_default):

static int
evthread_notify_base_default(struct event_base 根底) { 
  char BUF〔1〕 
  int r; 
  BUF〔0〕 = (刻) 0; 
#ifdef _WIN32 
  r = send(base->th_notify_FD〔1〕, buf, 1, 0); 
#else 
  r = write(base->th_notify_FD〔1〕, buf, 1); 
#endif 
  return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0; 
}

可以看出,在windows下libevent的使行动起来机制现实也self pipe trick,它只经过证实一对插座来模仿管道。,当笔者基本的醒着的时,它将1音节写信到东西套接字中。

再看一遍bufferevent_write:

// bufferevent.c
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t 维) {
	if (evbuffer_add(bufev->output, data, 维) == -1)
		return (-1);

	return 0;
}

// buffer.c
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
    ...
    evbuffer_invoke_callbacks_(buf);
}

它弹簧了搭上回调功能。,在创办这些回调功能。bufferevent的时分被布置:

//bufferevent_sock.c
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int 得到或获准进行选择 {
    ...
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
}

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void 精氨酸) {
    ...
    if (cbinfo->n_added &&
	    (bufev->enabled & EV_WRITE) &&
	    !event_pending(&bufev->ev_write, EV_WRITE, 空) &&
	    !bufev_p->write_suspended) {
		/* Somebody added data to the buffer, and we would like to
		 * write, and we were not 用钢笔画的。  So, start 用钢笔画的。 */
		if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) {
		    /* Should we log this? */
		}
	}
}

//bufferevent.c
int
bufferevent_add_event_(struct event *ev, const struct timeval 广播的频道) {
	if (!evutil_timerisset(tv))
		return event_add(ev, 空);
	else
		return event_add(ev, 广播的频道)
}

//event.c
int
event_add(struct event *ev, const struct timeval 广播的频道) {
	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
	res = event_add_nolock_(ev, tv, 0);
	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}

int
event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) {
    ...
    /* if we are not in the right thread, we need to wake up the loop */
    //即使在证实event_base从前大声喊了evthread_use_windows_threads,EvBaseKudioNoTebug将在此刻使恢复原状true。,别的方式,它是假的。。
    if (RES) != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);
}

从信号,在往bufferevent在写信记录以后,给予回调功能。,使行动起来体系线状物逻辑。evthread_notify_base)。为什么笔者基本的人工控制大声喊它呢?evthread_use_windows_threads效能呢?

回想一次。:

#define EVBASE_NEED_NOTIFY(base)			 \
	((base)->running_loop &&			 \
	    ((base)->th_owner_id != evthreadimpl_get_id_()))

unsigned long
evthreadimpl_get_id_() {
	return evthread_id_fn_ ? evthread_id_fn_() : 1;
}

先行说过,当被传讯evthread_use_windows_threads,设置了libevent获取线状物id的回调功能evthread_id_fn_。正因这样的事物。,跑步和跑步。evthread_notify_base功能:

static int
evthread_notify_base(struct event_base 根底) {
	EVENT_BASE_ASSERT_LOCKED(base);
	if (!base->th_notify_fn)
		return -1;
	if (base->is_notify_pending)
		return 0;
	base->is_notify_pending = 1;
	return base->th_notify_fn(base);
}

因而,当笔者在逻辑线状物大声喊bufferevent_write尝试发送音长记录的时分,它将按照以下内容大声喊,使充满体系线状物:

bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_evtbuf_cb_-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base

以上所述便是libevent跨线状物使行动起来的逻辑。

发表评论

电子邮件地址不会被公开。 必填项已用*标注