Libevhtp  1.2.13
thread.c
Go to the documentation of this file.
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <stdint.h>
5 #include <limits.h>
6 #ifndef WIN32
7 #include <sys/queue.h>
8 #endif
9 
10 #include <unistd.h>
11 #include <pthread.h>
12 
13 #include <event2/event.h>
14 #include <event2/thread.h>
15 
16 #include "internal.h"
17 #include "evhtp/thread.h"
18 
19 typedef struct evthr_cmd evthr_cmd_t;
20 typedef struct evthr_pool_slist evthr_pool_slist_t;
21 
22 struct evthr_cmd {
23  uint8_t stop;
24  void * args;
25  evthr_cb cb;
26 } __attribute__((packed));
27 
28 TAILQ_HEAD(evthr_pool_slist, evthr);
29 
30 struct evthr_pool {
31 #ifdef EVTHR_SHARED_PIPE
32  int rdr;
33  int wdr;
34 #endif
35  int nthreads;
37 };
38 
39 struct evthr {
40  int rdr;
41  int wdr;
42  char err;
43  ev_t * event;
44  evbase_t * evbase;
45  pthread_mutex_t lock;
46  pthread_t * thr;
47  evthr_init_cb init_cb;
48  evthr_init_cb exit_cb;
49  void * arg;
50  void * aux;
51 
52 #ifdef EVTHR_SHARED_PIPE
53  int pool_rdr;
54  struct event * shared_pool_ev;
55 #endif
56  TAILQ_ENTRY(evthr) next;
57 };
58 
59 #define _evthr_read(thr, cmd, sock) \
60  (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0
61 
62 static void
63 _evthr_read_cmd(evutil_socket_t sock, short which, void * args) {
64  evthr_t * thread;
65  evthr_cmd_t cmd;
66  int stopped;
67 
68  if (!(thread = (evthr_t *)args)) {
69  return;
70  }
71 
72  stopped = 0;
73 
74  if (evhtp_likely(_evthr_read(thread, &cmd, sock) == 1)) {
75  stopped = cmd.stop;
76 
77  if (evhtp_likely(cmd.cb != NULL)) {
78  (cmd.cb)(thread, cmd.args, thread->arg);
79  }
80  }
81 
82  if (evhtp_unlikely(stopped == 1)) {
83  event_base_loopbreak(thread->evbase);
84  }
85 
86  return;
87 } /* _evthr_read_cmd */
88 
89 static void *
90 _evthr_loop(void * args) {
91  evthr_t * thread;
92 
93  if (!(thread = (evthr_t *)args)) {
94  return NULL;
95  }
96 
97  if (thread == NULL || thread->thr == NULL) {
98  pthread_exit(NULL);
99  }
100 
101  thread->evbase = event_base_new();
102  thread->event = event_new(thread->evbase, thread->rdr,
103  EV_READ | EV_PERSIST, _evthr_read_cmd, args);
104 
105  event_add(thread->event, NULL);
106 
107 #ifdef EVTHR_SHARED_PIPE
108  if (thread->pool_rdr > 0) {
109  thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
110  EV_READ | EV_PERSIST, _evthr_read_cmd, args);
111  event_add(thread->shared_pool_ev, NULL);
112  }
113 #endif
114 
115  pthread_mutex_lock(&thread->lock);
116  if (thread->init_cb != NULL) {
117  (thread->init_cb)(thread, thread->arg);
118  }
119  pthread_mutex_unlock(&thread->lock);
121  event_base_loop(thread->evbase, 0);
122 
123  pthread_mutex_lock(&thread->lock);
124  if (thread->exit_cb != NULL) {
125  (thread->exit_cb)(thread, thread->arg);
126  }
127  pthread_mutex_unlock(&thread->lock);
128 
129  if (thread->err == 1) {
130  fprintf(stderr, "FATAL ERROR!\n");
131  }
132 
133  pthread_exit(NULL);
134 } /* _evthr_loop */
135 
136 evthr_res
137 evthr_defer(evthr_t * thread, evthr_cb cb, void * arg) {
138  evthr_cmd_t cmd = {
139  .cb = cb,
140  .args = arg,
141  .stop = 0
142  };
143 
144  if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
145  return EVTHR_RES_RETRY;
146  }
147 
148  return EVTHR_RES_OK;
149 }
150 
151 evthr_res
152 evthr_stop(evthr_t * thread) {
153  evthr_cmd_t cmd = {
154  .cb = NULL,
155  .args = NULL,
156  .stop = 1
157  };
158 
159  if (send(thread->wdr, &cmd, sizeof(evthr_cmd_t), 0) < 0) {
160  return EVTHR_RES_RETRY;
161  }
162 
163  pthread_join(*thread->thr, NULL);
164  return EVTHR_RES_OK;
165 }
166 
167 evbase_t *
168 evthr_get_base(evthr_t * thr) {
169  return thr ? thr->evbase : NULL;
170 }
171 
172 void
173 evthr_set_aux(evthr_t * thr, void * aux) {
174  if (thr) {
175  thr->aux = aux;
176  }
177 }
178 
179 void *
180 evthr_get_aux(evthr_t * thr) {
181  return thr ? thr->aux : NULL;
182 }
183 
184 int
185 evthr_set_initcb(evthr_t * thr, evthr_init_cb cb) {
186  if (thr == NULL) {
187  return -1;
188  }
189 
190  thr->init_cb = cb;
191 
192  return 01;
193 }
194 
195 int
196 evthr_set_exitcb(evthr_t * thr, evthr_exit_cb cb) {
197  if (thr == NULL) {
198  return -1;
199  }
200 
201  thr->exit_cb = cb;
202 
203  return 0;
204 }
205 
206 static evthr_t *
207 _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args) {
208  evthr_t * thread;
209  int fds[2];
210 
211  if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
212  return NULL;
213  }
214 
215  evutil_make_socket_nonblocking(fds[0]);
216  evutil_make_socket_nonblocking(fds[1]);
217 
218  if (!(thread = calloc(sizeof(evthr_t), 1))) {
219  return NULL;
220  }
221 
222  thread->thr = malloc(sizeof(pthread_t));
223  thread->arg = args;
224  thread->rdr = fds[0];
225  thread->wdr = fds[1];
226 
227  evthr_set_initcb(thread, init_cb);
228  evthr_set_exitcb(thread, exit_cb);
229 
230  if (pthread_mutex_init(&thread->lock, NULL)) {
231  evthr_free(thread);
232  return NULL;
233  }
234 
235  return thread;
236 } /* evthr_new */
237 
238 evthr_t *
239 evthr_new(evthr_init_cb init_cb, void * args) {
240  return _evthr_new(init_cb, NULL, args);
241 }
242 
243 evthr_t *
244 evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void * args) {
245  return _evthr_new(init_cb, exit_cb, args);
246 }
247 
248 int
249 evthr_start(evthr_t * thread) {
250  if (thread == NULL || thread->thr == NULL) {
251  return -1;
252  }
253 
254  if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
255  return -1;
256  }
257 
258  return 0;
259 }
260 
261 void
262 evthr_free(evthr_t * thread) {
263  if (thread == NULL) {
264  return;
265  }
266 
267  if (thread->rdr > 0) {
268  close(thread->rdr);
269  }
270 
271  if (thread->wdr > 0) {
272  close(thread->wdr);
273  }
274 
275  if (thread->thr) {
276  free(thread->thr);
277  }
278 
279  if (thread->event) {
280  event_free(thread->event);
281  }
282 
283  if (thread->evbase) {
284  event_base_free(thread->evbase);
285  }
286 
287  free(thread);
288 } /* evthr_free */
289 
290 void
291 evthr_pool_free(evthr_pool_t * pool) {
292  evthr_t * thread;
293  evthr_t * save;
294 
295  if (pool == NULL) {
296  return;
297  }
298 
299  TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
300  TAILQ_REMOVE(&pool->threads, thread, next);
301 
302  evthr_free(thread);
303  }
304 
305  free(pool);
306 }
307 
308 evthr_res
309 evthr_pool_stop(evthr_pool_t * pool) {
310  evthr_t * thr;
311  evthr_t * save;
312 
313  if (pool == NULL) {
314  return EVTHR_RES_FATAL;
315  }
316 
317  TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
318  evthr_stop(thr);
319  }
320 
321  return EVTHR_RES_OK;
322 }
323 
324 evthr_res
325 evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) {
326 #ifdef EVTHR_SHARED_PIPE
327  evthr_cmd_t cmd = {
328  .cb = cb,
329  .args = arg,
330  .stop = 0
331  };
332 
333  if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) {
334  return EVTHR_RES_RETRY;
335  }
336 
337  return EVTHR_RES_OK;
338 #else
339  evthr_t * thr = NULL;
340 
341  if (pool == NULL) {
342  return EVTHR_RES_FATAL;
343  }
344 
345  if (cb == NULL) {
346  return EVTHR_RES_NOCB;
347  }
348 
349  thr = TAILQ_FIRST(&pool->threads);
350 
351  TAILQ_REMOVE(&pool->threads, thr, next);
352  TAILQ_INSERT_TAIL(&pool->threads, thr, next);
353 
354 
355  return evthr_defer(thr, cb, arg);
356 #endif
357 } /* evthr_pool_defer */
358 
359 static evthr_pool_t *
360 _evthr_pool_new(int nthreads,
361  evthr_init_cb init_cb,
362  evthr_exit_cb exit_cb,
363  void * shared) {
364  evthr_pool_t * pool;
365  int i;
366 
367 #ifdef EVTHR_SHARED_PIPE
368  int fds[2];
369 #endif
370 
371  if (nthreads == 0) {
372  return NULL;
373  }
374 
375  if (!(pool = calloc(sizeof(evthr_pool_t), 1))) {
376  return NULL;
377  }
378 
379  pool->nthreads = nthreads;
380  TAILQ_INIT(&pool->threads);
381 
382 #ifdef EVTHR_SHARED_PIPE
383  if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
384  return NULL;
385  }
386 
387  evutil_make_socket_nonblocking(fds[0]);
388  evutil_make_socket_nonblocking(fds[1]);
389 
390  pool->rdr = fds[0];
391  pool->wdr = fds[1];
392 #endif
393 
394  for (i = 0; i < nthreads; i++) {
395  evthr_t * thread;
396 
397  if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
398  evthr_pool_free(pool);
399  return NULL;
400  }
401 
402 #ifdef EVTHR_SHARED_PIPE
403  thread->pool_rdr = fds[0];
404 #endif
405 
406  TAILQ_INSERT_TAIL(&pool->threads, thread, next);
407  }
408 
409  return pool;
410 } /* _evthr_pool_new */
411 
412 evthr_pool_t *
413 evthr_pool_new(int nthreads, evthr_init_cb init_cb, void * shared) {
414  return _evthr_pool_new(nthreads, init_cb, NULL, shared);
415 }
416 
417 evthr_pool_t *
418 evthr_pool_wexit_new(int nthreads,
419  evthr_init_cb init_cb,
420  evthr_exit_cb exit_cb, void * shared) {
421  return _evthr_pool_new(nthreads, init_cb, exit_cb, shared);
422 }
423 
424 int
425 evthr_pool_start(evthr_pool_t * pool) {
426  evthr_t * evthr = NULL;
427 
428  if (pool == NULL) {
429  return -1;
430  }
431 
432  TAILQ_FOREACH(evthr, &pool->threads, next) {
433  if (evthr_start(evthr) < 0) {
434  return -1;
435  }
436 
437  usleep(5000);
438  }
439 
440  return 0;
441 }
int evthr_set_initcb(evthr_t *thr, evthr_init_cb cb)
Definition: thread.c:185
static void _evthr_read_cmd(evutil_socket_t sock, short which, void *args)
Definition: thread.c:63
int evthr_pool_start(evthr_pool_t *pool)
Definition: thread.c:425
evthr_pool_t * evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
Definition: thread.c:418
int evthr_set_exitcb(evthr_t *thr, evthr_exit_cb cb)
Definition: thread.c:196
int rdr
Definition: thread.c:40
pthread_mutex_t lock
Definition: thread.c:45
evthr_t * evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
Definition: thread.c:244
void * args
Definition: thread.c:24
int wdr
Definition: thread.c:41
void * aux
Definition: thread.c:50
struct evthr_pool __attribute__
static evthr_pool_t * _evthr_pool_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
Definition: thread.c:360
evbase_t * evthr_get_base(evthr_t *thr)
Definition: thread.c:168
pthread_t * thr
Definition: thread.c:46
void * arg
Definition: thread.c:49
void evthr_set_aux(evthr_t *thr, void *aux)
Definition: thread.c:173
char err
Definition: thread.c:42
evthr_cb cb
Definition: thread.c:25
evthr_res evthr_pool_stop(evthr_pool_t *pool)
Definition: thread.c:309
evthr_res evthr_stop(evthr_t *thread)
Definition: thread.c:152
void evthr_pool_free(evthr_pool_t *pool)
Definition: thread.c:291
int evthr_start(evthr_t *thread)
Definition: thread.c:249
evthr_init_cb exit_cb
Definition: thread.c:48
#define TAILQ_FOREACH_SAFE(var, head, field, tvar)
Definition: internal.h:22
evthr_res evthr_pool_defer(evthr_pool_t *pool, evthr_cb cb, void *arg)
Definition: thread.c:325
ev_t * event
Definition: thread.c:43
static void * _evthr_loop(void *args)
Definition: thread.c:90
Definition: thread.c:39
#define _evthr_read(thr, cmd, sock)
Definition: thread.c:59
struct evthr_pool_slist evthr_pool_slist_t
Definition: thread.c:20
uint8_t stop
Definition: thread.c:23
evthr_init_cb init_cb
Definition: thread.c:47
evthr_pool_slist_t threads
Definition: thread.c:36
evthr_t * evthr_new(evthr_init_cb init_cb, void *args)
Definition: thread.c:239
void * evthr_get_aux(evthr_t *thr)
Definition: thread.c:180
void evthr_free(evthr_t *thread)
Definition: thread.c:262
evthr_pool_t * evthr_pool_new(int nthreads, evthr_init_cb init_cb, void *shared)
Definition: thread.c:413
evbase_t * evbase
Definition: thread.c:44
static evthr_t * _evthr_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
Definition: thread.c:207
evthr_res evthr_defer(evthr_t *thread, evthr_cb cb, void *arg)
Definition: thread.c:137
int nthreads
Definition: thread.c:35
#define evhtp_unlikely(x)
Definition: internal.h:18
TAILQ_HEAD(evthr_pool_slist, evthr)
#define evhtp_likely(x)
Definition: internal.h:17