xref: /unit/src/nxt_event_engine.c (revision 13:3a52b2c3d3f1)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_thread_t *thr,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_event_engine_t  *engine;
34 
35     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
36     if (engine == NULL) {
37         return NULL;
38     }
39 
40     engine->task.thread = thr;
41     engine->task.log = thr->log;
42     engine->task.ident = nxt_task_next_ident();
43 
44     thr->engine = engine;
45     thr->fiber = &engine->fibers->fiber;
46 
47     engine->batch0 = batch;
48 
49     if (flags & NXT_ENGINE_FIBERS) {
50         engine->fibers = nxt_fiber_main_create(engine);
51         if (engine->fibers == NULL) {
52             goto fibers_fail;
53         }
54     }
55 
56     engine->current_work_queue = &engine->fast_work_queue;
57 
58     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
59 
60     engine->fast_work_queue.cache = &engine->work_queue_cache;
61     engine->accept_work_queue.cache = &engine->work_queue_cache;
62     engine->read_work_queue.cache = &engine->work_queue_cache;
63     engine->socket_work_queue.cache = &engine->work_queue_cache;
64     engine->connect_work_queue.cache = &engine->work_queue_cache;
65     engine->write_work_queue.cache = &engine->work_queue_cache;
66     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
67     engine->close_work_queue.cache = &engine->work_queue_cache;
68 
69     nxt_work_queue_name(&engine->fast_work_queue, "fast");
70     nxt_work_queue_name(&engine->accept_work_queue, "accept");
71     nxt_work_queue_name(&engine->read_work_queue, "read");
72     nxt_work_queue_name(&engine->socket_work_queue, "socket");
73     nxt_work_queue_name(&engine->connect_work_queue, "connect");
74     nxt_work_queue_name(&engine->write_work_queue, "write");
75     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
76     nxt_work_queue_name(&engine->close_work_queue, "close");
77 
78     if (signals != NULL) {
79         engine->signals = nxt_event_engine_signals(signals);
80         if (engine->signals == NULL) {
81             goto signals_fail;
82         }
83 
84         engine->signals->handler = nxt_event_engine_signal_handler;
85 
86         if (!interface->signal_support) {
87             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
88                 goto signals_fail;
89             }
90         }
91     }
92 
93     /*
94      * Number of event set and timers changes should be at least twice
95      * more than number of events to avoid premature flushes of the changes.
96      * Fourfold is for sure.
97      */
98     events = (batch != 0) ? batch : 32;
99 
100     if (interface->create(engine, 4 * events, events) != NXT_OK) {
101         goto event_set_fail;
102     }
103 
104     engine->event = *interface;
105 
106     if (nxt_event_engine_post_init(engine) != NXT_OK) {
107         goto post_fail;
108     }
109 
110     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
111         goto timers_fail;
112     }
113 
114     nxt_thread_time_update(thr);
115     engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000;
116 
117     engine->max_connections = 0xffffffff;
118 
119     nxt_queue_init(&engine->listen_connections);
120     nxt_queue_init(&engine->idle_connections);
121 
122 #if !(NXT_THREADS)
123 
124     if (interface->signal_support) {
125         thr->time.signal = -1;
126     }
127 
128 #endif
129 
130     return engine;
131 
132 timers_fail:
133 post_fail:
134 
135     interface->free(engine);
136 
137 event_set_fail:
138 signals_fail:
139 
140     nxt_free(engine->signals);
141     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
142     nxt_free(engine->fibers);
143 
144 fibers_fail:
145 
146     nxt_free(engine);
147     return NULL;
148 }
149 
150 
151 static nxt_int_t
152 nxt_event_engine_post_init(nxt_event_engine_t *engine)
153 {
154     if (engine->event.enable_post != NULL) {
155         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
156     }
157 
158 #if !(NXT_THREADS)
159 
160     /* Only signals may are posted in single-threaded mode. */
161 
162     if (engine->event->signal_support) {
163         return NXT_OK;
164     }
165 
166 #endif
167 
168     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
169         return NXT_ERROR;
170     }
171 
172     return NXT_OK;
173 }
174 
175 
176 static nxt_int_t
177 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
178 {
179     nxt_event_engine_pipe_t  *pipe;
180 
181     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
182     if (pipe == NULL) {
183         return NXT_ERROR;
184     }
185 
186     engine->pipe = pipe;
187 
188     /*
189      * An event engine pipe is in blocking mode for writer
190      * and in non-blocking node for reader.
191      */
192 
193     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
194         nxt_free(pipe);
195         return NXT_ERROR;
196     }
197 
198     pipe->event.fd = pipe->fds[0];
199     pipe->event.task = &engine->task;
200     pipe->event.read_work_queue = &engine->fast_work_queue;
201     pipe->event.read_handler = nxt_event_engine_signal_pipe;
202     pipe->event.write_work_queue = &engine->fast_work_queue;
203     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
204     pipe->event.log = engine->task.log;
205 
206     nxt_fd_event_enable_read(engine, &pipe->event);
207 
208     return NXT_OK;
209 }
210 
211 
212 static void
213 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
214 {
215     nxt_event_engine_pipe_t  *pipe;
216 
217     pipe = engine->pipe;
218 
219     if (pipe != NULL) {
220 
221         if (pipe->event.read_work_queue != NULL) {
222             nxt_fd_event_close(engine, &pipe->event);
223             nxt_pipe_close(pipe->event.task, pipe->fds);
224         }
225 
226         nxt_free(pipe);
227     }
228 }
229 
230 
231 static void
232 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
233 {
234     nxt_event_engine_pipe_t  *pipe;
235 
236     pipe = obj;
237 
238     nxt_pipe_close(pipe->event.task, pipe->fds);
239     nxt_free(pipe);
240 }
241 
242 
243 void
244 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
245 {
246     nxt_debug(&engine->task, "event engine post");
247 
248     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
249 
250     nxt_event_engine_signal(engine, 0);
251 }
252 
253 
254 void
255 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
256 {
257     u_char  buf;
258 
259     nxt_debug(&engine->task, "event engine signal:%ui", signo);
260 
261     /*
262      * A signal number may be sent in a signal context, so the signal
263      * information cannot be passed via a locked work queue.
264      */
265 
266     if (engine->event.signal != NULL) {
267         engine->event.signal(engine, signo);
268         return;
269     }
270 
271     buf = (u_char) signo;
272     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
273 }
274 
275 
276 static void
277 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
278 {
279     int             i, n;
280     u_char          signo;
281     nxt_bool_t      post;
282     nxt_fd_event_t  *ev;
283     u_char          buf[128];
284 
285     ev = obj;
286 
287     nxt_debug(task, "engine signal pipe");
288 
289     post = 0;
290 
291     do {
292         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
293 
294         for (i = 0; i < n; i++) {
295             signo = buf[i];
296 
297             nxt_debug(task, "engine pipe signo:%d", signo);
298 
299             if (signo == 0) {
300                 /* A post should be processed only once. */
301                 post = 1;
302 
303             } else {
304                 nxt_event_engine_signal_handler(task,
305                                              (void *) (uintptr_t) signo, NULL);
306             }
307         }
308 
309     } while (n == sizeof(buf));
310 
311     if (post) {
312         nxt_event_engine_post_handler(task, NULL, NULL);
313     }
314 }
315 
316 
317 static void
318 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
319 {
320     nxt_thread_t        *thread;
321     nxt_event_engine_t  *engine;
322 
323     thread = task->thread;
324     engine = thread->engine;
325 
326     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
327                                &engine->fast_work_queue);
328 }
329 
330 
331 static void
332 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
333 {
334     nxt_event_engine_t       *engine;
335     nxt_event_engine_pipe_t  *pipe;
336 
337     engine = task->thread->engine;
338     pipe = engine->pipe;
339 
340     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
341             pipe->fds[0], pipe->fds[1]);
342 
343     nxt_fd_event_close(engine, &pipe->event);
344     nxt_pipe_close(pipe->event.task, pipe->fds);
345 }
346 
347 
348 static void
349 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
350 {
351     uintptr_t              signo;
352     const nxt_sig_event_t  *sigev;
353 
354     signo = (uintptr_t) obj;
355 
356     for (sigev = task->thread->engine->signals->sigev;
357          sigev->signo != 0;
358          sigev++)
359     {
360         if (signo == (nxt_uint_t) sigev->signo) {
361             sigev->handler(task, (void *) signo, (void *) sigev->name);
362             return;
363         }
364     }
365 
366     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
367 }
368 
369 
370 nxt_int_t
371 nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
372     const nxt_event_interface_t *interface, nxt_uint_t batch)
373 {
374     nxt_uint_t          events;
375     nxt_event_engine_t  *engine;
376 
377     engine = thr->engine;
378     engine->batch0 = batch;
379 
380     if (!engine->event.signal_support && interface->signal_support) {
381         /*
382          * Block signal processing if the current event
383          * facility does not support signal processing.
384          */
385         nxt_event_engine_signals_stop(engine);
386 
387         /*
388          * Add to engine fast work queue the signal events possibly
389          * received before the blocking signal processing.
390          */
391         nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
392     }
393 
394     if (engine->pipe != NULL && interface->enable_post != NULL) {
395         /*
396          * An engine pipe must be closed after all signal events
397          * added above to engine fast work queue will be processed.
398          */
399         nxt_work_queue_add(&engine->fast_work_queue,
400                            nxt_event_engine_signal_pipe_close,
401                            &engine->task, engine->pipe, NULL);
402 
403         engine->pipe = NULL;
404     }
405 
406     engine->event.free(engine);
407 
408     events = (batch != 0) ? batch : 32;
409 
410     if (interface->create(engine, 4 * events, events) != NXT_OK) {
411         return NXT_ERROR;
412     }
413 
414     engine->event = *interface;
415 
416     if (nxt_event_engine_post_init(engine) != NXT_OK) {
417         return NXT_ERROR;
418     }
419 
420     if (engine->signals != NULL) {
421 
422         if (!engine->event.signal_support) {
423             return nxt_event_engine_signals_start(engine);
424         }
425 
426 #if (NXT_THREADS)
427         /*
428          * Reset the PID flag to start the signal thread if
429          * some future event facility will not support signals.
430          */
431         engine->signals->process = 0;
432 #endif
433     }
434 
435     return NXT_OK;
436 }
437 
438 
439 void
440 nxt_event_engine_free(nxt_event_engine_t *engine)
441 {
442     nxt_event_engine_signal_pipe_free(engine);
443     nxt_free(engine->signals);
444 
445     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
446 
447     engine->event.free(engine);
448 
449     /* TODO: free timers */
450 
451     nxt_free(engine);
452 }
453 
454 
455 static nxt_work_handler_t
456 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
457     void **obj, void **data)
458 {
459     nxt_work_queue_t  *wq, *last;
460 
461     wq = engine->current_work_queue;
462     last = wq;
463 
464     if (wq->head == NULL) {
465         wq = &engine->fast_work_queue;
466 
467         if (wq->head == NULL) {
468 
469             do {
470                 engine->current_work_queue++;
471                 wq = engine->current_work_queue;
472 
473                 if (wq > &engine->close_work_queue) {
474                     wq = &engine->fast_work_queue;
475                     engine->current_work_queue = wq;
476                 }
477 
478                 if (wq->head != NULL) {
479                     goto found;
480                 }
481 
482             } while (wq != last);
483 
484             engine->current_work_queue = &engine->fast_work_queue;
485 
486             return NULL;
487         }
488     }
489 
490 found:
491 
492     nxt_debug(&engine->task, "work queue: %s", wq->name);
493 
494     return nxt_work_queue_pop(wq, task, obj, data);
495 }
496 
497 
498 void
499 nxt_event_engine_start(nxt_event_engine_t *engine)
500 {
501     void                *obj, *data;
502     nxt_task_t          *task;
503     nxt_msec_t          timeout, now;
504     nxt_thread_t        *thr;
505     nxt_work_handler_t  handler;
506 
507     thr = nxt_thread();
508 
509     if (engine->fibers) {
510         /*
511          * _setjmp() cannot be wrapped in a function since return from
512          * the function clobbers stack used by future _setjmp() returns.
513          */
514         _setjmp(engine->fibers->fiber.jmp);
515 
516         /* A return point from fibers. */
517     }
518 
519     thr->log = engine->task.log;
520 
521     for ( ;; ) {
522 
523         for ( ;; ) {
524             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
525 
526             if (handler == NULL) {
527                 break;
528             }
529 
530             handler(task, obj, data);
531         }
532 
533         /* Attach some event engine work queues in preferred order. */
534 
535         timeout = nxt_timer_find(engine);
536 
537         engine->event.poll(engine, timeout);
538 
539         now = nxt_thread_monotonic_time(thr) / 1000000;
540 
541         nxt_timer_expire(engine, now);
542     }
543 }
544