xref: /unit/src/nxt_event_engine.c (revision 223:bf98efe2c55c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_task_t *task,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_thread_t        *thread;
34     nxt_event_engine_t  *engine;
35 
36     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
37     if (engine == NULL) {
38         return NULL;
39     }
40 
41     nxt_debug(task, "create engine %p", engine);
42 
43     thread = task->thread;
44 
45     engine->task.thread = thread;
46     engine->task.log = thread->log;
47     engine->task.ident = nxt_task_next_ident();
48 
49     engine->batch = batch;
50 
51     if (flags & NXT_ENGINE_FIBERS) {
52         engine->fibers = nxt_fiber_main_create(engine);
53         if (engine->fibers == NULL) {
54             goto fibers_fail;
55         }
56     }
57 
58     engine->current_work_queue = &engine->fast_work_queue;
59 
60     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
61 
62     engine->fast_work_queue.cache = &engine->work_queue_cache;
63     engine->accept_work_queue.cache = &engine->work_queue_cache;
64     engine->read_work_queue.cache = &engine->work_queue_cache;
65     engine->socket_work_queue.cache = &engine->work_queue_cache;
66     engine->connect_work_queue.cache = &engine->work_queue_cache;
67     engine->write_work_queue.cache = &engine->work_queue_cache;
68     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
69     engine->close_work_queue.cache = &engine->work_queue_cache;
70 
71     nxt_work_queue_name(&engine->fast_work_queue, "fast");
72     nxt_work_queue_name(&engine->accept_work_queue, "accept");
73     nxt_work_queue_name(&engine->read_work_queue, "read");
74     nxt_work_queue_name(&engine->socket_work_queue, "socket");
75     nxt_work_queue_name(&engine->connect_work_queue, "connect");
76     nxt_work_queue_name(&engine->write_work_queue, "write");
77     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
78     nxt_work_queue_name(&engine->close_work_queue, "close");
79 
80     if (signals != NULL) {
81         engine->signals = nxt_event_engine_signals(signals);
82         if (engine->signals == NULL) {
83             goto signals_fail;
84         }
85 
86         engine->signals->handler = nxt_event_engine_signal_handler;
87 
88         if (!interface->signal_support) {
89             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
90                 goto signals_fail;
91             }
92         }
93     }
94 
95     /*
96      * Number of event set and timers changes should be at least twice
97      * more than number of events to avoid premature flushes of the changes.
98      * Fourfold is for sure.
99      */
100     events = (batch != 0) ? batch : 32;
101 
102     if (interface->create(engine, 4 * events, events) != NXT_OK) {
103         goto event_set_fail;
104     }
105 
106     engine->event = *interface;
107 
108     if (nxt_event_engine_post_init(engine) != NXT_OK) {
109         goto post_fail;
110     }
111 
112     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
113         goto timers_fail;
114     }
115 
116     thread = task->thread;
117 
118     nxt_thread_time_update(thread);
119     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
120 
121     engine->max_connections = 0xffffffff;
122 
123     nxt_queue_init(&engine->joints);
124     nxt_queue_init(&engine->listen_connections);
125     nxt_queue_init(&engine->idle_connections);
126 
127     return engine;
128 
129 timers_fail:
130 post_fail:
131 
132     interface->free(engine);
133 
134 event_set_fail:
135 signals_fail:
136 
137     nxt_free(engine->signals);
138     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
139     nxt_free(engine->fibers);
140 
141 fibers_fail:
142 
143     nxt_free(engine);
144     return NULL;
145 }
146 
147 
148 static nxt_int_t
149 nxt_event_engine_post_init(nxt_event_engine_t *engine)
150 {
151     if (engine->event.enable_post != NULL) {
152         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
153     }
154 
155     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
156         return NXT_ERROR;
157     }
158 
159     return NXT_OK;
160 }
161 
162 
163 static nxt_int_t
164 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
165 {
166     nxt_event_engine_pipe_t  *pipe;
167 
168     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
169     if (pipe == NULL) {
170         return NXT_ERROR;
171     }
172 
173     engine->pipe = pipe;
174 
175     /*
176      * An event engine pipe is in blocking mode for writer
177      * and in non-blocking node for reader.
178      */
179 
180     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
181         nxt_free(pipe);
182         return NXT_ERROR;
183     }
184 
185     pipe->event.fd = pipe->fds[0];
186     pipe->event.task = &engine->task;
187     pipe->event.read_work_queue = &engine->fast_work_queue;
188     pipe->event.read_handler = nxt_event_engine_signal_pipe;
189     pipe->event.write_work_queue = &engine->fast_work_queue;
190     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
191     pipe->event.log = engine->task.log;
192 
193     nxt_fd_event_enable_read(engine, &pipe->event);
194 
195     return NXT_OK;
196 }
197 
198 
199 static void
200 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
201 {
202     nxt_event_engine_pipe_t  *pipe;
203 
204     pipe = engine->pipe;
205 
206     if (pipe != NULL) {
207 
208         if (pipe->event.read_work_queue != NULL) {
209             nxt_fd_event_close(engine, &pipe->event);
210             nxt_pipe_close(pipe->event.task, pipe->fds);
211         }
212 
213         nxt_free(pipe);
214     }
215 }
216 
217 
218 static void
219 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
220 {
221     nxt_event_engine_pipe_t  *pipe;
222 
223     pipe = obj;
224 
225     nxt_pipe_close(pipe->event.task, pipe->fds);
226     nxt_free(pipe);
227 }
228 
229 
230 void
231 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
232 {
233     nxt_debug(&engine->task, "event engine post");
234 
235     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
236 
237     nxt_event_engine_signal(engine, 0);
238 }
239 
240 
241 void
242 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
243 {
244     u_char  buf;
245 
246     nxt_debug(&engine->task, "event engine signal:%ui", signo);
247 
248     /*
249      * A signal number may be sent in a signal context, so the signal
250      * information cannot be passed via a locked work queue.
251      */
252 
253     if (engine->event.signal != NULL) {
254         engine->event.signal(engine, signo);
255         return;
256     }
257 
258     buf = (u_char) signo;
259     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
260 }
261 
262 
263 static void
264 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
265 {
266     int             i, n;
267     u_char          signo;
268     nxt_bool_t      post;
269     nxt_fd_event_t  *ev;
270     u_char          buf[128];
271 
272     ev = obj;
273 
274     nxt_debug(task, "engine signal pipe");
275 
276     post = 0;
277 
278     do {
279         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
280 
281         for (i = 0; i < n; i++) {
282             signo = buf[i];
283 
284             nxt_debug(task, "engine pipe signo:%d", signo);
285 
286             if (signo == 0) {
287                 /* A post should be processed only once. */
288                 post = 1;
289 
290             } else {
291                 nxt_event_engine_signal_handler(task,
292                                              (void *) (uintptr_t) signo, NULL);
293             }
294         }
295 
296     } while (n == sizeof(buf));
297 
298     if (post) {
299         nxt_event_engine_post_handler(task, NULL, NULL);
300     }
301 }
302 
303 
304 static void
305 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
306 {
307     nxt_thread_t        *thread;
308     nxt_event_engine_t  *engine;
309 
310     thread = task->thread;
311     engine = thread->engine;
312 
313     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
314                                &engine->fast_work_queue);
315 }
316 
317 
318 static void
319 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
320 {
321     nxt_event_engine_t       *engine;
322     nxt_event_engine_pipe_t  *pipe;
323 
324     engine = task->thread->engine;
325     pipe = engine->pipe;
326 
327     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
328             pipe->fds[0], pipe->fds[1]);
329 
330     nxt_fd_event_close(engine, &pipe->event);
331     nxt_pipe_close(pipe->event.task, pipe->fds);
332 }
333 
334 
335 static void
336 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
337 {
338     uintptr_t              signo;
339     const nxt_sig_event_t  *sigev;
340 
341     signo = (uintptr_t) obj;
342 
343     for (sigev = task->thread->engine->signals->sigev;
344          sigev->signo != 0;
345          sigev++)
346     {
347         if (signo == (nxt_uint_t) sigev->signo) {
348             sigev->handler(task, (void *) signo, (void *) sigev->name);
349             return;
350         }
351     }
352 
353     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
354 }
355 
356 
357 nxt_int_t
358 nxt_event_engine_change(nxt_event_engine_t *engine,
359     const nxt_event_interface_t *interface, nxt_uint_t batch)
360 {
361     nxt_uint_t  events;
362 
363     engine->batch = batch;
364 
365     if (!engine->event.signal_support && interface->signal_support) {
366         /*
367          * Block signal processing if the current event
368          * facility does not support signal processing.
369          */
370         nxt_event_engine_signals_stop(engine);
371 
372         /*
373          * Add to engine fast work queue the signal events possibly
374          * received before the blocking signal processing.
375          */
376         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
377     }
378 
379     if (engine->pipe != NULL && interface->enable_post != NULL) {
380         /*
381          * An engine pipe must be closed after all signal events
382          * added above to engine fast work queue will be processed.
383          */
384         nxt_work_queue_add(&engine->fast_work_queue,
385                            nxt_event_engine_signal_pipe_close,
386                            &engine->task, engine->pipe, NULL);
387 
388         engine->pipe = NULL;
389     }
390 
391     engine->event.free(engine);
392 
393     events = (batch != 0) ? batch : 32;
394 
395     if (interface->create(engine, 4 * events, events) != NXT_OK) {
396         return NXT_ERROR;
397     }
398 
399     engine->event = *interface;
400 
401     if (nxt_event_engine_post_init(engine) != NXT_OK) {
402         return NXT_ERROR;
403     }
404 
405     if (engine->signals != NULL) {
406 
407         if (!engine->event.signal_support) {
408             return nxt_event_engine_signals_start(engine);
409         }
410 
411         /*
412          * Reset the PID flag to start the signal thread if
413          * some future event facility will not support signals.
414          */
415         engine->signals->process = 0;
416     }
417 
418     return NXT_OK;
419 }
420 
421 
422 void
423 nxt_event_engine_free(nxt_event_engine_t *engine)
424 {
425     nxt_thread_log_debug("free engine %p", engine);
426 
427     nxt_event_engine_signal_pipe_free(engine);
428     nxt_free(engine->signals);
429 
430     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
431 
432     engine->event.free(engine);
433 
434     /* TODO: free timers */
435 
436     nxt_free(engine);
437 }
438 
439 
440 static nxt_work_handler_t
441 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
442     void **obj, void **data)
443 {
444     nxt_work_queue_t  *wq, *last;
445 
446     wq = engine->current_work_queue;
447     last = wq;
448 
449     if (wq->head == NULL) {
450         wq = &engine->fast_work_queue;
451 
452         if (wq->head == NULL) {
453 
454             do {
455                 engine->current_work_queue++;
456                 wq = engine->current_work_queue;
457 
458                 if (wq > &engine->close_work_queue) {
459                     wq = &engine->fast_work_queue;
460                     engine->current_work_queue = wq;
461                 }
462 
463                 if (wq->head != NULL) {
464                     goto found;
465                 }
466 
467             } while (wq != last);
468 
469             engine->current_work_queue = &engine->fast_work_queue;
470 
471             return NULL;
472         }
473     }
474 
475 found:
476 
477     nxt_debug(&engine->task, "work queue: %s", wq->name);
478 
479     return nxt_work_queue_pop(wq, task, obj, data);
480 }
481 
482 
483 void
484 nxt_event_engine_start(nxt_event_engine_t *engine)
485 {
486     void                *obj, *data;
487     nxt_task_t          *task;
488     nxt_msec_t          timeout, now;
489     nxt_thread_t        *thr;
490     nxt_work_handler_t  handler;
491 
492     thr = nxt_thread();
493 
494     if (engine->fibers) {
495         /*
496          * _setjmp() cannot be wrapped in a function since return from
497          * the function clobbers stack used by future _setjmp() returns.
498          */
499         _setjmp(engine->fibers->fiber.jmp);
500 
501         /* A return point from fibers. */
502     }
503 
504     thr->log = engine->task.log;
505 
506     for ( ;; ) {
507 
508         for ( ;; ) {
509             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
510 
511             if (handler == NULL) {
512                 break;
513             }
514 
515             thr->task = task;
516 
517             handler(task, obj, data);
518         }
519 
520         /* Attach some event engine work queues in preferred order. */
521 
522         timeout = nxt_timer_find(engine);
523 
524         engine->event.poll(engine, timeout);
525 
526         now = nxt_thread_monotonic_time(thr) / 1000000;
527 
528         nxt_timer_expire(engine, now);
529     }
530 }
531 
532 
533 static nxt_int_t
534 nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data)
535 {
536     return NXT_OK;
537 }
538 
539 static const nxt_lvlhsh_proto_t  lvlhsh_req_conn_proto  nxt_aligned(64) = {
540     NXT_LVLHSH_DEFAULT,
541     nxt_req_conn_test,
542     nxt_lvlhsh_alloc,
543     nxt_lvlhsh_free,
544 };
545 
546 
547 void
548 nxt_event_engine_request_add(nxt_event_engine_t *engine,
549     nxt_req_conn_link_t *rc)
550 {
551     nxt_lvlhsh_query_t  lhq;
552 
553     lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
554     lhq.key.length = sizeof(rc->req_id);
555     lhq.key.start = (u_char *) &rc->req_id;
556     lhq.proto = &lvlhsh_req_conn_proto;
557     lhq.replace = 0;
558     lhq.value = rc;
559     lhq.pool = engine->mem_pool;
560 
561     switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) {
562 
563     case NXT_OK:
564         break;
565 
566     default:
567         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed",
568                              rc->req_id);
569         break;
570     }
571 }
572 
573 
574 nxt_req_conn_link_t *
575 nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id)
576 {
577     nxt_lvlhsh_query_t  lhq;
578 
579     lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
580     lhq.key.length = sizeof(req_id);
581     lhq.key.start = (u_char *) &req_id;
582     lhq.proto = &lvlhsh_req_conn_proto;
583 
584     if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) {
585         return lhq.value;
586     }
587 
588     return NULL;
589 }
590 
591 
592 void
593 nxt_event_engine_request_remove(nxt_event_engine_t *engine,
594     nxt_req_conn_link_t *rc)
595 {
596     nxt_lvlhsh_query_t  lhq;
597 
598     lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
599     lhq.key.length = sizeof(rc->req_id);
600     lhq.key.start = (u_char *) &rc->req_id;
601     lhq.proto = &lvlhsh_req_conn_proto;
602     lhq.pool = engine->mem_pool;
603 
604     switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
605 
606     case NXT_OK:
607         break;
608 
609     default:
610         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
611                              rc->req_id);
612         break;
613     }
614 }
615 
616 
617 nxt_req_conn_link_t *
618 nxt_event_engine_request_find_remove(nxt_event_engine_t *engine,
619     nxt_req_id_t req_id)
620 {
621     nxt_lvlhsh_query_t  lhq;
622 
623     lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
624     lhq.key.length = sizeof(req_id);
625     lhq.key.start = (u_char *) &req_id;
626     lhq.proto = &lvlhsh_req_conn_proto;
627     lhq.pool = engine->mem_pool;
628 
629     switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
630 
631     case NXT_OK:
632         return lhq.value;
633 
634     default:
635         nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
636                              req_id);
637         break;
638     }
639 
640     return NULL;
641 }
642 
643 
644 #if (NXT_DEBUG)
645 
646 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
647 {
648     nxt_work_queue_thread_adopt(&engine->fast_work_queue);
649     nxt_work_queue_thread_adopt(&engine->accept_work_queue);
650     nxt_work_queue_thread_adopt(&engine->read_work_queue);
651     nxt_work_queue_thread_adopt(&engine->socket_work_queue);
652     nxt_work_queue_thread_adopt(&engine->connect_work_queue);
653     nxt_work_queue_thread_adopt(&engine->write_work_queue);
654     nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
655     nxt_work_queue_thread_adopt(&engine->close_work_queue);
656 }
657 
658 #endif
659