xref: /unit/src/nxt_event_engine.c (revision 165:a1156473a8f1)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_task_t *task,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_thread_t        *thread;
34     nxt_event_engine_t  *engine;
35 
36     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
37     if (engine == NULL) {
38         return NULL;
39     }
40 
41     nxt_debug(task, "create engine %p", engine);
42 
43     thread = task->thread;
44 
45     engine->task.thread = thread;
46     engine->task.log = thread->log;
47     engine->task.ident = nxt_task_next_ident();
48 
49     engine->batch = batch;
50 
51     if (flags & NXT_ENGINE_FIBERS) {
52         engine->fibers = nxt_fiber_main_create(engine);
53         if (engine->fibers == NULL) {
54             goto fibers_fail;
55         }
56     }
57 
58     engine->current_work_queue = &engine->fast_work_queue;
59 
60     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
61 
62     engine->fast_work_queue.cache = &engine->work_queue_cache;
63     engine->accept_work_queue.cache = &engine->work_queue_cache;
64     engine->read_work_queue.cache = &engine->work_queue_cache;
65     engine->socket_work_queue.cache = &engine->work_queue_cache;
66     engine->connect_work_queue.cache = &engine->work_queue_cache;
67     engine->write_work_queue.cache = &engine->work_queue_cache;
68     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
69     engine->close_work_queue.cache = &engine->work_queue_cache;
70 
71     nxt_work_queue_name(&engine->fast_work_queue, "fast");
72     nxt_work_queue_name(&engine->accept_work_queue, "accept");
73     nxt_work_queue_name(&engine->read_work_queue, "read");
74     nxt_work_queue_name(&engine->socket_work_queue, "socket");
75     nxt_work_queue_name(&engine->connect_work_queue, "connect");
76     nxt_work_queue_name(&engine->write_work_queue, "write");
77     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
78     nxt_work_queue_name(&engine->close_work_queue, "close");
79 
80     if (signals != NULL) {
81         engine->signals = nxt_event_engine_signals(signals);
82         if (engine->signals == NULL) {
83             goto signals_fail;
84         }
85 
86         engine->signals->handler = nxt_event_engine_signal_handler;
87 
88         if (!interface->signal_support) {
89             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
90                 goto signals_fail;
91             }
92         }
93     }
94 
95     /*
96      * Number of event set and timers changes should be at least twice
97      * more than number of events to avoid premature flushes of the changes.
98      * Fourfold is for sure.
99      */
100     events = (batch != 0) ? batch : 32;
101 
102     if (interface->create(engine, 4 * events, events) != NXT_OK) {
103         goto event_set_fail;
104     }
105 
106     engine->event = *interface;
107 
108     if (nxt_event_engine_post_init(engine) != NXT_OK) {
109         goto post_fail;
110     }
111 
112     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
113         goto timers_fail;
114     }
115 
116     thread = task->thread;
117 
118     nxt_thread_time_update(thread);
119     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
120 
121     engine->max_connections = 0xffffffff;
122 
123     nxt_queue_init(&engine->joints);
124     nxt_queue_init(&engine->listen_connections);
125     nxt_queue_init(&engine->idle_connections);
126 
127 #if !(NXT_THREADS)
128 
129     if (interface->signal_support) {
130         thread->time.signal = -1;
131     }
132 
133 #endif
134 
135     return engine;
136 
137 timers_fail:
138 post_fail:
139 
140     interface->free(engine);
141 
142 event_set_fail:
143 signals_fail:
144 
145     nxt_free(engine->signals);
146     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
147     nxt_free(engine->fibers);
148 
149 fibers_fail:
150 
151     nxt_free(engine);
152     return NULL;
153 }
154 
155 
156 static nxt_int_t
157 nxt_event_engine_post_init(nxt_event_engine_t *engine)
158 {
159     if (engine->event.enable_post != NULL) {
160         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
161     }
162 
163 #if !(NXT_THREADS)
164 
165     /* Only signals may are posted in single-threaded mode. */
166 
167     if (engine->event->signal_support) {
168         return NXT_OK;
169     }
170 
171 #endif
172 
173     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
174         return NXT_ERROR;
175     }
176 
177     return NXT_OK;
178 }
179 
180 
181 static nxt_int_t
182 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
183 {
184     nxt_event_engine_pipe_t  *pipe;
185 
186     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
187     if (pipe == NULL) {
188         return NXT_ERROR;
189     }
190 
191     engine->pipe = pipe;
192 
193     /*
194      * An event engine pipe is in blocking mode for writer
195      * and in non-blocking node for reader.
196      */
197 
198     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
199         nxt_free(pipe);
200         return NXT_ERROR;
201     }
202 
203     pipe->event.fd = pipe->fds[0];
204     pipe->event.task = &engine->task;
205     pipe->event.read_work_queue = &engine->fast_work_queue;
206     pipe->event.read_handler = nxt_event_engine_signal_pipe;
207     pipe->event.write_work_queue = &engine->fast_work_queue;
208     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
209     pipe->event.log = engine->task.log;
210 
211     nxt_fd_event_enable_read(engine, &pipe->event);
212 
213     return NXT_OK;
214 }
215 
216 
217 static void
218 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
219 {
220     nxt_event_engine_pipe_t  *pipe;
221 
222     pipe = engine->pipe;
223 
224     if (pipe != NULL) {
225 
226         if (pipe->event.read_work_queue != NULL) {
227             nxt_fd_event_close(engine, &pipe->event);
228             nxt_pipe_close(pipe->event.task, pipe->fds);
229         }
230 
231         nxt_free(pipe);
232     }
233 }
234 
235 
236 static void
237 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
238 {
239     nxt_event_engine_pipe_t  *pipe;
240 
241     pipe = obj;
242 
243     nxt_pipe_close(pipe->event.task, pipe->fds);
244     nxt_free(pipe);
245 }
246 
247 
248 void
249 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
250 {
251     nxt_debug(&engine->task, "event engine post");
252 
253     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
254 
255     nxt_event_engine_signal(engine, 0);
256 }
257 
258 
259 void
260 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
261 {
262     u_char  buf;
263 
264     nxt_debug(&engine->task, "event engine signal:%ui", signo);
265 
266     /*
267      * A signal number may be sent in a signal context, so the signal
268      * information cannot be passed via a locked work queue.
269      */
270 
271     if (engine->event.signal != NULL) {
272         engine->event.signal(engine, signo);
273         return;
274     }
275 
276     buf = (u_char) signo;
277     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
278 }
279 
280 
281 static void
282 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
283 {
284     int             i, n;
285     u_char          signo;
286     nxt_bool_t      post;
287     nxt_fd_event_t  *ev;
288     u_char          buf[128];
289 
290     ev = obj;
291 
292     nxt_debug(task, "engine signal pipe");
293 
294     post = 0;
295 
296     do {
297         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
298 
299         for (i = 0; i < n; i++) {
300             signo = buf[i];
301 
302             nxt_debug(task, "engine pipe signo:%d", signo);
303 
304             if (signo == 0) {
305                 /* A post should be processed only once. */
306                 post = 1;
307 
308             } else {
309                 nxt_event_engine_signal_handler(task,
310                                              (void *) (uintptr_t) signo, NULL);
311             }
312         }
313 
314     } while (n == sizeof(buf));
315 
316     if (post) {
317         nxt_event_engine_post_handler(task, NULL, NULL);
318     }
319 }
320 
321 
322 static void
323 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
324 {
325     nxt_thread_t        *thread;
326     nxt_event_engine_t  *engine;
327 
328     thread = task->thread;
329     engine = thread->engine;
330 
331     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
332                                &engine->fast_work_queue);
333 }
334 
335 
336 static void
337 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
338 {
339     nxt_event_engine_t       *engine;
340     nxt_event_engine_pipe_t  *pipe;
341 
342     engine = task->thread->engine;
343     pipe = engine->pipe;
344 
345     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
346             pipe->fds[0], pipe->fds[1]);
347 
348     nxt_fd_event_close(engine, &pipe->event);
349     nxt_pipe_close(pipe->event.task, pipe->fds);
350 }
351 
352 
353 static void
354 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
355 {
356     uintptr_t              signo;
357     const nxt_sig_event_t  *sigev;
358 
359     signo = (uintptr_t) obj;
360 
361     for (sigev = task->thread->engine->signals->sigev;
362          sigev->signo != 0;
363          sigev++)
364     {
365         if (signo == (nxt_uint_t) sigev->signo) {
366             sigev->handler(task, (void *) signo, (void *) sigev->name);
367             return;
368         }
369     }
370 
371     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
372 }
373 
374 
375 nxt_int_t
376 nxt_event_engine_change(nxt_event_engine_t *engine,
377     const nxt_event_interface_t *interface, nxt_uint_t batch)
378 {
379     nxt_uint_t  events;
380 
381     engine->batch = batch;
382 
383     if (!engine->event.signal_support && interface->signal_support) {
384         /*
385          * Block signal processing if the current event
386          * facility does not support signal processing.
387          */
388         nxt_event_engine_signals_stop(engine);
389 
390         /*
391          * Add to engine fast work queue the signal events possibly
392          * received before the blocking signal processing.
393          */
394         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
395     }
396 
397     if (engine->pipe != NULL && interface->enable_post != NULL) {
398         /*
399          * An engine pipe must be closed after all signal events
400          * added above to engine fast work queue will be processed.
401          */
402         nxt_work_queue_add(&engine->fast_work_queue,
403                            nxt_event_engine_signal_pipe_close,
404                            &engine->task, engine->pipe, NULL);
405 
406         engine->pipe = NULL;
407     }
408 
409     engine->event.free(engine);
410 
411     events = (batch != 0) ? batch : 32;
412 
413     if (interface->create(engine, 4 * events, events) != NXT_OK) {
414         return NXT_ERROR;
415     }
416 
417     engine->event = *interface;
418 
419     if (nxt_event_engine_post_init(engine) != NXT_OK) {
420         return NXT_ERROR;
421     }
422 
423     if (engine->signals != NULL) {
424 
425         if (!engine->event.signal_support) {
426             return nxt_event_engine_signals_start(engine);
427         }
428 
429 #if (NXT_THREADS)
430         /*
431          * Reset the PID flag to start the signal thread if
432          * some future event facility will not support signals.
433          */
434         engine->signals->process = 0;
435 #endif
436     }
437 
438     return NXT_OK;
439 }
440 
441 
442 void
443 nxt_event_engine_free(nxt_event_engine_t *engine)
444 {
445     nxt_thread_log_debug("free engine %p", engine);
446 
447     nxt_event_engine_signal_pipe_free(engine);
448     nxt_free(engine->signals);
449 
450     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
451 
452     engine->event.free(engine);
453 
454     /* TODO: free timers */
455 
456     nxt_free(engine);
457 }
458 
459 
460 static nxt_work_handler_t
461 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
462     void **obj, void **data)
463 {
464     nxt_work_queue_t  *wq, *last;
465 
466     wq = engine->current_work_queue;
467     last = wq;
468 
469     if (wq->head == NULL) {
470         wq = &engine->fast_work_queue;
471 
472         if (wq->head == NULL) {
473 
474             do {
475                 engine->current_work_queue++;
476                 wq = engine->current_work_queue;
477 
478                 if (wq > &engine->close_work_queue) {
479                     wq = &engine->fast_work_queue;
480                     engine->current_work_queue = wq;
481                 }
482 
483                 if (wq->head != NULL) {
484                     goto found;
485                 }
486 
487             } while (wq != last);
488 
489             engine->current_work_queue = &engine->fast_work_queue;
490 
491             return NULL;
492         }
493     }
494 
495 found:
496 
497     nxt_debug(&engine->task, "work queue: %s", wq->name);
498 
499     return nxt_work_queue_pop(wq, task, obj, data);
500 }
501 
502 
503 void
504 nxt_event_engine_start(nxt_event_engine_t *engine)
505 {
506     void                *obj, *data;
507     nxt_task_t          *task;
508     nxt_msec_t          timeout, now;
509     nxt_thread_t        *thr;
510     nxt_work_handler_t  handler;
511 
512     thr = nxt_thread();
513 
514     if (engine->fibers) {
515         /*
516          * _setjmp() cannot be wrapped in a function since return from
517          * the function clobbers stack used by future _setjmp() returns.
518          */
519         _setjmp(engine->fibers->fiber.jmp);
520 
521         /* A return point from fibers. */
522     }
523 
524     thr->log = engine->task.log;
525 
526     for ( ;; ) {
527 
528         for ( ;; ) {
529             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
530 
531             if (handler == NULL) {
532                 break;
533             }
534 
535             thr->task = task;
536 
537             handler(task, obj, data);
538         }
539 
540         /* Attach some event engine work queues in preferred order. */
541 
542         timeout = nxt_timer_find(engine);
543 
544         engine->event.poll(engine, timeout);
545 
546         now = nxt_thread_monotonic_time(thr) / 1000000;
547 
548         nxt_timer_expire(engine, now);
549     }
550 }
551 
552 
553 static nxt_int_t
554 nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data)
555 {
556     return NXT_OK;
557 }
558 
559 static const nxt_lvlhsh_proto_t  lvlhsh_req_conn_proto  nxt_aligned(64) = {
560     NXT_LVLHSH_DEFAULT,
561     nxt_req_conn_test,
562     nxt_lvlhsh_alloc,
563     nxt_lvlhsh_free,
564 };
565 
566 
567 void
568 nxt_event_engine_request_add(nxt_event_engine_t *engine,
569     nxt_req_conn_link_t *rc)
570 {
571     nxt_lvlhsh_query_t  lhq;
572 
573     lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
574     lhq.key.length = sizeof(rc->req_id);
575     lhq.key.start = (u_char *) &rc->req_id;
576     lhq.proto = &lvlhsh_req_conn_proto;
577     lhq.replace = 0;
578     lhq.value = rc;
579     lhq.pool = engine->mem_pool;
580 
581     switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) {
582 
583     case NXT_OK:
584         break;
585 
586     default:
587         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed",
588                              rc->req_id);
589         break;
590     }
591 }
592 
593 
594 nxt_req_conn_link_t *
595 nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id)
596 {
597     nxt_lvlhsh_query_t  lhq;
598 
599     lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
600     lhq.key.length = sizeof(req_id);
601     lhq.key.start = (u_char *) &req_id;
602     lhq.proto = &lvlhsh_req_conn_proto;
603 
604     if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) {
605         return lhq.value;
606     }
607 
608     return NULL;
609 }
610 
611 
612 void
613 nxt_event_engine_request_remove(nxt_event_engine_t *engine,
614     nxt_req_conn_link_t *rc)
615 {
616     nxt_lvlhsh_query_t  lhq;
617 
618     lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
619     lhq.key.length = sizeof(rc->req_id);
620     lhq.key.start = (u_char *) &rc->req_id;
621     lhq.proto = &lvlhsh_req_conn_proto;
622     lhq.pool = engine->mem_pool;
623 
624     switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
625 
626     case NXT_OK:
627         break;
628 
629     default:
630         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
631                              rc->req_id);
632         break;
633     }
634 }
635 
636 
637 nxt_req_conn_link_t *
638 nxt_event_engine_request_find_remove(nxt_event_engine_t *engine,
639     nxt_req_id_t req_id)
640 {
641     nxt_lvlhsh_query_t  lhq;
642 
643     lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
644     lhq.key.length = sizeof(req_id);
645     lhq.key.start = (u_char *) &req_id;
646     lhq.proto = &lvlhsh_req_conn_proto;
647     lhq.pool = engine->mem_pool;
648 
649     switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
650 
651     case NXT_OK:
652         return lhq.value;
653 
654     default:
655         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
656                              req_id);
657         break;
658     }
659 
660     return NULL;
661 }
662 
663 
664 #if (NXT_DEBUG)
665 
666 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
667 {
668     nxt_work_queue_thread_adopt(&engine->fast_work_queue);
669     nxt_work_queue_thread_adopt(&engine->accept_work_queue);
670     nxt_work_queue_thread_adopt(&engine->read_work_queue);
671     nxt_work_queue_thread_adopt(&engine->socket_work_queue);
672     nxt_work_queue_thread_adopt(&engine->connect_work_queue);
673     nxt_work_queue_thread_adopt(&engine->write_work_queue);
674     nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
675     nxt_work_queue_thread_adopt(&engine->close_work_queue);
676 }
677 
678 #endif
679