xref: /unit/src/nxt_event_engine.c (revision 12:477899a6661b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
11 static nxt_int_t nxt_event_engine_signal_pipe_create(
12     nxt_event_engine_t *engine);
13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
14     void *data);
15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
16     void *data);
17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
18     void *data);
19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
24     nxt_task_t **task, void **obj, void **data);
25 
26 
27 nxt_event_engine_t *
28 nxt_event_engine_create(nxt_thread_t *thr,
29     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
30     nxt_uint_t flags, nxt_uint_t batch)
31 {
32     nxt_uint_t          events;
33     nxt_event_engine_t  *engine;
34 
35     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
36     if (engine == NULL) {
37         return NULL;
38     }
39 
40     engine->task.thread = thr;
41     engine->task.log = thr->log;
42     engine->task.ident = nxt_task_next_ident();
43 
44     thr->engine = engine;
45     thr->fiber = &engine->fibers->fiber;
46 
47     engine->batch = batch;
48 
49     if (flags & NXT_ENGINE_FIBERS) {
50         engine->fibers = nxt_fiber_main_create(engine);
51         if (engine->fibers == NULL) {
52             goto fibers_fail;
53         }
54     }
55 
56     engine->current_work_queue = &engine->fast_work_queue;
57 
58     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
59 
60     engine->fast_work_queue.cache = &engine->work_queue_cache;
61     engine->accept_work_queue.cache = &engine->work_queue_cache;
62     engine->read_work_queue.cache = &engine->work_queue_cache;
63     engine->socket_work_queue.cache = &engine->work_queue_cache;
64     engine->connect_work_queue.cache = &engine->work_queue_cache;
65     engine->write_work_queue.cache = &engine->work_queue_cache;
66     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
67     engine->close_work_queue.cache = &engine->work_queue_cache;
68 
69     nxt_work_queue_name(&engine->fast_work_queue, "fast");
70     nxt_work_queue_name(&engine->accept_work_queue, "accept");
71     nxt_work_queue_name(&engine->read_work_queue, "read");
72     nxt_work_queue_name(&engine->socket_work_queue, "socket");
73     nxt_work_queue_name(&engine->connect_work_queue, "connect");
74     nxt_work_queue_name(&engine->write_work_queue, "write");
75     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
76     nxt_work_queue_name(&engine->close_work_queue, "close");
77 
78     if (signals != NULL) {
79         engine->signals = nxt_event_engine_signals(signals);
80         if (engine->signals == NULL) {
81             goto signals_fail;
82         }
83 
84         engine->signals->handler = nxt_event_engine_signal_handler;
85 
86         if (!interface->signal_support) {
87             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
88                 goto signals_fail;
89             }
90         }
91     }
92 
93     /*
94      * Number of event set and timers changes should be at least twice
95      * more than number of events to avoid premature flushes of the changes.
96      * Fourfold is for sure.
97      */
98     events = (batch != 0) ? batch : 32;
99 
100     if (interface->create(engine, 4 * events, events) != NXT_OK) {
101         goto event_set_fail;
102     }
103 
104     engine->event = *interface;
105 
106     if (nxt_event_engine_post_init(engine) != NXT_OK) {
107         goto post_fail;
108     }
109 
110     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
111         goto timers_fail;
112     }
113 
114     nxt_thread_time_update(thr);
115     engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000;
116 
117     engine->max_connections = 0xffffffff;
118 
119     nxt_queue_init(&engine->listen_connections);
120     nxt_queue_init(&engine->idle_connections);
121 
122 #if !(NXT_THREADS)
123 
124     if (interface->signal_support) {
125         thr->time.signal = -1;
126     }
127 
128 #endif
129 
130     return engine;
131 
132 timers_fail:
133 post_fail:
134 
135     interface->free(engine);
136 
137 event_set_fail:
138 signals_fail:
139 
140     nxt_free(engine->signals);
141     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
142     nxt_free(engine->fibers);
143 
144 fibers_fail:
145 
146     nxt_free(engine);
147     return NULL;
148 }
149 
150 
151 static nxt_int_t
152 nxt_event_engine_post_init(nxt_event_engine_t *engine)
153 {
154     if (engine->event.enable_post != NULL) {
155         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
156     }
157 
158 #if !(NXT_THREADS)
159 
160     /* Only signals may are posted in single-threaded mode. */
161 
162     if (engine->event->signal_support) {
163         return NXT_OK;
164     }
165 
166 #endif
167 
168     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
169         return NXT_ERROR;
170     }
171 
172     return NXT_OK;
173 }
174 
175 
176 static nxt_int_t
177 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
178 {
179     nxt_event_engine_pipe_t  *pipe;
180 
181     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
182     if (pipe == NULL) {
183         return NXT_ERROR;
184     }
185 
186     engine->pipe = pipe;
187 
188     /*
189      * An event engine pipe is in blocking mode for writer
190      * and in non-blocking node for reader.
191      */
192 
193     if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) {
194         nxt_free(pipe);
195         return NXT_ERROR;
196     }
197 
198     pipe->event.fd = pipe->fds[0];
199     pipe->event.task = &engine->task;
200     pipe->event.read_work_queue = &engine->fast_work_queue;
201     pipe->event.read_handler = nxt_event_engine_signal_pipe;
202     pipe->event.write_work_queue = &engine->fast_work_queue;
203     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
204     pipe->event.log = engine->task.log;
205 
206     nxt_fd_event_enable_read(engine, &pipe->event);
207 
208     return NXT_OK;
209 }
210 
211 
212 static void
213 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
214 {
215     nxt_event_engine_pipe_t  *pipe;
216 
217     pipe = engine->pipe;
218 
219     if (pipe != NULL) {
220 
221         if (pipe->event.read_work_queue != NULL) {
222             nxt_fd_event_close(engine, &pipe->event);
223             nxt_pipe_close(pipe->fds);
224         }
225 
226         nxt_free(pipe);
227     }
228 }
229 
230 
231 static void
232 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
233 {
234     nxt_event_engine_pipe_t  *pipe;
235 
236     pipe = obj;
237 
238     nxt_pipe_close(pipe->fds);
239     nxt_free(pipe);
240 }
241 
242 
243 void
244 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
245 {
246     nxt_debug(&engine->task, "event engine post");
247 
248     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
249 
250     nxt_event_engine_signal(engine, 0);
251 }
252 
253 
254 void
255 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
256 {
257     u_char  buf;
258 
259     nxt_debug(&engine->task, "event engine signal:%ui", signo);
260 
261     /*
262      * A signal number may be sent in a signal context, so the signal
263      * information cannot be passed via a locked work queue.
264      */
265 
266     if (engine->event.signal != NULL) {
267         engine->event.signal(engine, signo);
268         return;
269     }
270 
271     buf = (u_char) signo;
272     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
273 }
274 
275 
276 static void
277 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
278 {
279     int             i, n;
280     u_char          signo;
281     nxt_bool_t      post;
282     nxt_fd_event_t  *ev;
283     u_char          buf[128];
284 
285     ev = obj;
286 
287     nxt_debug(task, "engine signal pipe");
288 
289     post = 0;
290 
291     do {
292         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
293 
294         for (i = 0; i < n; i++) {
295             signo = buf[i];
296 
297             nxt_debug(task, "engine pipe signo:%d", signo);
298 
299             if (signo == 0) {
300                 /* A post should be processed only once. */
301                 post = 1;
302 
303             } else {
304                 nxt_event_engine_signal_handler(task,
305                                              (void *) (uintptr_t) signo, NULL);
306             }
307         }
308 
309     } while (n == sizeof(buf));
310 
311     if (post) {
312         nxt_event_engine_post_handler(task, NULL, NULL);
313     }
314 }
315 
316 
317 static void
318 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
319 {
320     nxt_thread_t        *thread;
321     nxt_event_engine_t  *engine;
322 
323     thread = task->thread;
324     engine = thread->engine;
325 
326     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
327                                &engine->fast_work_queue);
328 }
329 
330 
331 static void
332 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
333 {
334     nxt_event_engine_t  *engine;
335 
336     engine = task->thread->engine;
337 
338     nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
339             engine->pipe->fds[0], engine->pipe->fds[1]);
340 
341     nxt_fd_event_close(engine, &engine->pipe->event);
342     nxt_pipe_close(engine->pipe->fds);
343 }
344 
345 
346 static void
347 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
348 {
349     uintptr_t              signo;
350     const nxt_sig_event_t  *sigev;
351 
352     signo = (uintptr_t) obj;
353 
354     for (sigev = task->thread->engine->signals->sigev;
355          sigev->signo != 0;
356          sigev++)
357     {
358         if (signo == (nxt_uint_t) sigev->signo) {
359             sigev->handler(task, (void *) signo, (void *) sigev->name);
360             return;
361         }
362     }
363 
364     nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
365 }
366 
367 
368 nxt_int_t
369 nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
370     const nxt_event_interface_t *interface, nxt_uint_t batch)
371 {
372     nxt_uint_t          events;
373     nxt_event_engine_t  *engine;
374 
375     engine = thr->engine;
376     engine->batch = batch;
377 
378     if (!engine->event.signal_support && interface->signal_support) {
379         /*
380          * Block signal processing if the current event
381          * facility does not support signal processing.
382          */
383         nxt_event_engine_signals_stop(engine);
384 
385         /*
386          * Add to engine fast work queue the signal events possibly
387          * received before the blocking signal processing.
388          */
389         nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
390     }
391 
392     if (engine->pipe != NULL && interface->enable_post != NULL) {
393         /*
394          * An engine pipe must be closed after all signal events
395          * added above to engine fast work queue will be processed.
396          */
397         nxt_work_queue_add(&engine->fast_work_queue,
398                            nxt_event_engine_signal_pipe_close,
399                            &engine->task, engine->pipe, NULL);
400 
401         engine->pipe = NULL;
402     }
403 
404     engine->event.free(engine);
405 
406     events = (batch != 0) ? batch : 32;
407 
408     if (interface->create(engine, 4 * events, events) != NXT_OK) {
409         return NXT_ERROR;
410     }
411 
412     engine->event = *interface;
413 
414     if (nxt_event_engine_post_init(engine) != NXT_OK) {
415         return NXT_ERROR;
416     }
417 
418     if (engine->signals != NULL) {
419 
420         if (!engine->event.signal_support) {
421             return nxt_event_engine_signals_start(engine);
422         }
423 
424 #if (NXT_THREADS)
425         /*
426          * Reset the PID flag to start the signal thread if
427          * some future event facility will not support signals.
428          */
429         engine->signals->process = 0;
430 #endif
431     }
432 
433     return NXT_OK;
434 }
435 
436 
437 void
438 nxt_event_engine_free(nxt_event_engine_t *engine)
439 {
440     nxt_event_engine_signal_pipe_free(engine);
441     nxt_free(engine->signals);
442 
443     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
444 
445     engine->event.free(engine);
446 
447     /* TODO: free timers */
448 
449     nxt_free(engine);
450 }
451 
452 
453 static nxt_work_handler_t
454 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
455     void **obj, void **data)
456 {
457     nxt_work_queue_t  *wq, *last;
458 
459     wq = engine->current_work_queue;
460     last = wq;
461 
462     if (wq->head == NULL) {
463         wq = &engine->fast_work_queue;
464 
465         if (wq->head == NULL) {
466 
467             do {
468                 engine->current_work_queue++;
469                 wq = engine->current_work_queue;
470 
471                 if (wq > &engine->close_work_queue) {
472                     wq = &engine->fast_work_queue;
473                     engine->current_work_queue = wq;
474                 }
475 
476                 if (wq->head != NULL) {
477                     goto found;
478                 }
479 
480             } while (wq != last);
481 
482             engine->current_work_queue = &engine->fast_work_queue;
483 
484             return NULL;
485         }
486     }
487 
488 found:
489 
490     nxt_debug(&engine->task, "work queue: %s", wq->name);
491 
492     return nxt_work_queue_pop(wq, task, obj, data);
493 }
494 
495 
496 void
497 nxt_event_engine_start(nxt_event_engine_t *engine)
498 {
499     void                *obj, *data;
500     nxt_task_t          *task;
501     nxt_msec_t          timeout, now;
502     nxt_thread_t        *thr;
503     nxt_work_handler_t  handler;
504 
505     thr = nxt_thread();
506 
507     if (engine->fibers) {
508         /*
509          * _setjmp() cannot be wrapped in a function since return from
510          * the function clobbers stack used by future _setjmp() returns.
511          */
512         _setjmp(engine->fibers->fiber.jmp);
513 
514         /* A return point from fibers. */
515     }
516 
517     thr->log = engine->task.log;
518 
519     for ( ;; ) {
520 
521         for ( ;; ) {
522             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
523 
524             if (handler == NULL) {
525                 break;
526             }
527 
528             handler(task, obj, data);
529         }
530 
531         /* Attach some event engine work queues in preferred order. */
532 
533         timeout = nxt_timer_find(engine);
534 
535         engine->event.poll(engine, timeout);
536 
537         now = nxt_thread_monotonic_time(thr) / 1000000;
538 
539         nxt_timer_expire(engine, now);
540     }
541 }
542