xref: /unit/src/nxt_event_engine.c (revision 88:c6879c7b5bdf)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_task_t *task,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_thread_t        *thread;
34     nxt_event_engine_t  *engine;
35 
36     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
37     if (engine == NULL) {
38         return NULL;
39     }
40 
41     thread = task->thread;
42 
43     engine->task.thread = thread;
44     engine->task.log = thread->log;
45     engine->task.ident = nxt_task_next_ident();
46 
47     engine->batch = batch;
48 
49     if (flags & NXT_ENGINE_FIBERS) {
50         engine->fibers = nxt_fiber_main_create(engine);
51         if (engine->fibers == NULL) {
52             goto fibers_fail;
53         }
54     }
55 
56     engine->current_work_queue = &engine->fast_work_queue;
57 
58     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
59 
60     engine->fast_work_queue.cache = &engine->work_queue_cache;
61     engine->accept_work_queue.cache = &engine->work_queue_cache;
62     engine->read_work_queue.cache = &engine->work_queue_cache;
63     engine->socket_work_queue.cache = &engine->work_queue_cache;
64     engine->connect_work_queue.cache = &engine->work_queue_cache;
65     engine->write_work_queue.cache = &engine->work_queue_cache;
66     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
67     engine->close_work_queue.cache = &engine->work_queue_cache;
68 
69     nxt_work_queue_name(&engine->fast_work_queue, "fast");
70     nxt_work_queue_name(&engine->accept_work_queue, "accept");
71     nxt_work_queue_name(&engine->read_work_queue, "read");
72     nxt_work_queue_name(&engine->socket_work_queue, "socket");
73     nxt_work_queue_name(&engine->connect_work_queue, "connect");
74     nxt_work_queue_name(&engine->write_work_queue, "write");
75     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
76     nxt_work_queue_name(&engine->close_work_queue, "close");
77 
78     if (signals != NULL) {
79         engine->signals = nxt_event_engine_signals(signals);
80         if (engine->signals == NULL) {
81             goto signals_fail;
82         }
83 
84         engine->signals->handler = nxt_event_engine_signal_handler;
85 
86         if (!interface->signal_support) {
87             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
88                 goto signals_fail;
89             }
90         }
91     }
92 
93     /*
94      * Number of event set and timers changes should be at least twice
95      * more than number of events to avoid premature flushes of the changes.
96      * Fourfold is for sure.
97      */
98     events = (batch != 0) ? batch : 32;
99 
100     if (interface->create(engine, 4 * events, events) != NXT_OK) {
101         goto event_set_fail;
102     }
103 
104     engine->event = *interface;
105 
106     if (nxt_event_engine_post_init(engine) != NXT_OK) {
107         goto post_fail;
108     }
109 
110     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
111         goto timers_fail;
112     }
113 
114     thread = task->thread;
115 
116     nxt_thread_time_update(thread);
117     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
118 
119     engine->max_connections = 0xffffffff;
120 
121     nxt_queue_init(&engine->joints);
122     nxt_queue_init(&engine->listen_connections);
123     nxt_queue_init(&engine->idle_connections);
124 
125 #if !(NXT_THREADS)
126 
127     if (interface->signal_support) {
128         thread->time.signal = -1;
129     }
130 
131 #endif
132 
133     return engine;
134 
135 timers_fail:
136 post_fail:
137 
138     interface->free(engine);
139 
140 event_set_fail:
141 signals_fail:
142 
143     nxt_free(engine->signals);
144     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
145     nxt_free(engine->fibers);
146 
147 fibers_fail:
148 
149     nxt_free(engine);
150     return NULL;
151 }
152 
153 
154 static nxt_int_t
155 nxt_event_engine_post_init(nxt_event_engine_t *engine)
156 {
157     if (engine->event.enable_post != NULL) {
158         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
159     }
160 
161 #if !(NXT_THREADS)
162 
163     /* Only signals may are posted in single-threaded mode. */
164 
165     if (engine->event->signal_support) {
166         return NXT_OK;
167     }
168 
169 #endif
170 
171     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
172         return NXT_ERROR;
173     }
174 
175     return NXT_OK;
176 }
177 
178 
179 static nxt_int_t
180 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
181 {
182     nxt_event_engine_pipe_t  *pipe;
183 
184     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
185     if (pipe == NULL) {
186         return NXT_ERROR;
187     }
188 
189     engine->pipe = pipe;
190 
191     /*
192      * An event engine pipe is in blocking mode for writer
193      * and in non-blocking node for reader.
194      */
195 
196     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
197         nxt_free(pipe);
198         return NXT_ERROR;
199     }
200 
201     pipe->event.fd = pipe->fds[0];
202     pipe->event.task = &engine->task;
203     pipe->event.read_work_queue = &engine->fast_work_queue;
204     pipe->event.read_handler = nxt_event_engine_signal_pipe;
205     pipe->event.write_work_queue = &engine->fast_work_queue;
206     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
207     pipe->event.log = engine->task.log;
208 
209     nxt_fd_event_enable_read(engine, &pipe->event);
210 
211     return NXT_OK;
212 }
213 
214 
215 static void
216 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
217 {
218     nxt_event_engine_pipe_t  *pipe;
219 
220     pipe = engine->pipe;
221 
222     if (pipe != NULL) {
223 
224         if (pipe->event.read_work_queue != NULL) {
225             nxt_fd_event_close(engine, &pipe->event);
226             nxt_pipe_close(pipe->event.task, pipe->fds);
227         }
228 
229         nxt_free(pipe);
230     }
231 }
232 
233 
234 static void
235 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
236 {
237     nxt_event_engine_pipe_t  *pipe;
238 
239     pipe = obj;
240 
241     nxt_pipe_close(pipe->event.task, pipe->fds);
242     nxt_free(pipe);
243 }
244 
245 
246 void
247 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
248 {
249     nxt_debug(&engine->task, "event engine post");
250 
251     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
252 
253     nxt_event_engine_signal(engine, 0);
254 }
255 
256 
257 void
258 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
259 {
260     u_char  buf;
261 
262     nxt_debug(&engine->task, "event engine signal:%ui", signo);
263 
264     /*
265      * A signal number may be sent in a signal context, so the signal
266      * information cannot be passed via a locked work queue.
267      */
268 
269     if (engine->event.signal != NULL) {
270         engine->event.signal(engine, signo);
271         return;
272     }
273 
274     buf = (u_char) signo;
275     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
276 }
277 
278 
279 static void
280 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
281 {
282     int             i, n;
283     u_char          signo;
284     nxt_bool_t      post;
285     nxt_fd_event_t  *ev;
286     u_char          buf[128];
287 
288     ev = obj;
289 
290     nxt_debug(task, "engine signal pipe");
291 
292     post = 0;
293 
294     do {
295         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
296 
297         for (i = 0; i < n; i++) {
298             signo = buf[i];
299 
300             nxt_debug(task, "engine pipe signo:%d", signo);
301 
302             if (signo == 0) {
303                 /* A post should be processed only once. */
304                 post = 1;
305 
306             } else {
307                 nxt_event_engine_signal_handler(task,
308                                              (void *) (uintptr_t) signo, NULL);
309             }
310         }
311 
312     } while (n == sizeof(buf));
313 
314     if (post) {
315         nxt_event_engine_post_handler(task, NULL, NULL);
316     }
317 }
318 
319 
320 static void
321 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
322 {
323     nxt_thread_t        *thread;
324     nxt_event_engine_t  *engine;
325 
326     thread = task->thread;
327     engine = thread->engine;
328 
329     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
330                                &engine->fast_work_queue);
331 }
332 
333 
334 static void
335 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
336 {
337     nxt_event_engine_t       *engine;
338     nxt_event_engine_pipe_t  *pipe;
339 
340     engine = task->thread->engine;
341     pipe = engine->pipe;
342 
343     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
344             pipe->fds[0], pipe->fds[1]);
345 
346     nxt_fd_event_close(engine, &pipe->event);
347     nxt_pipe_close(pipe->event.task, pipe->fds);
348 }
349 
350 
351 static void
352 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
353 {
354     uintptr_t              signo;
355     const nxt_sig_event_t  *sigev;
356 
357     signo = (uintptr_t) obj;
358 
359     for (sigev = task->thread->engine->signals->sigev;
360          sigev->signo != 0;
361          sigev++)
362     {
363         if (signo == (nxt_uint_t) sigev->signo) {
364             sigev->handler(task, (void *) signo, (void *) sigev->name);
365             return;
366         }
367     }
368 
369     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
370 }
371 
372 
373 nxt_int_t
374 nxt_event_engine_change(nxt_event_engine_t *engine,
375     const nxt_event_interface_t *interface, nxt_uint_t batch)
376 {
377     nxt_uint_t  events;
378 
379     engine->batch = batch;
380 
381     if (!engine->event.signal_support && interface->signal_support) {
382         /*
383          * Block signal processing if the current event
384          * facility does not support signal processing.
385          */
386         nxt_event_engine_signals_stop(engine);
387 
388         /*
389          * Add to engine fast work queue the signal events possibly
390          * received before the blocking signal processing.
391          */
392         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
393     }
394 
395     if (engine->pipe != NULL && interface->enable_post != NULL) {
396         /*
397          * An engine pipe must be closed after all signal events
398          * added above to engine fast work queue will be processed.
399          */
400         nxt_work_queue_add(&engine->fast_work_queue,
401                            nxt_event_engine_signal_pipe_close,
402                            &engine->task, engine->pipe, NULL);
403 
404         engine->pipe = NULL;
405     }
406 
407     engine->event.free(engine);
408 
409     events = (batch != 0) ? batch : 32;
410 
411     if (interface->create(engine, 4 * events, events) != NXT_OK) {
412         return NXT_ERROR;
413     }
414 
415     engine->event = *interface;
416 
417     if (nxt_event_engine_post_init(engine) != NXT_OK) {
418         return NXT_ERROR;
419     }
420 
421     if (engine->signals != NULL) {
422 
423         if (!engine->event.signal_support) {
424             return nxt_event_engine_signals_start(engine);
425         }
426 
427 #if (NXT_THREADS)
428         /*
429          * Reset the PID flag to start the signal thread if
430          * some future event facility will not support signals.
431          */
432         engine->signals->process = 0;
433 #endif
434     }
435 
436     return NXT_OK;
437 }
438 
439 
440 void
441 nxt_event_engine_free(nxt_event_engine_t *engine)
442 {
443     nxt_event_engine_signal_pipe_free(engine);
444     nxt_free(engine->signals);
445 
446     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
447 
448     engine->event.free(engine);
449 
450     /* TODO: free timers */
451 
452     nxt_free(engine);
453 }
454 
455 
456 static nxt_work_handler_t
457 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
458     void **obj, void **data)
459 {
460     nxt_work_queue_t  *wq, *last;
461 
462     wq = engine->current_work_queue;
463     last = wq;
464 
465     if (wq->head == NULL) {
466         wq = &engine->fast_work_queue;
467 
468         if (wq->head == NULL) {
469 
470             do {
471                 engine->current_work_queue++;
472                 wq = engine->current_work_queue;
473 
474                 if (wq > &engine->close_work_queue) {
475                     wq = &engine->fast_work_queue;
476                     engine->current_work_queue = wq;
477                 }
478 
479                 if (wq->head != NULL) {
480                     goto found;
481                 }
482 
483             } while (wq != last);
484 
485             engine->current_work_queue = &engine->fast_work_queue;
486 
487             return NULL;
488         }
489     }
490 
491 found:
492 
493     nxt_debug(&engine->task, "work queue: %s", wq->name);
494 
495     return nxt_work_queue_pop(wq, task, obj, data);
496 }
497 
498 
499 void
500 nxt_event_engine_start(nxt_event_engine_t *engine)
501 {
502     void                *obj, *data;
503     nxt_task_t          *task;
504     nxt_msec_t          timeout, now;
505     nxt_thread_t        *thr;
506     nxt_work_handler_t  handler;
507 
508     thr = nxt_thread();
509 
510     if (engine->fibers) {
511         /*
512          * _setjmp() cannot be wrapped in a function since return from
513          * the function clobbers stack used by future _setjmp() returns.
514          */
515         _setjmp(engine->fibers->fiber.jmp);
516 
517         /* A return point from fibers. */
518     }
519 
520     thr->log = engine->task.log;
521 
522     for ( ;; ) {
523 
524         for ( ;; ) {
525             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
526 
527             if (handler == NULL) {
528                 break;
529             }
530 
531             thr->task = task;
532 
533             handler(task, obj, data);
534         }
535 
536         /* Attach some event engine work queues in preferred order. */
537 
538         timeout = nxt_timer_find(engine);
539 
540         engine->event.poll(engine, timeout);
541 
542         now = nxt_thread_monotonic_time(thr) / 1000000;
543 
544         nxt_timer_expire(engine, now);
545     }
546 }
547 
548 
549 static nxt_int_t
550 nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data)
551 {
552     return NXT_OK;
553 }
554 
555 static const nxt_lvlhsh_proto_t  lvlhsh_req_conn_proto  nxt_aligned(64) = {
556     NXT_LVLHSH_DEFAULT,
557     nxt_req_conn_test,
558     nxt_lvlhsh_alloc,
559     nxt_lvlhsh_free,
560 };
561 
562 
563 void
564 nxt_event_engine_request_add(nxt_event_engine_t *engine,
565     nxt_req_conn_link_t *rc)
566 {
567     nxt_lvlhsh_query_t  lhq;
568 
569     lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
570     lhq.key.length = sizeof(rc->req_id);
571     lhq.key.start = (u_char *) &rc->req_id;
572     lhq.proto = &lvlhsh_req_conn_proto;
573     lhq.replace = 0;
574     lhq.value = rc;
575     lhq.pool = engine->mem_pool;
576 
577     switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) {
578 
579     case NXT_OK:
580         break;
581 
582     default:
583         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed",
584                              rc->req_id);
585         break;
586     }
587 }
588 
589 
590 nxt_req_conn_link_t *
591 nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id)
592 {
593     nxt_lvlhsh_query_t  lhq;
594 
595     lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
596     lhq.key.length = sizeof(req_id);
597     lhq.key.start = (u_char *) &req_id;
598     lhq.proto = &lvlhsh_req_conn_proto;
599 
600     if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) {
601         return lhq.value;
602     }
603 
604     return NULL;
605 }
606 
607 
608 void
609 nxt_event_engine_request_remove(nxt_event_engine_t *engine,
610     nxt_req_conn_link_t *rc)
611 {
612     nxt_lvlhsh_query_t  lhq;
613 
614     lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
615     lhq.key.length = sizeof(rc->req_id);
616     lhq.key.start = (u_char *) &rc->req_id;
617     lhq.proto = &lvlhsh_req_conn_proto;
618     lhq.pool = engine->mem_pool;
619 
620     switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
621 
622     case NXT_OK:
623         break;
624 
625     default:
626         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
627                              rc->req_id);
628         break;
629     }
630 }
631 
632 
633 nxt_req_conn_link_t *
634 nxt_event_engine_request_find_remove(nxt_event_engine_t *engine,
635     nxt_req_id_t req_id)
636 {
637     nxt_lvlhsh_query_t  lhq;
638 
639     lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
640     lhq.key.length = sizeof(req_id);
641     lhq.key.start = (u_char *) &req_id;
642     lhq.proto = &lvlhsh_req_conn_proto;
643     lhq.pool = engine->mem_pool;
644 
645     switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
646 
647     case NXT_OK:
648         return lhq.value;
649 
650     default:
651         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
652                              req_id);
653         break;
654     }
655 
656     return NULL;
657 }
658 
659