xref: /unit/src/nxt_event_engine.c (revision 611:323e11065f83)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A free block kept in an engine memory cache.  The list link is stored
 * in the first bytes of the cached block itself, see
 * nxt_event_engine_mem_free().
 */
typedef struct nxt_mem_cache_block_s  nxt_mem_cache_block_t;

struct nxt_mem_cache_block_s {
    nxt_mem_cache_block_t  *next;
};
15 
16 
/* A list of cached free blocks which all have the same size. */
typedef struct {
    /* Head of the singly linked list of free blocks. */
    nxt_mem_cache_block_t  *free;
    /* Size of blocks this cache was created for. */
    uint32_t               size;
    /* Number of blocks currently kept in the free list. */
    uint32_t               count;
} nxt_mem_cache_t;
22 
23 
24 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
25 static nxt_int_t nxt_event_engine_signal_pipe_create(
26     nxt_event_engine_t *engine);
27 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
28     void *data);
29 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
34     void *data);
35 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
36     void *data);
37 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
38     nxt_task_t **task, void **obj, void **data);
39 
40 
41 nxt_event_engine_t *
42 nxt_event_engine_create(nxt_task_t *task,
43     const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
44     nxt_uint_t flags, nxt_uint_t batch)
45 {
46     nxt_uint_t          events;
47     nxt_thread_t        *thread;
48     nxt_event_engine_t  *engine;
49 
50     engine = nxt_zalloc(sizeof(nxt_event_engine_t));
51     if (engine == NULL) {
52         return NULL;
53     }
54 
55     nxt_debug(task, "create engine %p", engine);
56 
57     thread = task->thread;
58 
59     engine->task.thread = thread;
60     engine->task.log = thread->log;
61     engine->task.ident = nxt_task_next_ident();
62 
63     engine->batch = batch;
64 
65 #if 0
66     if (flags & NXT_ENGINE_FIBERS) {
67         engine->fibers = nxt_fiber_main_create(engine);
68         if (engine->fibers == NULL) {
69             goto fibers_fail;
70         }
71     }
72 #endif
73 
74     engine->current_work_queue = &engine->fast_work_queue;
75 
76     nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
77 
78     engine->fast_work_queue.cache = &engine->work_queue_cache;
79     engine->accept_work_queue.cache = &engine->work_queue_cache;
80     engine->read_work_queue.cache = &engine->work_queue_cache;
81     engine->socket_work_queue.cache = &engine->work_queue_cache;
82     engine->connect_work_queue.cache = &engine->work_queue_cache;
83     engine->write_work_queue.cache = &engine->work_queue_cache;
84     engine->shutdown_work_queue.cache = &engine->work_queue_cache;
85     engine->close_work_queue.cache = &engine->work_queue_cache;
86 
87     nxt_work_queue_name(&engine->fast_work_queue, "fast");
88     nxt_work_queue_name(&engine->accept_work_queue, "accept");
89     nxt_work_queue_name(&engine->read_work_queue, "read");
90     nxt_work_queue_name(&engine->socket_work_queue, "socket");
91     nxt_work_queue_name(&engine->connect_work_queue, "connect");
92     nxt_work_queue_name(&engine->write_work_queue, "write");
93     nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
94     nxt_work_queue_name(&engine->close_work_queue, "close");
95 
96     if (signals != NULL) {
97         engine->signals = nxt_event_engine_signals(signals);
98         if (engine->signals == NULL) {
99             goto signals_fail;
100         }
101 
102         engine->signals->handler = nxt_event_engine_signal_handler;
103 
104         if (!interface->signal_support) {
105             if (nxt_event_engine_signals_start(engine) != NXT_OK) {
106                 goto signals_fail;
107             }
108         }
109     }
110 
111     /*
112      * Number of event set and timers changes should be at least twice
113      * more than number of events to avoid premature flushes of the changes.
114      * Fourfold is for sure.
115      */
116     events = (batch != 0) ? batch : 32;
117 
118     if (interface->create(engine, 4 * events, events) != NXT_OK) {
119         goto event_set_fail;
120     }
121 
122     engine->event = *interface;
123 
124     if (nxt_event_engine_post_init(engine) != NXT_OK) {
125         goto post_fail;
126     }
127 
128     if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
129         goto timers_fail;
130     }
131 
132     thread = task->thread;
133 
134     nxt_thread_time_update(thread);
135     engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
136 
137     engine->max_connections = 0xFFFFFFFF;
138 
139     nxt_queue_init(&engine->joints);
140     nxt_queue_init(&engine->listen_connections);
141     nxt_queue_init(&engine->idle_connections);
142 
143     return engine;
144 
145 timers_fail:
146 post_fail:
147 
148     interface->free(engine);
149 
150 event_set_fail:
151 signals_fail:
152 
153     nxt_free(engine->signals);
154     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
155     nxt_free(engine->fibers);
156 
157 #if 0
158 fibers_fail:
159 
160     nxt_free(engine);
161 #endif
162 
163     return NULL;
164 }
165 
166 
167 static nxt_int_t
168 nxt_event_engine_post_init(nxt_event_engine_t *engine)
169 {
170     if (engine->event.enable_post != NULL) {
171         return engine->event.enable_post(engine, nxt_event_engine_post_handler);
172     }
173 
174     if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
175         return NXT_ERROR;
176     }
177 
178     return NXT_OK;
179 }
180 
181 
182 static nxt_int_t
183 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
184 {
185     nxt_event_engine_pipe_t  *pipe;
186 
187     pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
188     if (pipe == NULL) {
189         return NXT_ERROR;
190     }
191 
192     engine->pipe = pipe;
193 
194     /*
195      * An event engine pipe is in blocking mode for writer
196      * and in non-blocking node for reader.
197      */
198 
199     if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
200         nxt_free(pipe);
201         return NXT_ERROR;
202     }
203 
204     pipe->event.fd = pipe->fds[0];
205     pipe->event.task = &engine->task;
206     pipe->event.read_work_queue = &engine->fast_work_queue;
207     pipe->event.read_handler = nxt_event_engine_signal_pipe;
208     pipe->event.write_work_queue = &engine->fast_work_queue;
209     pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
210     pipe->event.log = engine->task.log;
211 
212     nxt_fd_event_enable_read(engine, &pipe->event);
213 
214     return NXT_OK;
215 }
216 
217 
218 static void
219 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
220 {
221     nxt_event_engine_pipe_t  *pipe;
222 
223     pipe = engine->pipe;
224 
225     if (pipe != NULL) {
226 
227         if (pipe->event.read_work_queue != NULL) {
228             nxt_fd_event_close(engine, &pipe->event);
229             nxt_pipe_close(pipe->event.task, pipe->fds);
230         }
231 
232         nxt_free(pipe);
233     }
234 }
235 
236 
237 static void
238 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
239 {
240     nxt_event_engine_pipe_t  *pipe;
241 
242     pipe = obj;
243 
244     nxt_pipe_close(pipe->event.task, pipe->fds);
245     nxt_free(pipe);
246 }
247 
248 
249 void
250 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
251 {
252     nxt_debug(&engine->task, "event engine post");
253 
254 #if (NXT_DEBUG)
255     if (nxt_slow_path(work->next != NULL)) {
256         nxt_debug(&engine->task, "event engine post multiple works");
257     }
258 #endif
259 
260     nxt_locked_work_queue_add(&engine->locked_work_queue, work);
261 
262     nxt_event_engine_signal(engine, 0);
263 }
264 
265 
266 void
267 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
268 {
269     u_char  buf;
270 
271     nxt_debug(&engine->task, "event engine signal:%ui", signo);
272 
273     /*
274      * A signal number may be sent in a signal context, so the signal
275      * information cannot be passed via a locked work queue.
276      */
277 
278     if (engine->event.signal != NULL) {
279         engine->event.signal(engine, signo);
280         return;
281     }
282 
283     buf = (u_char) signo;
284     (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
285 }
286 
287 
288 static void
289 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
290 {
291     int             i, n;
292     u_char          signo;
293     nxt_bool_t      post;
294     nxt_fd_event_t  *ev;
295     u_char          buf[128];
296 
297     ev = obj;
298 
299     nxt_debug(task, "engine signal pipe");
300 
301     post = 0;
302 
303     do {
304         n = nxt_fd_read(ev->fd, buf, sizeof(buf));
305 
306         for (i = 0; i < n; i++) {
307             signo = buf[i];
308 
309             nxt_debug(task, "engine pipe signo:%d", signo);
310 
311             if (signo == 0) {
312                 /* A post should be processed only once. */
313                 post = 1;
314 
315             } else {
316                 nxt_event_engine_signal_handler(task,
317                                              (void *) (uintptr_t) signo, NULL);
318             }
319         }
320 
321     } while (n == sizeof(buf));
322 
323     if (post) {
324         nxt_event_engine_post_handler(task, NULL, NULL);
325     }
326 }
327 
328 
329 static void
330 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
331 {
332     nxt_thread_t        *thread;
333     nxt_event_engine_t  *engine;
334 
335     thread = task->thread;
336     engine = thread->engine;
337 
338     nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
339                                &engine->fast_work_queue);
340 }
341 
342 
343 static void
344 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
345 {
346     nxt_event_engine_t       *engine;
347     nxt_event_engine_pipe_t  *pipe;
348 
349     engine = task->thread->engine;
350     pipe = engine->pipe;
351 
352     nxt_alert(task, "engine pipe(%FD:%FD) event error",
353               pipe->fds[0], pipe->fds[1]);
354 
355     nxt_fd_event_close(engine, &pipe->event);
356     nxt_pipe_close(pipe->event.task, pipe->fds);
357 }
358 
359 
360 static void
361 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
362 {
363     uintptr_t              signo;
364     const nxt_sig_event_t  *sigev;
365 
366     signo = (uintptr_t) obj;
367 
368     for (sigev = task->thread->engine->signals->sigev;
369          sigev->signo != 0;
370          sigev++)
371     {
372         if (signo == (nxt_uint_t) sigev->signo) {
373             sigev->handler(task, (void *) signo, (void *) sigev->name);
374             return;
375         }
376     }
377 
378     nxt_alert(task, "signal %ui handler not found", (nxt_uint_t) signo);
379 }
380 
381 
382 nxt_int_t
383 nxt_event_engine_change(nxt_event_engine_t *engine,
384     const nxt_event_interface_t *interface, nxt_uint_t batch)
385 {
386     nxt_uint_t  events;
387 
388     engine->batch = batch;
389 
390     if (!engine->event.signal_support && interface->signal_support) {
391         /*
392          * Block signal processing if the current event
393          * facility does not support signal processing.
394          */
395         nxt_event_engine_signals_stop(engine);
396 
397         /*
398          * Add to engine fast work queue the signal events possibly
399          * received before the blocking signal processing.
400          */
401         nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
402     }
403 
404     if (engine->pipe != NULL && interface->enable_post != NULL) {
405         /*
406          * An engine pipe must be closed after all signal events
407          * added above to engine fast work queue will be processed.
408          */
409         nxt_work_queue_add(&engine->fast_work_queue,
410                            nxt_event_engine_signal_pipe_close,
411                            &engine->task, engine->pipe, NULL);
412 
413         engine->pipe = NULL;
414     }
415 
416     engine->event.free(engine);
417 
418     events = (batch != 0) ? batch : 32;
419 
420     if (interface->create(engine, 4 * events, events) != NXT_OK) {
421         return NXT_ERROR;
422     }
423 
424     engine->event = *interface;
425 
426     if (nxt_event_engine_post_init(engine) != NXT_OK) {
427         return NXT_ERROR;
428     }
429 
430     if (engine->signals != NULL) {
431 
432         if (!engine->event.signal_support) {
433             return nxt_event_engine_signals_start(engine);
434         }
435 
436         /*
437          * Reset the PID flag to start the signal thread if
438          * some future event facility will not support signals.
439          */
440         engine->signals->process = 0;
441     }
442 
443     return NXT_OK;
444 }
445 
446 
447 void
448 nxt_event_engine_free(nxt_event_engine_t *engine)
449 {
450     nxt_thread_log_debug("free engine %p", engine);
451 
452     nxt_event_engine_signal_pipe_free(engine);
453     nxt_free(engine->signals);
454 
455     nxt_work_queue_cache_destroy(&engine->work_queue_cache);
456 
457     engine->event.free(engine);
458 
459     /* TODO: free timers */
460 
461     nxt_free(engine);
462 }
463 
464 
465 static nxt_work_handler_t
466 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
467     void **obj, void **data)
468 {
469     nxt_work_queue_t  *wq, *last;
470 
471     wq = engine->current_work_queue;
472     last = wq;
473 
474     if (wq->head == NULL) {
475         wq = &engine->fast_work_queue;
476 
477         if (wq->head == NULL) {
478 
479             do {
480                 engine->current_work_queue++;
481                 wq = engine->current_work_queue;
482 
483                 if (wq > &engine->close_work_queue) {
484                     wq = &engine->fast_work_queue;
485                     engine->current_work_queue = wq;
486                 }
487 
488                 if (wq->head != NULL) {
489                     goto found;
490                 }
491 
492             } while (wq != last);
493 
494             engine->current_work_queue = &engine->fast_work_queue;
495 
496             return NULL;
497         }
498     }
499 
500 found:
501 
502     nxt_debug(&engine->task, "work queue: %s", wq->name);
503 
504     return nxt_work_queue_pop(wq, task, obj, data);
505 }
506 
507 
508 void
509 nxt_event_engine_start(nxt_event_engine_t *engine)
510 {
511     void                *obj, *data;
512     nxt_task_t          *task;
513     nxt_msec_t          timeout, now;
514     nxt_thread_t        *thr;
515     nxt_work_handler_t  handler;
516 
517     thr = nxt_thread();
518 
519     if (engine->fibers) {
520         /*
521          * _setjmp() cannot be wrapped in a function since return from
522          * the function clobbers stack used by future _setjmp() returns.
523          */
524         _setjmp(engine->fibers->fiber.jmp);
525 
526         /* A return point from fibers. */
527     }
528 
529     thr->log = engine->task.log;
530 
531     for ( ;; ) {
532 
533         for ( ;; ) {
534             handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
535 
536             if (handler == NULL) {
537                 break;
538             }
539 
540             thr->task = task;
541 
542             handler(task, obj, data);
543         }
544 
545         /* Attach some event engine work queues in preferred order. */
546 
547         timeout = nxt_timer_find(engine);
548 
549         engine->event.poll(engine, timeout);
550 
551         now = nxt_thread_monotonic_time(thr) / 1000000;
552 
553         nxt_timer_expire(engine, now);
554     }
555 }
556 
557 
558 void *
559 nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
560     size_t size)
561 {
562     uint8_t                n;
563     nxt_uint_t             items;
564     nxt_array_t            *mem_cache;
565     nxt_mem_cache_t        *cache;
566     nxt_mem_cache_block_t  *block;
567 
568     mem_cache = engine->mem_cache;
569     n = *slot;
570 
571     if (n == (uint8_t) -1) {
572 
573         if (mem_cache == NULL) {
574             /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
575             items = 3;
576 #if (NXT_INET6)
577             items++;
578 #endif
579 #if (NXT_HAVE_UNIX_DOMAIN)
580             items++;
581 #endif
582 
583             mem_cache = nxt_array_create(engine->mem_pool, items,
584                                          sizeof(nxt_mem_cache_t));
585             if (nxt_slow_path(mem_cache == NULL)) {
586                 return mem_cache;
587             }
588 
589             engine->mem_cache = mem_cache;
590         }
591 
592         cache = mem_cache->elts;
593         for (n = 0; n < mem_cache->nelts; n++) {
594             if (cache[n].size == size) {
595                 goto found;
596             }
597         }
598 
599         cache = nxt_array_add(mem_cache);
600         if (nxt_slow_path(cache == NULL)) {
601             return cache;
602         }
603 
604         cache->free = NULL;
605         cache->size = size;
606         cache->count = 0;
607 
608     found:
609 
610         *slot = n;
611     }
612 
613     cache = mem_cache->elts;
614     cache = cache + n;
615 
616     block = cache->free;
617 
618     if (block != NULL) {
619         cache->free = block->next;
620         cache->count--;
621         return block;
622     }
623 
624     return nxt_mp_alloc(engine->mem_pool, size);
625 }
626 
627 
628 void
629 nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
630 {
631     nxt_mem_cache_t        *cache;
632     nxt_mem_cache_block_t  *block;
633 
634     block = p;
635 
636     cache = engine->mem_cache->elts;
637     cache = cache + *slot;
638 
639     if (cache->count < 16) {
640         cache->count++;
641         block->next = cache->free;
642         cache->free = block;
643 
644         return;
645     }
646 
647     nxt_mp_free(engine->mem_pool, p);
648 }
649 
650 
651 #if (NXT_DEBUG)
652 
653 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
654 {
655     nxt_work_queue_thread_adopt(&engine->fast_work_queue);
656     nxt_work_queue_thread_adopt(&engine->accept_work_queue);
657     nxt_work_queue_thread_adopt(&engine->read_work_queue);
658     nxt_work_queue_thread_adopt(&engine->socket_work_queue);
659     nxt_work_queue_thread_adopt(&engine->connect_work_queue);
660     nxt_work_queue_thread_adopt(&engine->write_work_queue);
661     nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
662     nxt_work_queue_thread_adopt(&engine->close_work_queue);
663 }
664 
665 #endif
666