xref: /unit/src/nxt_event_engine.c (revision 20:4dc92b438f58)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_task_t *task,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_thread_t        *thread;
34     nxt_event_engine_t  *engine;
35 
36     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
37     if (engine == NULL) {
38         return NULL;
39     }
40 
41     thread = task->thread;
42 
43     engine->task.thread = thread;
44     engine->task.log = thread->log;
45     engine->task.ident = nxt_task_next_ident();
46 
47     thread->engine = engine;
48     thread->fiber = &engine->fibers->fiber;
49 
50     engine->batch = batch;
51 
52     if (flags & NXT_ENGINE_FIBERS) {
53         engine->fibers = nxt_fiber_main_create(engine);
54         if (engine->fibers == NULL) {
55             goto fibers_fail;
56         }
57     }
58 
59     engine->current_work_queue = &engine->fast_work_queue;
60 
61     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
62 
63     engine->fast_work_queue.cache = &engine->work_queue_cache;
64     engine->accept_work_queue.cache = &engine->work_queue_cache;
65     engine->read_work_queue.cache = &engine->work_queue_cache;
66     engine->socket_work_queue.cache = &engine->work_queue_cache;
67     engine->connect_work_queue.cache = &engine->work_queue_cache;
68     engine->write_work_queue.cache = &engine->work_queue_cache;
69     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
70     engine->close_work_queue.cache = &engine->work_queue_cache;
71 
72     nxt_work_queue_name(&engine->fast_work_queue, "fast");
73     nxt_work_queue_name(&engine->accept_work_queue, "accept");
74     nxt_work_queue_name(&engine->read_work_queue, "read");
75     nxt_work_queue_name(&engine->socket_work_queue, "socket");
76     nxt_work_queue_name(&engine->connect_work_queue, "connect");
77     nxt_work_queue_name(&engine->write_work_queue, "write");
78     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
79     nxt_work_queue_name(&engine->close_work_queue, "close");
80 
81     if (signals != NULL) {
82         engine->signals = nxt_event_engine_signals(signals);
83         if (engine->signals == NULL) {
84             goto signals_fail;
85         }
86 
87         engine->signals->handler = nxt_event_engine_signal_handler;
88 
89         if (!interface->signal_support) {
90             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
91                 goto signals_fail;
92             }
93         }
94     }
95 
96     /*
97      * Number of event set and timers changes should be at least twice
98      * more than number of events to avoid premature flushes of the changes.
99      * Fourfold is for sure.
100      */
101     events = (batch != 0) ? batch : 32;
102 
103     if (interface->create(engine, 4 * events, events) != NXT_OK) {
104         goto event_set_fail;
105     }
106 
107     engine->event = *interface;
108 
109     if (nxt_event_engine_post_init(engine) != NXT_OK) {
110         goto post_fail;
111     }
112 
113     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
114         goto timers_fail;
115     }
116 
117     thread = task->thread;
118 
119     nxt_thread_time_update(thread);
120     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
121 
122     engine->max_connections = 0xffffffff;
123 
124     nxt_queue_init(&engine->listen_connections);
125     nxt_queue_init(&engine->idle_connections);
126 
127 #if !(NXT_THREADS)
128 
129     if (interface->signal_support) {
130         thread->time.signal = -1;
131     }
132 
133 #endif
134 
135     return engine;
136 
137 timers_fail:
138 post_fail:
139 
140     interface->free(engine);
141 
142 event_set_fail:
143 signals_fail:
144 
145     nxt_free(engine->signals);
146     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
147     nxt_free(engine->fibers);
148 
149 fibers_fail:
150 
151     nxt_free(engine);
152     return NULL;
153 }
154 
155 
156 static nxt_int_t
157 nxt_event_engine_post_init(nxt_event_engine_t *engine)
158 {
159     if (engine->event.enable_post != NULL) {
160         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
161     }
162 
163 #if !(NXT_THREADS)
164 
165     /* Only signals may are posted in single-threaded mode. */
166 
167     if (engine->event->signal_support) {
168         return NXT_OK;
169     }
170 
171 #endif
172 
173     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
174         return NXT_ERROR;
175     }
176 
177     return NXT_OK;
178 }
179 
180 
181 static nxt_int_t
182 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
183 {
184     nxt_event_engine_pipe_t  *pipe;
185 
186     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
187     if (pipe == NULL) {
188         return NXT_ERROR;
189     }
190 
191     engine->pipe = pipe;
192 
193     /*
194      * An event engine pipe is in blocking mode for writer
195      * and in non-blocking node for reader.
196      */
197 
198     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
199         nxt_free(pipe);
200         return NXT_ERROR;
201     }
202 
203     pipe->event.fd = pipe->fds[0];
204     pipe->event.task = &engine->task;
205     pipe->event.read_work_queue = &engine->fast_work_queue;
206     pipe->event.read_handler = nxt_event_engine_signal_pipe;
207     pipe->event.write_work_queue = &engine->fast_work_queue;
208     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
209     pipe->event.log = engine->task.log;
210 
211     nxt_fd_event_enable_read(engine, &pipe->event);
212 
213     return NXT_OK;
214 }
215 
216 
217 static void
218 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
219 {
220     nxt_event_engine_pipe_t  *pipe;
221 
222     pipe = engine->pipe;
223 
224     if (pipe != NULL) {
225 
226         if (pipe->event.read_work_queue != NULL) {
227             nxt_fd_event_close(engine, &pipe->event);
228             nxt_pipe_close(pipe->event.task, pipe->fds);
229         }
230 
231         nxt_free(pipe);
232     }
233 }
234 
235 
236 static void
237 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
238 {
239     nxt_event_engine_pipe_t  *pipe;
240 
241     pipe = obj;
242 
243     nxt_pipe_close(pipe->event.task, pipe->fds);
244     nxt_free(pipe);
245 }
246 
247 
248 void
249 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
250 {
251     nxt_debug(&engine->task, "event engine post");
252 
253     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
254 
255     nxt_event_engine_signal(engine, 0);
256 }
257 
258 
259 void
260 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
261 {
262     u_char  buf;
263 
264     nxt_debug(&engine->task, "event engine signal:%ui", signo);
265 
266     /*
267      * A signal number may be sent in a signal context, so the signal
268      * information cannot be passed via a locked work queue.
269      */
270 
271     if (engine->event.signal != NULL) {
272         engine->event.signal(engine, signo);
273         return;
274     }
275 
276     buf = (u_char) signo;
277     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
278 }
279 
280 
281 static void
282 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
283 {
284     int             i, n;
285     u_char          signo;
286     nxt_bool_t      post;
287     nxt_fd_event_t  *ev;
288     u_char          buf[128];
289 
290     ev = obj;
291 
292     nxt_debug(task, "engine signal pipe");
293 
294     post = 0;
295 
296     do {
297         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
298 
299         for (i = 0; i < n; i++) {
300             signo = buf[i];
301 
302             nxt_debug(task, "engine pipe signo:%d", signo);
303 
304             if (signo == 0) {
305                 /* A post should be processed only once. */
306                 post = 1;
307 
308             } else {
309                 nxt_event_engine_signal_handler(task,
310                                              (void *) (uintptr_t) signo, NULL);
311             }
312         }
313 
314     } while (n == sizeof(buf));
315 
316     if (post) {
317         nxt_event_engine_post_handler(task, NULL, NULL);
318     }
319 }
320 
321 
322 static void
323 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
324 {
325     nxt_thread_t        *thread;
326     nxt_event_engine_t  *engine;
327 
328     thread = task->thread;
329     engine = thread->engine;
330 
331     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
332                                &engine->fast_work_queue);
333 }
334 
335 
336 static void
337 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
338 {
339     nxt_event_engine_t       *engine;
340     nxt_event_engine_pipe_t  *pipe;
341 
342     engine = task->thread->engine;
343     pipe = engine->pipe;
344 
345     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
346             pipe->fds[0], pipe->fds[1]);
347 
348     nxt_fd_event_close(engine, &pipe->event);
349     nxt_pipe_close(pipe->event.task, pipe->fds);
350 }
351 
352 
353 static void
354 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
355 {
356     uintptr_t              signo;
357     const nxt_sig_event_t  *sigev;
358 
359     signo = (uintptr_t) obj;
360 
361     for (sigev = task->thread->engine->signals->sigev;
362          sigev->signo != 0;
363          sigev++)
364     {
365         if (signo == (nxt_uint_t) sigev->signo) {
366             sigev->handler(task, (void *) signo, (void *) sigev->name);
367             return;
368         }
369     }
370 
371     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
372 }
373 
374 
375 nxt_int_t
376 nxt_event_engine_change(nxt_event_engine_t *engine,
377     const nxt_event_interface_t *interface, nxt_uint_t batch)
378 {
379     nxt_uint_t  events;
380 
381     engine->batch = batch;
382 
383     if (!engine->event.signal_support && interface->signal_support) {
384         /*
385          * Block signal processing if the current event
386          * facility does not support signal processing.
387          */
388         nxt_event_engine_signals_stop(engine);
389 
390         /*
391          * Add to engine fast work queue the signal events possibly
392          * received before the blocking signal processing.
393          */
394         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
395     }
396 
397     if (engine->pipe != NULL && interface->enable_post != NULL) {
398         /*
399          * An engine pipe must be closed after all signal events
400          * added above to engine fast work queue will be processed.
401          */
402         nxt_work_queue_add(&engine->fast_work_queue,
403                            nxt_event_engine_signal_pipe_close,
404                            &engine->task, engine->pipe, NULL);
405 
406         engine->pipe = NULL;
407     }
408 
409     engine->event.free(engine);
410 
411     events = (batch != 0) ? batch : 32;
412 
413     if (interface->create(engine, 4 * events, events) != NXT_OK) {
414         return NXT_ERROR;
415     }
416 
417     engine->event = *interface;
418 
419     if (nxt_event_engine_post_init(engine) != NXT_OK) {
420         return NXT_ERROR;
421     }
422 
423     if (engine->signals != NULL) {
424 
425         if (!engine->event.signal_support) {
426             return nxt_event_engine_signals_start(engine);
427         }
428 
429 #if (NXT_THREADS)
430         /*
431          * Reset the PID flag to start the signal thread if
432          * some future event facility will not support signals.
433          */
434         engine->signals->process = 0;
435 #endif
436     }
437 
438     return NXT_OK;
439 }
440 
441 
442 void
443 nxt_event_engine_free(nxt_event_engine_t *engine)
444 {
445     nxt_event_engine_signal_pipe_free(engine);
446     nxt_free(engine->signals);
447 
448     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
449 
450     engine->event.free(engine);
451 
452     /* TODO: free timers */
453 
454     nxt_free(engine);
455 }
456 
457 
458 static nxt_work_handler_t
459 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
460     void **obj, void **data)
461 {
462     nxt_work_queue_t  *wq, *last;
463 
464     wq = engine->current_work_queue;
465     last = wq;
466 
467     if (wq->head == NULL) {
468         wq = &engine->fast_work_queue;
469 
470         if (wq->head == NULL) {
471 
472             do {
473                 engine->current_work_queue++;
474                 wq = engine->current_work_queue;
475 
476                 if (wq > &engine->close_work_queue) {
477                     wq = &engine->fast_work_queue;
478                     engine->current_work_queue = wq;
479                 }
480 
481                 if (wq->head != NULL) {
482                     goto found;
483                 }
484 
485             } while (wq != last);
486 
487             engine->current_work_queue = &engine->fast_work_queue;
488 
489             return NULL;
490         }
491     }
492 
493 found:
494 
495     nxt_debug(&engine->task, "work queue: %s", wq->name);
496 
497     return nxt_work_queue_pop(wq, task, obj, data);
498 }
499 
500 
501 void
502 nxt_event_engine_start(nxt_event_engine_t *engine)
503 {
504     void                *obj, *data;
505     nxt_task_t          *task;
506     nxt_msec_t          timeout, now;
507     nxt_thread_t        *thr;
508     nxt_work_handler_t  handler;
509 
510     thr = nxt_thread();
511 
512     if (engine->fibers) {
513         /*
514          * _setjmp() cannot be wrapped in a function since return from
515          * the function clobbers stack used by future _setjmp() returns.
516          */
517         _setjmp(engine->fibers->fiber.jmp);
518 
519         /* A return point from fibers. */
520     }
521 
522     thr->log = engine->task.log;
523 
524     for ( ;; ) {
525 
526         for ( ;; ) {
527             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
528 
529             if (handler == NULL) {
530                 break;
531             }
532 
533             handler(task, obj, data);
534         }
535 
536         /* Attach some event engine work queues in preferred order. */
537 
538         timeout = nxt_timer_find(engine);
539 
540         engine->event.poll(engine, timeout);
541 
542         now = nxt_thread_monotonic_time(thr) / 1000000;
543 
544         nxt_timer_expire(engine, now);
545     }
546 }
547