13 #include <event2/event.h> 14 #include <event2/thread.h> 17 #include "evhtp/thread.h" 31 #ifdef EVTHR_SHARED_PIPE 52 #ifdef EVTHR_SHARED_PIPE 54 struct event * shared_pool_ev;
56 TAILQ_ENTRY(
evthr) next;
59 #define _evthr_read(thr, cmd, sock) \ 60 (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0 68 if (!(thread = (evthr_t *)args)) {
78 (cmd.cb)(thread, cmd.args, thread->arg);
83 event_base_loopbreak(thread->evbase);
93 if (!(thread = (evthr_t *)args)) {
97 if (thread == NULL || thread->thr == NULL) {
101 thread->evbase = event_base_new();
102 thread->event = event_new(thread->evbase, thread->rdr,
105 event_add(thread->event, NULL);
107 #ifdef EVTHR_SHARED_PIPE 108 if (thread->pool_rdr > 0) {
109 thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
111 event_add(thread->shared_pool_ev, NULL);
115 pthread_mutex_lock(&thread->lock);
116 if (thread->init_cb != NULL) {
117 (thread->init_cb)(thread, thread->arg);
119 pthread_mutex_unlock(&thread->lock);
121 event_base_loop(thread->evbase, 0);
123 pthread_mutex_lock(&thread->lock);
124 if (thread->exit_cb != NULL) {
125 (thread->exit_cb)(thread, thread->arg);
127 pthread_mutex_unlock(&thread->lock);
129 if (thread->err == 1) {
130 fprintf(stderr,
"FATAL ERROR!\n");
144 if (send(thread->wdr, &cmd,
sizeof(cmd), 0) <= 0) {
145 return EVTHR_RES_RETRY;
159 if (send(thread->wdr, &cmd,
sizeof(evthr_cmd_t), 0) < 0) {
160 return EVTHR_RES_RETRY;
163 pthread_join(*thread->thr, NULL);
169 return thr ? thr->evbase : NULL;
181 return thr ? thr->aux : NULL;
207 _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb,
void * args) {
211 if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
215 evutil_make_socket_nonblocking(fds[0]);
216 evutil_make_socket_nonblocking(fds[1]);
218 if (!(thread = calloc(
sizeof(evthr_t), 1))) {
222 thread->thr = malloc(
sizeof(pthread_t));
224 thread->rdr = fds[0];
225 thread->wdr = fds[1];
230 if (pthread_mutex_init(&thread->lock, NULL)) {
250 if (thread == NULL || thread->thr == NULL) {
254 if (pthread_create(thread->thr, NULL,
_evthr_loop, (
void *)thread)) {
263 if (thread == NULL) {
267 if (thread->rdr > 0) {
271 if (thread->wdr > 0) {
280 event_free(thread->event);
283 if (thread->evbase) {
284 event_base_free(thread->evbase);
300 TAILQ_REMOVE(&pool->threads, thread, next);
314 return EVTHR_RES_FATAL;
326 #ifdef EVTHR_SHARED_PIPE 333 if (
evhtp_unlikely(send(pool->wdr, &cmd,
sizeof(cmd), 0) == -1)) {
334 return EVTHR_RES_RETRY;
339 evthr_t * thr = NULL;
342 return EVTHR_RES_FATAL;
346 return EVTHR_RES_NOCB;
349 thr = TAILQ_FIRST(&pool->threads);
351 TAILQ_REMOVE(&pool->threads, thr, next);
352 TAILQ_INSERT_TAIL(&pool->threads, thr, next);
359 static evthr_pool_t *
361 evthr_init_cb init_cb,
362 evthr_exit_cb exit_cb,
367 #ifdef EVTHR_SHARED_PIPE 375 if (!(pool = calloc(
sizeof(evthr_pool_t), 1))) {
379 pool->nthreads = nthreads;
380 TAILQ_INIT(&pool->threads);
382 #ifdef EVTHR_SHARED_PIPE 383 if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
387 evutil_make_socket_nonblocking(fds[0]);
388 evutil_make_socket_nonblocking(fds[1]);
394 for (i = 0; i < nthreads; i++) {
402 #ifdef EVTHR_SHARED_PIPE 403 thread->pool_rdr = fds[0];
406 TAILQ_INSERT_TAIL(&pool->threads, thread, next);
419 evthr_init_cb init_cb,
420 evthr_exit_cb exit_cb,
void * shared) {
426 evthr_t *
evthr = NULL;
432 TAILQ_FOREACH(evthr, &pool->threads, next) {
int evthr_set_initcb(evthr_t *thr, evthr_init_cb cb)
static void _evthr_read_cmd(evutil_socket_t sock, short which, void *args)
int evthr_pool_start(evthr_pool_t *pool)
evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
int evthr_set_exitcb(evthr_t *thr, evthr_exit_cb cb)
evthr_t * evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
struct evthr_pool __attribute__
static evthr_pool_t * _evthr_pool_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
evbase_t * evthr_get_base(evthr_t *thr)
void evthr_set_aux(evthr_t *thr, void *aux)
evthr_res evthr_pool_stop(evthr_pool_t *pool)
evthr_res evthr_stop(evthr_t *thread)
void evthr_pool_free(evthr_pool_t *pool)
int evthr_start(evthr_t *thread)
#define TAILQ_FOREACH_SAFE(var, head, field, tvar)
evthr_res evthr_pool_defer(evthr_pool_t *pool, evthr_cb cb, void *arg)
static void * _evthr_loop(void *args)
#define _evthr_read(thr, cmd, sock)
struct evthr_pool_slist evthr_pool_slist_t
evthr_pool_slist_t threads
evthr_t * evthr_new(evthr_init_cb init_cb, void *args)
void * evthr_get_aux(evthr_t *thr)
void evthr_free(evthr_t *thread)
evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb init_cb, void *shared)
static evthr_t * _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
evthr_res evthr_defer(evthr_t *thread, evthr_cb cb, void *arg)
#define evhtp_unlikely(x)
TAILQ_HEAD(evthr_pool_slist, evthr)