xref: /unit/src/nxt_event_engine.c (revision 326:2aad6a5fac13)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_task_t *task,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_thread_t        *thread;
34     nxt_event_engine_t  *engine;
35 
36     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
37     if (engine == NULL) {
38         return NULL;
39     }
40 
41     nxt_debug(task, "create engine %p", engine);
42 
43     thread = task->thread;
44 
45     engine->task.thread = thread;
46     engine->task.log = thread->log;
47     engine->task.ident = nxt_task_next_ident();
48 
49     engine->batch = batch;
50 
51 #if 0
52     if (flags & NXT_ENGINE_FIBERS) {
53         engine->fibers = nxt_fiber_main_create(engine);
54         if (engine->fibers == NULL) {
55             goto fibers_fail;
56         }
57     }
58 #endif
59 
60     engine->current_work_queue = &engine->fast_work_queue;
61 
62     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
63 
64     engine->fast_work_queue.cache = &engine->work_queue_cache;
65     engine->accept_work_queue.cache = &engine->work_queue_cache;
66     engine->read_work_queue.cache = &engine->work_queue_cache;
67     engine->socket_work_queue.cache = &engine->work_queue_cache;
68     engine->connect_work_queue.cache = &engine->work_queue_cache;
69     engine->write_work_queue.cache = &engine->work_queue_cache;
70     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
71     engine->close_work_queue.cache = &engine->work_queue_cache;
72 
73     nxt_work_queue_name(&engine->fast_work_queue, "fast");
74     nxt_work_queue_name(&engine->accept_work_queue, "accept");
75     nxt_work_queue_name(&engine->read_work_queue, "read");
76     nxt_work_queue_name(&engine->socket_work_queue, "socket");
77     nxt_work_queue_name(&engine->connect_work_queue, "connect");
78     nxt_work_queue_name(&engine->write_work_queue, "write");
79     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
80     nxt_work_queue_name(&engine->close_work_queue, "close");
81 
82     if (signals != NULL) {
83         engine->signals = nxt_event_engine_signals(signals);
84         if (engine->signals == NULL) {
85             goto signals_fail;
86         }
87 
88         engine->signals->handler = nxt_event_engine_signal_handler;
89 
90         if (!interface->signal_support) {
91             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
92                 goto signals_fail;
93             }
94         }
95     }
96 
97     /*
98      * Number of event set and timers changes should be at least twice
99      * more than number of events to avoid premature flushes of the changes.
100      * Fourfold is for sure.
101      */
102     events = (batch != 0) ? batch : 32;
103 
104     if (interface->create(engine, 4 * events, events) != NXT_OK) {
105         goto event_set_fail;
106     }
107 
108     engine->event = *interface;
109 
110     if (nxt_event_engine_post_init(engine) != NXT_OK) {
111         goto post_fail;
112     }
113 
114     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
115         goto timers_fail;
116     }
117 
118     thread = task->thread;
119 
120     nxt_thread_time_update(thread);
121     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
122 
123     engine->max_connections = 0xffffffff;
124 
125     nxt_queue_init(&engine->joints);
126     nxt_queue_init(&engine->listen_connections);
127     nxt_queue_init(&engine->idle_connections);
128 
129     return engine;
130 
131 timers_fail:
132 post_fail:
133 
134     interface->free(engine);
135 
136 event_set_fail:
137 signals_fail:
138 
139     nxt_free(engine->signals);
140     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
141     nxt_free(engine->fibers);
142 
143 #if 0
144 fibers_fail:
145 
146     nxt_free(engine);
147 #endif
148 
149     return NULL;
150 }
151 
152 
153 static nxt_int_t
154 nxt_event_engine_post_init(nxt_event_engine_t *engine)
155 {
156     if (engine->event.enable_post != NULL) {
157         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
158     }
159 
160     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
161         return NXT_ERROR;
162     }
163 
164     return NXT_OK;
165 }
166 
167 
168 static nxt_int_t
169 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
170 {
171     nxt_event_engine_pipe_t  *pipe;
172 
173     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
174     if (pipe == NULL) {
175         return NXT_ERROR;
176     }
177 
178     engine->pipe = pipe;
179 
180     /*
181      * An event engine pipe is in blocking mode for writer
182      * and in non-blocking node for reader.
183      */
184 
185     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
186         nxt_free(pipe);
187         return NXT_ERROR;
188     }
189 
190     pipe->event.fd = pipe->fds[0];
191     pipe->event.task = &engine->task;
192     pipe->event.read_work_queue = &engine->fast_work_queue;
193     pipe->event.read_handler = nxt_event_engine_signal_pipe;
194     pipe->event.write_work_queue = &engine->fast_work_queue;
195     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
196     pipe->event.log = engine->task.log;
197 
198     nxt_fd_event_enable_read(engine, &pipe->event);
199 
200     return NXT_OK;
201 }
202 
203 
204 static void
205 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
206 {
207     nxt_event_engine_pipe_t  *pipe;
208 
209     pipe = engine->pipe;
210 
211     if (pipe != NULL) {
212 
213         if (pipe->event.read_work_queue != NULL) {
214             nxt_fd_event_close(engine, &pipe->event);
215             nxt_pipe_close(pipe->event.task, pipe->fds);
216         }
217 
218         nxt_free(pipe);
219     }
220 }
221 
222 
223 static void
224 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
225 {
226     nxt_event_engine_pipe_t  *pipe;
227 
228     pipe = obj;
229 
230     nxt_pipe_close(pipe->event.task, pipe->fds);
231     nxt_free(pipe);
232 }
233 
234 
235 void
236 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
237 {
238     nxt_debug(&engine->task, "event engine post");
239 
240 #if (NXT_DEBUG)
241     if (nxt_slow_path(work->next != NULL)) {
242         nxt_debug(&engine->task, "event engine post multiple works");
243     }
244 #endif
245 
246     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
247 
248     nxt_event_engine_signal(engine, 0);
249 }
250 
251 
252 void
253 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
254 {
255     u_char  buf;
256 
257     nxt_debug(&engine->task, "event engine signal:%ui", signo);
258 
259     /*
260      * A signal number may be sent in a signal context, so the signal
261      * information cannot be passed via a locked work queue.
262      */
263 
264     if (engine->event.signal != NULL) {
265         engine->event.signal(engine, signo);
266         return;
267     }
268 
269     buf = (u_char) signo;
270     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
271 }
272 
273 
274 static void
275 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
276 {
277     int             i, n;
278     u_char          signo;
279     nxt_bool_t      post;
280     nxt_fd_event_t  *ev;
281     u_char          buf[128];
282 
283     ev = obj;
284 
285     nxt_debug(task, "engine signal pipe");
286 
287     post = 0;
288 
289     do {
290         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
291 
292         for (i = 0; i < n; i++) {
293             signo = buf[i];
294 
295             nxt_debug(task, "engine pipe signo:%d", signo);
296 
297             if (signo == 0) {
298                 /* A post should be processed only once. */
299                 post = 1;
300 
301             } else {
302                 nxt_event_engine_signal_handler(task,
303                                              (void *) (uintptr_t) signo, NULL);
304             }
305         }
306 
307     } while (n == sizeof(buf));
308 
309     if (post) {
310         nxt_event_engine_post_handler(task, NULL, NULL);
311     }
312 }
313 
314 
315 static void
316 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
317 {
318     nxt_thread_t        *thread;
319     nxt_event_engine_t  *engine;
320 
321     thread = task->thread;
322     engine = thread->engine;
323 
324     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
325                                &engine->fast_work_queue);
326 }
327 
328 
329 static void
330 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
331 {
332     nxt_event_engine_t       *engine;
333     nxt_event_engine_pipe_t  *pipe;
334 
335     engine = task->thread->engine;
336     pipe = engine->pipe;
337 
338     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
339             pipe->fds[0], pipe->fds[1]);
340 
341     nxt_fd_event_close(engine, &pipe->event);
342     nxt_pipe_close(pipe->event.task, pipe->fds);
343 }
344 
345 
346 static void
347 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
348 {
349     uintptr_t              signo;
350     const nxt_sig_event_t  *sigev;
351 
352     signo = (uintptr_t) obj;
353 
354     for (sigev = task->thread->engine->signals->sigev;
355          sigev->signo != 0;
356          sigev++)
357     {
358         if (signo == (nxt_uint_t) sigev->signo) {
359             sigev->handler(task, (void *) signo, (void *) sigev->name);
360             return;
361         }
362     }
363 
364     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
365 }
366 
367 
368 nxt_int_t
369 nxt_event_engine_change(nxt_event_engine_t *engine,
370     const nxt_event_interface_t *interface, nxt_uint_t batch)
371 {
372     nxt_uint_t  events;
373 
374     engine->batch = batch;
375 
376     if (!engine->event.signal_support && interface->signal_support) {
377         /*
378          * Block signal processing if the current event
379          * facility does not support signal processing.
380          */
381         nxt_event_engine_signals_stop(engine);
382 
383         /*
384          * Add to engine fast work queue the signal events possibly
385          * received before the blocking signal processing.
386          */
387         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
388     }
389 
390     if (engine->pipe != NULL && interface->enable_post != NULL) {
391         /*
392          * An engine pipe must be closed after all signal events
393          * added above to engine fast work queue will be processed.
394          */
395         nxt_work_queue_add(&engine->fast_work_queue,
396                            nxt_event_engine_signal_pipe_close,
397                            &engine->task, engine->pipe, NULL);
398 
399         engine->pipe = NULL;
400     }
401 
402     engine->event.free(engine);
403 
404     events = (batch != 0) ? batch : 32;
405 
406     if (interface->create(engine, 4 * events, events) != NXT_OK) {
407         return NXT_ERROR;
408     }
409 
410     engine->event = *interface;
411 
412     if (nxt_event_engine_post_init(engine) != NXT_OK) {
413         return NXT_ERROR;
414     }
415 
416     if (engine->signals != NULL) {
417 
418         if (!engine->event.signal_support) {
419             return nxt_event_engine_signals_start(engine);
420         }
421 
422         /*
423          * Reset the PID flag to start the signal thread if
424          * some future event facility will not support signals.
425          */
426         engine->signals->process = 0;
427     }
428 
429     return NXT_OK;
430 }
431 
432 
433 void
434 nxt_event_engine_free(nxt_event_engine_t *engine)
435 {
436     nxt_thread_log_debug("free engine %p", engine);
437 
438     nxt_event_engine_signal_pipe_free(engine);
439     nxt_free(engine->signals);
440 
441     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
442 
443     engine->event.free(engine);
444 
445     /* TODO: free timers */
446 
447     nxt_free(engine);
448 }
449 
450 
451 static nxt_work_handler_t
452 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
453     void **obj, void **data)
454 {
455     nxt_work_queue_t  *wq, *last;
456 
457     wq = engine->current_work_queue;
458     last = wq;
459 
460     if (wq->head == NULL) {
461         wq = &engine->fast_work_queue;
462 
463         if (wq->head == NULL) {
464 
465             do {
466                 engine->current_work_queue++;
467                 wq = engine->current_work_queue;
468 
469                 if (wq > &engine->close_work_queue) {
470                     wq = &engine->fast_work_queue;
471                     engine->current_work_queue = wq;
472                 }
473 
474                 if (wq->head != NULL) {
475                     goto found;
476                 }
477 
478             } while (wq != last);
479 
480             engine->current_work_queue = &engine->fast_work_queue;
481 
482             return NULL;
483         }
484     }
485 
486 found:
487 
488     nxt_debug(&engine->task, "work queue: %s", wq->name);
489 
490     return nxt_work_queue_pop(wq, task, obj, data);
491 }
492 
493 
494 void
495 nxt_event_engine_start(nxt_event_engine_t *engine)
496 {
497     void                *obj, *data;
498     nxt_task_t          *task;
499     nxt_msec_t          timeout, now;
500     nxt_thread_t        *thr;
501     nxt_work_handler_t  handler;
502 
503     thr = nxt_thread();
504 
505     if (engine->fibers) {
506         /*
507          * _setjmp() cannot be wrapped in a function since return from
508          * the function clobbers stack used by future _setjmp() returns.
509          */
510         _setjmp(engine->fibers->fiber.jmp);
511 
512         /* A return point from fibers. */
513     }
514 
515     thr->log = engine->task.log;
516 
517     for ( ;; ) {
518 
519         for ( ;; ) {
520             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
521 
522             if (handler == NULL) {
523                 break;
524             }
525 
526             thr->task = task;
527 
528             handler(task, obj, data);
529         }
530 
531         /* Attach some event engine work queues in preferred order. */
532 
533         timeout = nxt_timer_find(engine);
534 
535         engine->event.poll(engine, timeout);
536 
537         now = nxt_thread_monotonic_time(thr) / 1000000;
538 
539         nxt_timer_expire(engine, now);
540     }
541 }
542 
543 
544 #if (NXT_DEBUG)
545 
546 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
547 {
548     nxt_work_queue_thread_adopt(&engine->fast_work_queue);
549     nxt_work_queue_thread_adopt(&engine->accept_work_queue);
550     nxt_work_queue_thread_adopt(&engine->read_work_queue);
551     nxt_work_queue_thread_adopt(&engine->socket_work_queue);
552     nxt_work_queue_thread_adopt(&engine->connect_work_queue);
553     nxt_work_queue_thread_adopt(&engine->write_work_queue);
554     nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
555     nxt_work_queue_thread_adopt(&engine->close_work_queue);
556 }
557 
558 #endif
559