xref: /unit/src/nxt_event_engine.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_thread_t *thr,
11     nxt_event_engine_t *engine);
12 static nxt_int_t nxt_event_engine_signal_pipe_create(nxt_thread_t *thr,
13     nxt_event_engine_t *engine);
14 static void nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj,
15     void *data);
16 static void nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj,
17     void *data);
18 static void nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj,
19     void *data);
20 static void nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj,
21     void *data);
22 static void nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj,
23     void *data);
24 static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_thread_t *thr,
25     nxt_uint_t signo);
26 
27 
28 nxt_event_engine_t *
29 nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
30     const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_event_engine_t  *engine;
34 
35     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
36     if (engine == NULL) {
37         return NULL;
38     }
39 
40     engine->batch = batch;
41 
42     if (flags & NXT_ENGINE_FIBERS) {
43         engine->fibers = nxt_fiber_main_create(engine);
44         if (engine->fibers == NULL) {
45             goto fibers_fail;
46         }
47     }
48 
49     nxt_thread_work_queue_create(thr, 0);
50 
51     nxt_work_queue_name(&engine->accept_work_queue, "accept");
52     nxt_work_queue_name(&engine->read_work_queue, "read");
53     nxt_work_queue_name(&engine->socket_work_queue, "socket");
54     nxt_work_queue_name(&engine->connect_work_queue, "connect");
55     nxt_work_queue_name(&engine->write_work_queue, "write");
56     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
57     nxt_work_queue_name(&engine->close_work_queue, "close");
58 
59 #if (NXT_THREADS)
60 
61     nxt_locked_work_queue_create(&engine->work_queue, 7);
62 
63 #endif
64 
65     if (signals != NULL) {
66         engine->signals = nxt_event_engine_signals(signals);
67         if (engine->signals == NULL) {
68             goto signals_fail;
69         }
70 
71         engine->signals->handler = nxt_event_engine_signal_handler;
72 
73         if (!event_set->signal_support) {
74             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
75                 goto signals_fail;
76             }
77         }
78     }
79 
80     /*
81      * Number of event set and timers changes should be at least twice
82      * more than number of events to avoid premature flushes of the changes.
83      * Fourfold is for sure.
84      */
85     events = (batch != 0) ? batch : 32;
86 
87     engine->event_set = event_set->create(engine->signals, 4 * events, events);
88     if (engine->event_set == NULL) {
89         goto event_set_fail;
90     }
91 
92     engine->event = event_set;
93 
94     if (nxt_event_engine_post_init(thr, engine) != NXT_OK) {
95         goto post_fail;
96     }
97 
98     if (nxt_event_timers_init(&engine->timers, 4 * events) != NXT_OK) {
99         goto timers_fail;
100     }
101 
102     nxt_thread_time_update(thr);
103     engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000;
104 
105     engine->max_connections = 0xffffffff;
106 
107     nxt_queue_init(&engine->listen_connections);
108     nxt_queue_init(&engine->idle_connections);
109 
110     thr->engine = engine;
111     thr->fiber = &engine->fibers->fiber;
112 
113 #if !(NXT_THREADS)
114 
115     if (engine->event->signal_support) {
116         thr->time.signal = -1;
117     }
118 
119 #endif
120 
121     return engine;
122 
123 timers_fail:
124 post_fail:
125 
126     event_set->free(engine->event_set);
127 
128 event_set_fail:
129 signals_fail:
130 
131     nxt_free(engine->signals);
132     nxt_thread_work_queue_destroy(thr);
133     nxt_free(engine->fibers);
134 
135 fibers_fail:
136 
137     nxt_free(engine);
138     return NULL;
139 }
140 
141 
142 static nxt_int_t
143 nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine)
144 {
145     if (engine->event->enable_post != NULL) {
146         return engine->event->enable_post(engine->event_set,
147                                           nxt_event_engine_post_handler);
148     }
149 
150 #if !(NXT_THREADS)
151 
152     /* Only signals may are posted in single-threaded mode. */
153 
154     if (engine->event->signal_support) {
155         return NXT_OK;
156     }
157 
158 #endif
159 
160     if (nxt_event_engine_signal_pipe_create(thr, engine) != NXT_OK) {
161         return NXT_ERROR;
162     }
163 
164     return NXT_OK;
165 }
166 
167 
168 static nxt_int_t
169 nxt_event_engine_signal_pipe_create(nxt_thread_t *thr,
170     nxt_event_engine_t *engine)
171 {
172     nxt_event_engine_pipe_t  *pipe;
173 
174     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
175     if (pipe == NULL) {
176         return NXT_ERROR;
177     }
178 
179     engine->pipe = pipe;
180 
181     /*
182      * An event engine pipe is in blocking mode for writer
183      * and in non-blocking node for reader.
184      */
185 
186     if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) {
187         nxt_free(pipe);
188         return NXT_ERROR;
189     }
190 
191     pipe->event.fd = pipe->fds[0];
192     pipe->event.read_work_queue = &thr->work_queue.main;
193     pipe->event.read_handler = nxt_event_engine_signal_pipe;
194     pipe->event.write_work_queue = &thr->work_queue.main;
195     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
196     pipe->event.log = &nxt_main_log;
197 
198     nxt_event_fd_enable_read(engine, &pipe->event);
199 
200     return NXT_OK;
201 }
202 
203 
204 static void
205 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
206 {
207     nxt_event_engine_pipe_t  *pipe;
208 
209     pipe = engine->pipe;
210 
211     if (pipe != NULL) {
212 
213         if (pipe->event.read_work_queue != NULL) {
214             nxt_event_fd_close(engine, &pipe->event);
215             nxt_pipe_close(pipe->fds);
216         }
217 
218         nxt_free(pipe);
219     }
220 }
221 
222 
223 static void
224 nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data)
225 {
226     nxt_event_engine_pipe_t  *pipe;
227 
228     pipe = obj;
229 
230     nxt_pipe_close(pipe->fds);
231     nxt_free(pipe);
232 }
233 
234 
235 void
236 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler,
237     void *obj, void *data, nxt_log_t *log)
238 {
239     nxt_thread_log_debug("event engine post");
240 
241     nxt_locked_work_queue_add(&engine->work_queue, handler, obj, data, log);
242 
243     nxt_event_engine_signal(engine, 0);
244 }
245 
246 
247 void
248 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
249 {
250     u_char  buf;
251 
252     nxt_thread_log_debug("event engine signal:%ui", signo);
253 
254     /*
255      * A signal number may be sent in a signal context, so the signal
256      * information cannot be passed via a locked work queue.
257      */
258 
259     if (engine->event->signal != NULL) {
260         engine->event->signal(engine->event_set, signo);
261         return;
262     }
263 
264     buf = (u_char) signo;
265     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
266 }
267 
268 
269 static void
270 nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data)
271 {
272     int                    i, n;
273     u_char                 signo;
274     nxt_bool_t             post;
275     nxt_event_fd_t         *ev;
276     const nxt_event_sig_t  *sigev;
277     u_char                 buf[128];
278 
279     ev = obj;
280 
281     nxt_log_debug(thr->log, "engine signal pipe");
282 
283     post = 0;
284 
285     do {
286         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
287 
288         for (i = 0; i < n; i++) {
289             signo = buf[i];
290 
291             nxt_log_debug(thr->log, "engine pipe signo:%d", signo);
292 
293             if (signo == 0) {
294                 /* A post should be processed only once. */
295                 post = 1;
296 
297             } else {
298                 sigev = nxt_event_engine_signal_find(thr, signo);
299 
300                 if (nxt_fast_path(sigev != NULL)) {
301                     sigev->handler(thr, (void *) (uintptr_t) signo,
302                                    (void *) sigev->name);
303                 }
304             }
305         }
306 
307     } while (n == sizeof(buf));
308 
309     if (post) {
310         nxt_event_engine_post_handler(thr, NULL, NULL);
311     }
312 }
313 
314 
315 static void
316 nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, void *data)
317 {
318     nxt_locked_work_queue_move(thr, &thr->engine->work_queue,
319                                &thr->work_queue.main);
320 }
321 
322 
323 static void
324 nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, void *data)
325 {
326     nxt_event_fd_t  *ev;
327 
328     ev = obj;
329 
330     nxt_log_alert(ev->log, "engine pipe(%FD:%FD) event error",
331                   thr->engine->pipe->fds[0], thr->engine->pipe->fds[1]);
332 
333     nxt_event_fd_close(thr->engine, &thr->engine->pipe->event);
334     nxt_pipe_close(thr->engine->pipe->fds);
335 }
336 
337 
338 static void
339 nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, void *data)
340 {
341     uintptr_t              signo;
342     const nxt_event_sig_t  *sigev;
343 
344     signo = (uintptr_t) obj;
345 
346     sigev = nxt_event_engine_signal_find(thr, signo);
347 
348     if (nxt_fast_path(sigev != NULL)) {
349         sigev->handler(thr, (void *) (uintptr_t) signo, (void *) sigev->name);
350     }
351 }
352 
353 
354 static const nxt_event_sig_t *
355 nxt_event_engine_signal_find(nxt_thread_t *thr, nxt_uint_t signo)
356 {
357     const nxt_event_sig_t  *sigev;
358 
359     for (sigev = thr->engine->signals->sigev; sigev->signo != 0; sigev++) {
360         if (signo == (nxt_uint_t) sigev->signo) {
361             return sigev;
362         }
363     }
364 
365     nxt_log_alert(thr->log, "signal %ui handler not found", signo);
366 
367     return NULL;
368 }
369 
370 
371 nxt_int_t
372 nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
373     nxt_uint_t batch)
374 {
375     nxt_uint_t          events;
376     nxt_event_engine_t  *engine;
377 
378     engine = thr->engine;
379     engine->batch = batch;
380 
381     if (!engine->event->signal_support && event_set->signal_support) {
382         /*
383          * Block signal processing if the current event
384          * facility does not support signal processing.
385          */
386         nxt_event_engine_signals_stop(engine);
387 
388         /*
389          * Add to thread main work queue the signal events possibly
390          * received before the blocking signal processing.
391          */
392         nxt_event_engine_signal_pipe(thr, &engine->pipe->event, NULL);
393     }
394 
395     if (engine->pipe != NULL && event_set->enable_post != NULL) {
396         /*
397          * An engine pipe must be closed after all signal events
398          * added above to thread main work queue will be processed.
399          */
400         nxt_thread_work_queue_add(thr, &thr->work_queue.main,
401                                   nxt_event_engine_signal_pipe_close,
402                                   engine->pipe, NULL, &nxt_main_log);
403 
404         engine->pipe = NULL;
405     }
406 
407     engine->event->free(engine->event_set);
408 
409     events = (batch != 0) ? batch : 32;
410 
411     engine->event_set = event_set->create(engine->signals, 4 * events, events);
412     if (engine->event_set == NULL) {
413         return NXT_ERROR;
414     }
415 
416     engine->event = event_set;
417 
418     if (nxt_event_engine_post_init(thr, engine) != NXT_OK) {
419         return NXT_ERROR;
420     }
421 
422     if (engine->signals != NULL) {
423 
424         if (!engine->event->signal_support) {
425             return nxt_event_engine_signals_start(engine);
426         }
427 
428 #if (NXT_THREADS)
429         /*
430          * Reset the PID flag to start the signal thread if
431          * some future event facility will not support signals.
432          */
433         engine->signals->process = 0;
434 #endif
435     }
436 
437     return NXT_OK;
438 }
439 
440 
441 void
442 nxt_event_engine_free(nxt_event_engine_t *engine)
443 {
444     nxt_event_engine_signal_pipe_free(engine);
445     nxt_free(engine->signals);
446 
447     nxt_locked_work_queue_destroy(&engine->work_queue);
448     nxt_thread_work_queue_destroy(nxt_thread());
449 
450     engine->event->free(engine->event_set);
451 
452     /* TODO: free timers */
453 
454     nxt_free(engine);
455 }
456 
457 
458 void
459 nxt_event_engine_start(nxt_event_engine_t *engine)
460 {
461     void                *obj, *data;
462     nxt_msec_t          timeout, now;
463     nxt_thread_t        *thr;
464     nxt_work_handler_t  handler;
465 
466     thr = nxt_thread();
467 
468     if (engine->fibers) {
469         /*
470          * _setjmp() cannot be wrapped in a function since return from
471          * the function clobbers stack used by future _setjmp() returns.
472          */
473         _setjmp(engine->fibers->fiber.jmp);
474 
475         /* A return point from fibers. */
476     }
477 
478     for ( ;; ) {
479 
480         for ( ;; ) {
481             handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
482 
483             if (handler == NULL) {
484                 break;
485             }
486 
487             handler(thr, obj, data);
488 
489             thr->log = &nxt_main_log;
490         }
491 
492         for ( ;; ) {
493             handler = nxt_thread_last_work_queue_pop(thr, &obj, &data,
494                                                      &thr->log);
495             if (handler == NULL) {
496                 break;
497             }
498 
499             handler(thr, obj, data);
500 
501             thr->log = &nxt_main_log;
502         }
503 
504         /* Attach some event engine work queues in preferred order. */
505 
506         nxt_work_queue_attach(thr, &engine->accept_work_queue);
507         nxt_work_queue_attach(thr, &engine->read_work_queue);
508 
509         timeout = nxt_event_timer_find(engine);
510 
511         engine->event->poll(thr, engine->event_set, timeout);
512 
513         /*
514          * Look up expired timers only if a new zero timer has been
515          * just added before the event poll or if the event poll slept
516          * at least 1 millisecond, because all old eligible timers were
517          * processed in the previous iterations.
518          */
519 
520         now = nxt_thread_monotonic_time(thr) / 1000000;
521 
522         if (timeout == 0 || now != engine->timers.now) {
523             nxt_event_timer_expire(thr, now);
524         }
525     }
526 }
527