xref: /unit/src/nxt_event_engine.c (revision 4:76c63e9b6322)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
29     const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch)
30 {
31     nxt_uint_t          events;
32     nxt_event_engine_t  *engine;
33 
34     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
35     if (engine == NULL) {
36         return NULL;
37     }
38 
39     engine->task.thread = thr;
40     engine->task.log = thr->log;
41     engine->task.ident = nxt_task_next_ident();
42 
43     thr->engine = engine;
44     thr->fiber = &engine->fibers->fiber;
45 
46     engine->batch = batch;
47 
48     if (flags & NXT_ENGINE_FIBERS) {
49         engine->fibers = nxt_fiber_main_create(engine);
50         if (engine->fibers == NULL) {
51             goto fibers_fail;
52         }
53     }
54 
55     engine->current_work_queue = &engine->fast_work_queue;
56 
57     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
58 
59     engine->fast_work_queue.cache = &engine->work_queue_cache;
60     engine->accept_work_queue.cache = &engine->work_queue_cache;
61     engine->read_work_queue.cache = &engine->work_queue_cache;
62     engine->socket_work_queue.cache = &engine->work_queue_cache;
63     engine->connect_work_queue.cache = &engine->work_queue_cache;
64     engine->write_work_queue.cache = &engine->work_queue_cache;
65     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
66     engine->close_work_queue.cache = &engine->work_queue_cache;
67     engine->final_work_queue.cache = &engine->work_queue_cache;
68 
69     nxt_work_queue_name(&engine->fast_work_queue, "fast");
70     nxt_work_queue_name(&engine->accept_work_queue, "accept");
71     nxt_work_queue_name(&engine->read_work_queue, "read");
72     nxt_work_queue_name(&engine->socket_work_queue, "socket");
73     nxt_work_queue_name(&engine->connect_work_queue, "connect");
74     nxt_work_queue_name(&engine->write_work_queue, "write");
75     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
76     nxt_work_queue_name(&engine->close_work_queue, "close");
77     nxt_work_queue_name(&engine->final_work_queue, "final");
78 
79     if (signals != NULL) {
80         engine->signals = nxt_event_engine_signals(signals);
81         if (engine->signals == NULL) {
82             goto signals_fail;
83         }
84 
85         engine->signals->handler = nxt_event_engine_signal_handler;
86 
87         if (!event_set->signal_support) {
88             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
89                 goto signals_fail;
90             }
91         }
92     }
93 
94     /*
95      * Number of event set and timers changes should be at least twice
96      * more than number of events to avoid premature flushes of the changes.
97      * Fourfold is for sure.
98      */
99     events = (batch != 0) ? batch : 32;
100 
101     engine->event_set = event_set->create(engine->signals, 4 * events, events);
102     if (engine->event_set == NULL) {
103         goto event_set_fail;
104     }
105 
106     engine->event = event_set;
107 
108     if (nxt_event_engine_post_init(engine) != NXT_OK) {
109         goto post_fail;
110     }
111 
112     if (nxt_event_timers_init(&engine->timers, 4 * events) != NXT_OK) {
113         goto timers_fail;
114     }
115 
116     nxt_thread_time_update(thr);
117     engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000;
118 
119     engine->max_connections = 0xffffffff;
120 
121     nxt_queue_init(&engine->listen_connections);
122     nxt_queue_init(&engine->idle_connections);
123 
124     engine->thread = thr;
125 
126 #if !(NXT_THREADS)
127 
128     if (engine->event->signal_support) {
129         thr->time.signal = -1;
130     }
131 
132 #endif
133 
134     return engine;
135 
136 timers_fail:
137 post_fail:
138 
139     event_set->free(engine->event_set);
140 
141 event_set_fail:
142 signals_fail:
143 
144     nxt_free(engine->signals);
145     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
146     nxt_free(engine->fibers);
147 
148 fibers_fail:
149 
150     nxt_free(engine);
151     return NULL;
152 }
153 
154 
155 static nxt_int_t
156 nxt_event_engine_post_init(nxt_event_engine_t *engine)
157 {
158     if (engine->event->enable_post != NULL) {
159         return engine->event->enable_post(engine->event_set,
160                                           nxt_event_engine_post_handler);
161     }
162 
163 #if !(NXT_THREADS)
164 
165     /* Only signals may are posted in single-threaded mode. */
166 
167     if (engine->event->signal_support) {
168         return NXT_OK;
169     }
170 
171 #endif
172 
173     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
174         return NXT_ERROR;
175     }
176 
177     return NXT_OK;
178 }
179 
180 
181 static nxt_int_t
182 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
183 {
184     nxt_event_engine_pipe_t  *pipe;
185 
186     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
187     if (pipe == NULL) {
188         return NXT_ERROR;
189     }
190 
191     engine->pipe = pipe;
192 
193     /*
194      * An event engine pipe is in blocking mode for writer
195      * and in non-blocking node for reader.
196      */
197 
198     if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) {
199         nxt_free(pipe);
200         return NXT_ERROR;
201     }
202 
203     pipe->event.fd = pipe->fds[0];
204     pipe->event.read_work_queue = &engine->fast_work_queue;
205     pipe->event.read_handler = nxt_event_engine_signal_pipe;
206     pipe->event.write_work_queue = &engine->fast_work_queue;
207     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
208     pipe->event.log = &nxt_main_log;
209 
210     nxt_event_fd_enable_read(engine, &pipe->event);
211 
212     return NXT_OK;
213 }
214 
215 
216 static void
217 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
218 {
219     nxt_event_engine_pipe_t  *pipe;
220 
221     pipe = engine->pipe;
222 
223     if (pipe != NULL) {
224 
225         if (pipe->event.read_work_queue != NULL) {
226             nxt_event_fd_close(engine, &pipe->event);
227             nxt_pipe_close(pipe->fds);
228         }
229 
230         nxt_free(pipe);
231     }
232 }
233 
234 
235 static void
236 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
237 {
238     nxt_event_engine_pipe_t  *pipe;
239 
240     pipe = obj;
241 
242     nxt_pipe_close(pipe->fds);
243     nxt_free(pipe);
244 }
245 
246 
247 void
248 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
249 {
250     nxt_thread_log_debug("event engine post");
251 
252     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
253 
254     nxt_event_engine_signal(engine, 0);
255 }
256 
257 
258 void
259 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
260 {
261     u_char  buf;
262 
263     nxt_thread_log_debug("event engine signal:%ui", signo);
264 
265     /*
266      * A signal number may be sent in a signal context, so the signal
267      * information cannot be passed via a locked work queue.
268      */
269 
270     if (engine->event->signal != NULL) {
271         engine->event->signal(engine->event_set, signo);
272         return;
273     }
274 
275     buf = (u_char) signo;
276     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
277 }
278 
279 
280 static void
281 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
282 {
283     int             i, n;
284     u_char          signo;
285     nxt_bool_t      post;
286     nxt_event_fd_t  *ev;
287     u_char          buf[128];
288 
289     ev = obj;
290 
291     nxt_debug(task, "engine signal pipe");
292 
293     post = 0;
294 
295     do {
296         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
297 
298         for (i = 0; i < n; i++) {
299             signo = buf[i];
300 
301             nxt_debug(task, "engine pipe signo:%d", signo);
302 
303             if (signo == 0) {
304                 /* A post should be processed only once. */
305                 post = 1;
306 
307             } else {
308                 nxt_event_engine_signal_handler(task,
309                                              (void *) (uintptr_t) signo, NULL);
310             }
311         }
312 
313     } while (n == sizeof(buf));
314 
315     if (post) {
316         nxt_event_engine_post_handler(task, NULL, NULL);
317     }
318 }
319 
320 
321 static void
322 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
323 {
324     nxt_thread_t        *thread;
325     nxt_event_engine_t  *engine;
326 
327     thread = task->thread;
328     engine = thread->engine;
329 
330     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
331                                &engine->fast_work_queue);
332 }
333 
334 
335 static void
336 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
337 {
338     nxt_event_engine_t  *engine;
339 
340     engine = task->thread->engine;
341 
342     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
343             engine->pipe->fds[0], engine->pipe->fds[1]);
344 
345     nxt_event_fd_close(engine, &engine->pipe->event);
346     nxt_pipe_close(engine->pipe->fds);
347 }
348 
349 
350 static void
351 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
352 {
353     uintptr_t              signo;
354     const nxt_event_sig_t  *sigev;
355 
356     signo = (uintptr_t) obj;
357 
358     for (sigev = task->thread->engine->signals->sigev;
359          sigev->signo != 0;
360          sigev++)
361     {
362         if (signo == (nxt_uint_t) sigev->signo) {
363             sigev->handler(task, (void *) signo, (void *) sigev->name);
364             return;
365         }
366     }
367 
368     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
369 }
370 
371 
372 nxt_int_t
373 nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
374     const nxt_event_set_ops_t *event_set, nxt_uint_t batch)
375 {
376     nxt_uint_t          events;
377     nxt_event_engine_t  *engine;
378 
379     engine = thr->engine;
380     engine->batch = batch;
381 
382     if (!engine->event->signal_support && event_set->signal_support) {
383         /*
384          * Block signal processing if the current event
385          * facility does not support signal processing.
386          */
387         nxt_event_engine_signals_stop(engine);
388 
389         /*
390          * Add to engine fast work queue the signal events possibly
391          * received before the blocking signal processing.
392          */
393         nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
394     }
395 
396     if (engine->pipe != NULL && event_set->enable_post != NULL) {
397         /*
398          * An engine pipe must be closed after all signal events
399          * added above to engine fast work queue will be processed.
400          */
401         nxt_work_queue_add(&engine->final_work_queue,
402                            nxt_event_engine_signal_pipe_close,
403                            &engine->task, engine->pipe, NULL);
404 
405         engine->pipe = NULL;
406     }
407 
408     engine->event->free(engine->event_set);
409 
410     events = (batch != 0) ? batch : 32;
411 
412     engine->event_set = event_set->create(engine->signals, 4 * events, events);
413     if (engine->event_set == NULL) {
414         return NXT_ERROR;
415     }
416 
417     engine->event = event_set;
418 
419     if (nxt_event_engine_post_init(engine) != NXT_OK) {
420         return NXT_ERROR;
421     }
422 
423     if (engine->signals != NULL) {
424 
425         if (!engine->event->signal_support) {
426             return nxt_event_engine_signals_start(engine);
427         }
428 
429 #if (NXT_THREADS)
430         /*
431          * Reset the PID flag to start the signal thread if
432          * some future event facility will not support signals.
433          */
434         engine->signals->process = 0;
435 #endif
436     }
437 
438     return NXT_OK;
439 }
440 
441 
442 void
443 nxt_event_engine_free(nxt_event_engine_t *engine)
444 {
445     nxt_event_engine_signal_pipe_free(engine);
446     nxt_free(engine->signals);
447 
448     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
449 
450     engine->event->free(engine->event_set);
451 
452     /* TODO: free timers */
453 
454     nxt_free(engine);
455 }
456 
457 
458 static nxt_work_handler_t
459 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
460     void **obj, void **data)
461 {
462     nxt_work_queue_t  *wq;
463 
464     wq = engine->current_work_queue;
465 
466     if (wq->head == NULL) {
467         wq = &engine->fast_work_queue;
468 
469         while (wq->head == NULL) {
470             engine->current_work_queue++;
471             wq = engine->current_work_queue;
472 
473             if (wq > &engine->final_work_queue) {
474                 engine->current_work_queue = &engine->fast_work_queue;
475 
476                 return NULL;
477             }
478         }
479     }
480 
481     nxt_debug(&engine->task, "work queue: %s", wq->name);
482 
483     return nxt_work_queue_pop(wq, task, obj, data);
484 }
485 
486 
487 void
488 nxt_event_engine_start(nxt_event_engine_t *engine)
489 {
490     void                *obj, *data;
491     nxt_task_t          *task;
492     nxt_msec_t          timeout, now;
493     nxt_thread_t        *thr;
494     nxt_work_handler_t  handler;
495 
496     thr = nxt_thread();
497 
498     if (engine->fibers) {
499         /*
500          * _setjmp() cannot be wrapped in a function since return from
501          * the function clobbers stack used by future _setjmp() returns.
502          */
503         _setjmp(engine->fibers->fiber.jmp);
504 
505         /* A return point from fibers. */
506     }
507 
508     thr->log = &nxt_main_log;
509 
510     for ( ;; ) {
511 
512         for ( ;; ) {
513             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
514 
515             if (handler == NULL) {
516                 break;
517             }
518 
519             handler(task, obj, data);
520         }
521 
522         /* Attach some event engine work queues in preferred order. */
523 
524         timeout = nxt_event_timer_find(engine);
525 
526         engine->event->poll(&engine->task, engine->event_set, timeout);
527 
528         /*
529          * Look up expired timers only if a new zero timer has been
530          * just added before the event poll or if the event poll slept
531          * at least 1 millisecond, because all old eligible timers were
532          * processed in the previous iterations.
533          */
534 
535         now = nxt_thread_monotonic_time(thr) / 1000000;
536 
537         if (timeout == 0 || now != engine->timers.now) {
538             nxt_event_timer_expire(thr, now);
539         }
540     }
541 }
542