xref: /unit/src/nxt_event_engine.c (revision 318:c2442f5e054d)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * Forward declarations of file-local helpers defined further below.
 */

/* Post (cross-thread wake-up) setup and the signal pipe behind it. */
static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
static nxt_int_t nxt_event_engine_signal_pipe_create(
    nxt_event_engine_t *engine);
static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
    void *data);

/* Work handlers: all share the (task, obj, data) handler signature. */
static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
    void *data);

/* Work queue selection used by the engine main loop. */
static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
    nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_task_t *task,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_thread_t        *thread;
34     nxt_event_engine_t  *engine;
35 
36     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
37     if (engine == NULL) {
38         return NULL;
39     }
40 
41     nxt_debug(task, "create engine %p", engine);
42 
43     thread = task->thread;
44 
45     engine->task.thread = thread;
46     engine->task.log = thread->log;
47     engine->task.ident = nxt_task_next_ident();
48 
49     engine->batch = batch;
50 
51     if (flags & NXT_ENGINE_FIBERS) {
52         engine->fibers = nxt_fiber_main_create(engine);
53         if (engine->fibers == NULL) {
54             goto fibers_fail;
55         }
56     }
57 
58     engine->current_work_queue = &engine->fast_work_queue;
59 
60     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
61 
62     engine->fast_work_queue.cache = &engine->work_queue_cache;
63     engine->accept_work_queue.cache = &engine->work_queue_cache;
64     engine->read_work_queue.cache = &engine->work_queue_cache;
65     engine->socket_work_queue.cache = &engine->work_queue_cache;
66     engine->connect_work_queue.cache = &engine->work_queue_cache;
67     engine->write_work_queue.cache = &engine->work_queue_cache;
68     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
69     engine->close_work_queue.cache = &engine->work_queue_cache;
70 
71     nxt_work_queue_name(&engine->fast_work_queue, "fast");
72     nxt_work_queue_name(&engine->accept_work_queue, "accept");
73     nxt_work_queue_name(&engine->read_work_queue, "read");
74     nxt_work_queue_name(&engine->socket_work_queue, "socket");
75     nxt_work_queue_name(&engine->connect_work_queue, "connect");
76     nxt_work_queue_name(&engine->write_work_queue, "write");
77     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
78     nxt_work_queue_name(&engine->close_work_queue, "close");
79 
80     if (signals != NULL) {
81         engine->signals = nxt_event_engine_signals(signals);
82         if (engine->signals == NULL) {
83             goto signals_fail;
84         }
85 
86         engine->signals->handler = nxt_event_engine_signal_handler;
87 
88         if (!interface->signal_support) {
89             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
90                 goto signals_fail;
91             }
92         }
93     }
94 
95     /*
96      * Number of event set and timers changes should be at least twice
97      * more than number of events to avoid premature flushes of the changes.
98      * Fourfold is for sure.
99      */
100     events = (batch != 0) ? batch : 32;
101 
102     if (interface->create(engine, 4 * events, events) != NXT_OK) {
103         goto event_set_fail;
104     }
105 
106     engine->event = *interface;
107 
108     if (nxt_event_engine_post_init(engine) != NXT_OK) {
109         goto post_fail;
110     }
111 
112     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
113         goto timers_fail;
114     }
115 
116     thread = task->thread;
117 
118     nxt_thread_time_update(thread);
119     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
120 
121     engine->max_connections = 0xffffffff;
122 
123     nxt_queue_init(&engine->joints);
124     nxt_queue_init(&engine->listen_connections);
125     nxt_queue_init(&engine->idle_connections);
126 
127     return engine;
128 
129 timers_fail:
130 post_fail:
131 
132     interface->free(engine);
133 
134 event_set_fail:
135 signals_fail:
136 
137     nxt_free(engine->signals);
138     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
139     nxt_free(engine->fibers);
140 
141 fibers_fail:
142 
143     nxt_free(engine);
144     return NULL;
145 }
146 
147 
148 static nxt_int_t
149 nxt_event_engine_post_init(nxt_event_engine_t *engine)
150 {
151     if (engine->event.enable_post != NULL) {
152         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
153     }
154 
155     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
156         return NXT_ERROR;
157     }
158 
159     return NXT_OK;
160 }
161 
162 
163 static nxt_int_t
164 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
165 {
166     nxt_event_engine_pipe_t  *pipe;
167 
168     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
169     if (pipe == NULL) {
170         return NXT_ERROR;
171     }
172 
173     engine->pipe = pipe;
174 
175     /*
176      * An event engine pipe is in blocking mode for writer
177      * and in non-blocking node for reader.
178      */
179 
180     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
181         nxt_free(pipe);
182         return NXT_ERROR;
183     }
184 
185     pipe->event.fd = pipe->fds[0];
186     pipe->event.task = &engine->task;
187     pipe->event.read_work_queue = &engine->fast_work_queue;
188     pipe->event.read_handler = nxt_event_engine_signal_pipe;
189     pipe->event.write_work_queue = &engine->fast_work_queue;
190     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
191     pipe->event.log = engine->task.log;
192 
193     nxt_fd_event_enable_read(engine, &pipe->event);
194 
195     return NXT_OK;
196 }
197 
198 
199 static void
200 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
201 {
202     nxt_event_engine_pipe_t  *pipe;
203 
204     pipe = engine->pipe;
205 
206     if (pipe != NULL) {
207 
208         if (pipe->event.read_work_queue != NULL) {
209             nxt_fd_event_close(engine, &pipe->event);
210             nxt_pipe_close(pipe->event.task, pipe->fds);
211         }
212 
213         nxt_free(pipe);
214     }
215 }
216 
217 
218 static void
219 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
220 {
221     nxt_event_engine_pipe_t  *pipe;
222 
223     pipe = obj;
224 
225     nxt_pipe_close(pipe->event.task, pipe->fds);
226     nxt_free(pipe);
227 }
228 
229 
/*
 * Posts "work" to the engine: the work is added to the engine's locked
 * work queue and the engine is then signalled with signal number 0,
 * which nxt_event_engine_signal_pipe() treats as a "post" notification.
 */
void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
    nxt_debug(&engine->task, "event engine post");

#if (NXT_DEBUG)
    /* Debug builds only: report a work that has further works linked. */
    if (nxt_slow_path(work->next != NULL)) {
        nxt_debug(&engine->task, "event engine post multiple works");
    }
#endif

    /* The work must be queued before the engine is woken up. */
    nxt_locked_work_queue_add(&engine->locked_work_queue, work);

    nxt_event_engine_signal(engine, 0);
}
245 
246 
247 void
248 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
249 {
250     u_char  buf;
251 
252     nxt_debug(&engine->task, "event engine signal:%ui", signo);
253 
254     /*
255      * A signal number may be sent in a signal context, so the signal
256      * information cannot be passed via a locked work queue.
257      */
258 
259     if (engine->event.signal != NULL) {
260         engine->event.signal(engine, signo);
261         return;
262     }
263 
264     buf = (u_char) signo;
265     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
266 }
267 
268 
269 static void
270 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
271 {
272     int             i, n;
273     u_char          signo;
274     nxt_bool_t      post;
275     nxt_fd_event_t  *ev;
276     u_char          buf[128];
277 
278     ev = obj;
279 
280     nxt_debug(task, "engine signal pipe");
281 
282     post = 0;
283 
284     do {
285         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
286 
287         for (i = 0; i < n; i++) {
288             signo = buf[i];
289 
290             nxt_debug(task, "engine pipe signo:%d", signo);
291 
292             if (signo == 0) {
293                 /* A post should be processed only once. */
294                 post = 1;
295 
296             } else {
297                 nxt_event_engine_signal_handler(task,
298                                              (void *) (uintptr_t) signo, NULL);
299             }
300         }
301 
302     } while (n == sizeof(buf));
303 
304     if (post) {
305         nxt_event_engine_post_handler(task, NULL, NULL);
306     }
307 }
308 
309 
310 static void
311 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
312 {
313     nxt_thread_t        *thread;
314     nxt_event_engine_t  *engine;
315 
316     thread = task->thread;
317     engine = thread->engine;
318 
319     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
320                                &engine->fast_work_queue);
321 }
322 
323 
324 static void
325 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
326 {
327     nxt_event_engine_t       *engine;
328     nxt_event_engine_pipe_t  *pipe;
329 
330     engine = task->thread->engine;
331     pipe = engine->pipe;
332 
333     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
334             pipe->fds[0], pipe->fds[1]);
335 
336     nxt_fd_event_close(engine, &pipe->event);
337     nxt_pipe_close(pipe->event.task, pipe->fds);
338 }
339 
340 
341 static void
342 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
343 {
344     uintptr_t              signo;
345     const nxt_sig_event_t  *sigev;
346 
347     signo = (uintptr_t) obj;
348 
349     for (sigev = task->thread->engine->signals->sigev;
350          sigev->signo != 0;
351          sigev++)
352     {
353         if (signo == (nxt_uint_t) sigev->signo) {
354             sigev->handler(task, (void *) signo, (void *) sigev->name);
355             return;
356         }
357     }
358 
359     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
360 }
361 
362 
363 nxt_int_t
364 nxt_event_engine_change(nxt_event_engine_t *engine,
365     const nxt_event_interface_t *interface, nxt_uint_t batch)
366 {
367     nxt_uint_t  events;
368 
369     engine->batch = batch;
370 
371     if (!engine->event.signal_support && interface->signal_support) {
372         /*
373          * Block signal processing if the current event
374          * facility does not support signal processing.
375          */
376         nxt_event_engine_signals_stop(engine);
377 
378         /*
379          * Add to engine fast work queue the signal events possibly
380          * received before the blocking signal processing.
381          */
382         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
383     }
384 
385     if (engine->pipe != NULL && interface->enable_post != NULL) {
386         /*
387          * An engine pipe must be closed after all signal events
388          * added above to engine fast work queue will be processed.
389          */
390         nxt_work_queue_add(&engine->fast_work_queue,
391                            nxt_event_engine_signal_pipe_close,
392                            &engine->task, engine->pipe, NULL);
393 
394         engine->pipe = NULL;
395     }
396 
397     engine->event.free(engine);
398 
399     events = (batch != 0) ? batch : 32;
400 
401     if (interface->create(engine, 4 * events, events) != NXT_OK) {
402         return NXT_ERROR;
403     }
404 
405     engine->event = *interface;
406 
407     if (nxt_event_engine_post_init(engine) != NXT_OK) {
408         return NXT_ERROR;
409     }
410 
411     if (engine->signals != NULL) {
412 
413         if (!engine->event.signal_support) {
414             return nxt_event_engine_signals_start(engine);
415         }
416 
417         /*
418          * Reset the PID flag to start the signal thread if
419          * some future event facility will not support signals.
420          */
421         engine->signals->process = 0;
422     }
423 
424     return NXT_OK;
425 }
426 
427 
/*
 * Destroys the engine: releases the signal pipe, the signal data, the
 * work queue cache and the event facility, and finally the engine itself.
 */
void
nxt_event_engine_free(nxt_event_engine_t *engine)
{
    nxt_thread_log_debug("free engine %p", engine);

    /*
     * The pipe is released first: closing its event goes through the
     * event facility, which is freed only further below.
     */
    nxt_event_engine_signal_pipe_free(engine);
    nxt_free(engine->signals);

    nxt_work_queue_cache_destroy(&engine->work_queue_cache);

    engine->event.free(engine);

    /* TODO: free timers */

    /* NOTE(review): engine->fibers is not released here - confirm. */

    nxt_free(engine);
}
444 
445 
/*
 * Selects the next work to run and returns its handler as obtained from
 * nxt_work_queue_pop(), which also receives the "task", "obj" and "data"
 * output pointers.  Returns NULL if no queue with works is found; in that
 * case the current work queue is reset to the fast work queue.
 *
 * The current work queue is tried first, then the fast work queue, and
 * only then are the remaining queues scanned in rotation.
 */
static nxt_work_handler_t
nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
    void **obj, void **data)
{
    nxt_work_queue_t  *wq, *last;

    wq = engine->current_work_queue;
    /* Remember where the scan starts to detect a full rotation. */
    last = wq;

    if (wq->head == NULL) {
        /*
         * The current queue is empty: try the fast work queue.  Note
         * that engine->current_work_queue is not changed by this step.
         */
        wq = &engine->fast_work_queue;

        if (wq->head == NULL) {

            do {
                /*
                 * Step to the next queue by pointer increment.
                 * NOTE(review): this assumes the work queues from
                 * fast_work_queue through close_work_queue are adjacent
                 * members of the engine structure - confirm against its
                 * declaration.
                 */
                engine->current_work_queue++;
                wq = engine->current_work_queue;

                /* Wrap around after the close work queue. */
                if (wq > &engine->close_work_queue) {
                    wq = &engine->fast_work_queue;
                    engine->current_work_queue = wq;
                }

                if (wq->head != NULL) {
                    goto found;
                }

            } while (wq != last);

            /* Back at the starting queue: nothing to run. */
            engine->current_work_queue = &engine->fast_work_queue;

            return NULL;
        }
    }

found:

    nxt_debug(&engine->task, "work queue: %s", wq->name);

    return nxt_work_queue_pop(wq, task, obj, data);
}
487 
488 
489 void
490 nxt_event_engine_start(nxt_event_engine_t *engine)
491 {
492     void                *obj, *data;
493     nxt_task_t          *task;
494     nxt_msec_t          timeout, now;
495     nxt_thread_t        *thr;
496     nxt_work_handler_t  handler;
497 
498     thr = nxt_thread();
499 
500     if (engine->fibers) {
501         /*
502          * _setjmp() cannot be wrapped in a function since return from
503          * the function clobbers stack used by future _setjmp() returns.
504          */
505         _setjmp(engine->fibers->fiber.jmp);
506 
507         /* A return point from fibers. */
508     }
509 
510     thr->log = engine->task.log;
511 
512     for ( ;; ) {
513 
514         for ( ;; ) {
515             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
516 
517             if (handler == NULL) {
518                 break;
519             }
520 
521             thr->task = task;
522 
523             handler(task, obj, data);
524         }
525 
526         /* Attach some event engine work queues in preferred order. */
527 
528         timeout = nxt_timer_find(engine);
529 
530         engine->event.poll(engine, timeout);
531 
532         now = nxt_thread_monotonic_time(thr) / 1000000;
533 
534         nxt_timer_expire(engine, now);
535     }
536 }
537 
538 
539 #if (NXT_DEBUG)
540 
541 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
542 {
543     nxt_work_queue_thread_adopt(&engine->fast_work_queue);
544     nxt_work_queue_thread_adopt(&engine->accept_work_queue);
545     nxt_work_queue_thread_adopt(&engine->read_work_queue);
546     nxt_work_queue_thread_adopt(&engine->socket_work_queue);
547     nxt_work_queue_thread_adopt(&engine->connect_work_queue);
548     nxt_work_queue_thread_adopt(&engine->write_work_queue);
549     nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
550     nxt_work_queue_thread_adopt(&engine->close_work_queue);
551 }
552 
553 #endif
554