1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
typedef struct nxt_mem_cache_block_s  nxt_mem_cache_block_t;

/*
 * A released memory block while it sits in an engine memory cache.
 * The block memory itself is reused to hold the link to the next
 * cached block.
 */
struct nxt_mem_cache_block_s {
    struct nxt_mem_cache_block_s  *next;
};


/* One engine memory cache: a list of released blocks of a single size. */
typedef struct {
    /* Head of the singly linked list of cached blocks, NULL if empty. */
    nxt_mem_cache_block_t  *free;
    /* Size in bytes of the blocks served by this cache. */
    uint32_t               size;
    /* Number of blocks currently linked in the list. */
    uint32_t               count;
} nxt_mem_cache_t;
22
23
24 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
25 static nxt_int_t nxt_event_engine_signal_pipe_create(
26 nxt_event_engine_t *engine);
27 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
28 void *data);
29 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
30 void *data);
31 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
32 void *data);
33 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
34 void *data);
35 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
36 void *data);
37 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
38 nxt_task_t **task, void **obj, void **data);
39
40
41 nxt_event_engine_t *
nxt_event_engine_create(nxt_task_t * task,const nxt_event_interface_t * interface,const nxt_sig_event_t * signals,nxt_uint_t flags,nxt_uint_t batch)42 nxt_event_engine_create(nxt_task_t *task,
43 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
44 nxt_uint_t flags, nxt_uint_t batch)
45 {
46 nxt_uint_t events;
47 nxt_thread_t *thread;
48 nxt_event_engine_t *engine;
49
50 engine = nxt_zalloc(sizeof(nxt_event_engine_t));
51 if (engine == NULL) {
52 return NULL;
53 }
54
55 nxt_debug(task, "create engine %p", engine);
56
57 thread = task->thread;
58
59 engine->task.thread = thread;
60 engine->task.log = thread->log;
61 engine->task.ident = nxt_task_next_ident();
62
63 engine->batch = batch;
64
65 #if 0
66 if (flags & NXT_ENGINE_FIBERS) {
67 engine->fibers = nxt_fiber_main_create(engine);
68 if (engine->fibers == NULL) {
69 goto fibers_fail;
70 }
71 }
72 #endif
73
74 engine->current_work_queue = &engine->fast_work_queue;
75
76 nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
77
78 engine->fast_work_queue.cache = &engine->work_queue_cache;
79 engine->accept_work_queue.cache = &engine->work_queue_cache;
80 engine->read_work_queue.cache = &engine->work_queue_cache;
81 engine->socket_work_queue.cache = &engine->work_queue_cache;
82 engine->connect_work_queue.cache = &engine->work_queue_cache;
83 engine->write_work_queue.cache = &engine->work_queue_cache;
84 engine->shutdown_work_queue.cache = &engine->work_queue_cache;
85 engine->close_work_queue.cache = &engine->work_queue_cache;
86
87 nxt_work_queue_name(&engine->fast_work_queue, "fast");
88 nxt_work_queue_name(&engine->accept_work_queue, "accept");
89 nxt_work_queue_name(&engine->read_work_queue, "read");
90 nxt_work_queue_name(&engine->socket_work_queue, "socket");
91 nxt_work_queue_name(&engine->connect_work_queue, "connect");
92 nxt_work_queue_name(&engine->write_work_queue, "write");
93 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
94 nxt_work_queue_name(&engine->close_work_queue, "close");
95
96 if (signals != NULL) {
97 engine->signals = nxt_event_engine_signals(signals);
98 if (engine->signals == NULL) {
99 goto signals_fail;
100 }
101
102 engine->signals->handler = nxt_event_engine_signal_handler;
103
104 if (!interface->signal_support) {
105 if (nxt_event_engine_signals_start(engine) != NXT_OK) {
106 goto signals_fail;
107 }
108 }
109 }
110
111 /*
112 * Number of event set and timers changes should be at least twice
113 * more than number of events to avoid premature flushes of the changes.
114 * Fourfold is for sure.
115 */
116 events = (batch != 0) ? batch : 32;
117
118 if (interface->create(engine, 4 * events, events) != NXT_OK) {
119 goto event_set_fail;
120 }
121
122 engine->event = *interface;
123
124 if (nxt_event_engine_post_init(engine) != NXT_OK) {
125 goto post_fail;
126 }
127
128 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
129 goto timers_fail;
130 }
131
132 thread = task->thread;
133
134 nxt_thread_time_update(thread);
135 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
136
137 engine->max_connections = 0xFFFFFFFF;
138
139 nxt_queue_init(&engine->joints);
140 nxt_queue_init(&engine->listen_connections);
141 nxt_queue_init(&engine->idle_connections);
142
143 return engine;
144
145 timers_fail:
146 post_fail:
147
148 interface->free(engine);
149
150 event_set_fail:
151 signals_fail:
152
153 nxt_free(engine->signals);
154 nxt_work_queue_cache_destroy(&engine->work_queue_cache);
155 nxt_free(engine->fibers);
156
157 #if 0
158 fibers_fail:
159 #endif
160
161 nxt_free(engine);
162
163 return NULL;
164 }
165
166
167 static nxt_int_t
nxt_event_engine_post_init(nxt_event_engine_t * engine)168 nxt_event_engine_post_init(nxt_event_engine_t *engine)
169 {
170 if (engine->event.enable_post != NULL) {
171 return engine->event.enable_post(engine, nxt_event_engine_post_handler);
172 }
173
174 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
175 return NXT_ERROR;
176 }
177
178 return NXT_OK;
179 }
180
181
182 static nxt_int_t
nxt_event_engine_signal_pipe_create(nxt_event_engine_t * engine)183 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
184 {
185 nxt_event_engine_pipe_t *pipe;
186
187 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
188 if (pipe == NULL) {
189 return NXT_ERROR;
190 }
191
192 engine->pipe = pipe;
193
194 /*
195 * An event engine pipe is in blocking mode for writer
196 * and in non-blocking node for reader.
197 */
198
199 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
200 nxt_free(pipe);
201 return NXT_ERROR;
202 }
203
204 pipe->event.fd = pipe->fds[0];
205 pipe->event.task = &engine->task;
206 pipe->event.read_work_queue = &engine->fast_work_queue;
207 pipe->event.read_handler = nxt_event_engine_signal_pipe;
208 pipe->event.write_work_queue = &engine->fast_work_queue;
209 pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
210 pipe->event.log = engine->task.log;
211
212 nxt_fd_event_enable_read(engine, &pipe->event);
213
214 return NXT_OK;
215 }
216
217
218 static void
nxt_event_engine_signal_pipe_free(nxt_event_engine_t * engine)219 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
220 {
221 nxt_event_engine_pipe_t *pipe;
222
223 pipe = engine->pipe;
224
225 if (pipe != NULL) {
226
227 if (pipe->event.read_work_queue != NULL) {
228 nxt_fd_event_close(engine, &pipe->event);
229 nxt_pipe_close(pipe->event.task, pipe->fds);
230 }
231
232 nxt_free(pipe);
233 }
234 }
235
236
237 static void
nxt_event_engine_signal_pipe_close(nxt_task_t * task,void * obj,void * data)238 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
239 {
240 nxt_event_engine_pipe_t *pipe;
241
242 pipe = obj;
243
244 nxt_pipe_close(pipe->event.task, pipe->fds);
245 nxt_free(pipe);
246 }
247
248
249 void
nxt_event_engine_post(nxt_event_engine_t * engine,nxt_work_t * work)250 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
251 {
252 nxt_debug(&engine->task, "event engine post");
253
254 #if (NXT_DEBUG)
255 if (nxt_slow_path(work->next != NULL)) {
256 nxt_debug(&engine->task, "event engine post multiple works");
257 }
258 #endif
259
260 nxt_locked_work_queue_add(&engine->locked_work_queue, work);
261
262 nxt_event_engine_signal(engine, 0);
263 }
264
265
266 void
nxt_event_engine_signal(nxt_event_engine_t * engine,nxt_uint_t signo)267 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
268 {
269 u_char buf;
270
271 nxt_debug(&engine->task, "event engine signal:%ui", signo);
272
273 /*
274 * A signal number may be sent in a signal context, so the signal
275 * information cannot be passed via a locked work queue.
276 */
277
278 if (engine->event.signal != NULL) {
279 engine->event.signal(engine, signo);
280 return;
281 }
282
283 buf = (u_char) signo;
284 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
285 }
286
287
288 static void
nxt_event_engine_signal_pipe(nxt_task_t * task,void * obj,void * data)289 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
290 {
291 int i, n;
292 u_char signo;
293 nxt_bool_t post;
294 nxt_fd_event_t *ev;
295 u_char buf[128];
296
297 ev = obj;
298
299 nxt_debug(task, "engine signal pipe");
300
301 post = 0;
302
303 do {
304 n = nxt_fd_read(ev->fd, buf, sizeof(buf));
305
306 for (i = 0; i < n; i++) {
307 signo = buf[i];
308
309 nxt_debug(task, "engine pipe signo:%d", signo);
310
311 if (signo == 0) {
312 /* A post should be processed only once. */
313 post = 1;
314
315 } else {
316 nxt_event_engine_signal_handler(task,
317 (void *) (uintptr_t) signo, NULL);
318 }
319 }
320
321 } while (n == sizeof(buf));
322
323 if (post) {
324 nxt_event_engine_post_handler(task, NULL, NULL);
325 }
326 }
327
328
329 static void
nxt_event_engine_post_handler(nxt_task_t * task,void * obj,void * data)330 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
331 {
332 nxt_thread_t *thread;
333 nxt_event_engine_t *engine;
334
335 thread = task->thread;
336 engine = thread->engine;
337
338 nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
339 &engine->fast_work_queue);
340 }
341
342
343 static void
nxt_event_engine_signal_pipe_error(nxt_task_t * task,void * obj,void * data)344 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
345 {
346 nxt_event_engine_t *engine;
347 nxt_event_engine_pipe_t *pipe;
348
349 engine = task->thread->engine;
350 pipe = engine->pipe;
351
352 nxt_alert(task, "engine pipe(%FD:%FD) event error",
353 pipe->fds[0], pipe->fds[1]);
354
355 nxt_fd_event_close(engine, &pipe->event);
356 nxt_pipe_close(pipe->event.task, pipe->fds);
357 }
358
359
360 static void
nxt_event_engine_signal_handler(nxt_task_t * task,void * obj,void * data)361 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
362 {
363 uintptr_t signo;
364 const nxt_sig_event_t *sigev;
365
366 signo = (uintptr_t) obj;
367
368 for (sigev = task->thread->engine->signals->sigev;
369 sigev->signo != 0;
370 sigev++)
371 {
372 if (signo == (nxt_uint_t) sigev->signo) {
373 sigev->handler(task, (void *) signo, (void *) sigev->name);
374 return;
375 }
376 }
377
378 nxt_alert(task, "signal %ui handler not found", (nxt_uint_t) signo);
379 }
380
381
382 nxt_int_t
nxt_event_engine_change(nxt_event_engine_t * engine,const nxt_event_interface_t * interface,nxt_uint_t batch)383 nxt_event_engine_change(nxt_event_engine_t *engine,
384 const nxt_event_interface_t *interface, nxt_uint_t batch)
385 {
386 nxt_uint_t events;
387
388 engine->batch = batch;
389
390 if (!engine->event.signal_support && interface->signal_support) {
391 /*
392 * Block signal processing if the current event
393 * facility does not support signal processing.
394 */
395 nxt_event_engine_signals_stop(engine);
396
397 /*
398 * Add to engine fast work queue the signal events possibly
399 * received before the blocking signal processing.
400 */
401 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
402 }
403
404 if (engine->pipe != NULL && interface->enable_post != NULL) {
405 /*
406 * An engine pipe must be closed after all signal events
407 * added above to engine fast work queue will be processed.
408 */
409 nxt_work_queue_add(&engine->fast_work_queue,
410 nxt_event_engine_signal_pipe_close,
411 &engine->task, engine->pipe, NULL);
412
413 engine->pipe = NULL;
414 }
415
416 engine->event.free(engine);
417
418 events = (batch != 0) ? batch : 32;
419
420 if (interface->create(engine, 4 * events, events) != NXT_OK) {
421 return NXT_ERROR;
422 }
423
424 engine->event = *interface;
425
426 if (nxt_event_engine_post_init(engine) != NXT_OK) {
427 return NXT_ERROR;
428 }
429
430 if (engine->signals != NULL) {
431
432 if (!engine->event.signal_support) {
433 return nxt_event_engine_signals_start(engine);
434 }
435
436 /*
437 * Reset the PID flag to start the signal thread if
438 * some future event facility will not support signals.
439 */
440 engine->signals->process = 0;
441 }
442
443 return NXT_OK;
444 }
445
446
447 void
nxt_event_engine_free(nxt_event_engine_t * engine)448 nxt_event_engine_free(nxt_event_engine_t *engine)
449 {
450 nxt_thread_log_debug("free engine %p", engine);
451
452 nxt_event_engine_signal_pipe_free(engine);
453 nxt_free(engine->signals);
454
455 nxt_work_queue_cache_destroy(&engine->work_queue_cache);
456
457 engine->event.free(engine);
458
459 /* TODO: free timers */
460
461 nxt_free(engine);
462 }
463
464
465 static nxt_work_handler_t
nxt_event_engine_queue_pop(nxt_event_engine_t * engine,nxt_task_t ** task,void ** obj,void ** data)466 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
467 void **obj, void **data)
468 {
469 nxt_work_queue_t *wq, *last;
470
471 wq = engine->current_work_queue;
472 last = wq;
473
474 if (wq->head == NULL) {
475 wq = &engine->fast_work_queue;
476
477 if (wq->head == NULL) {
478
479 do {
480 engine->current_work_queue++;
481 wq = engine->current_work_queue;
482
483 if (wq > &engine->close_work_queue) {
484 wq = &engine->fast_work_queue;
485 engine->current_work_queue = wq;
486 }
487
488 if (wq->head != NULL) {
489 goto found;
490 }
491
492 } while (wq != last);
493
494 engine->current_work_queue = &engine->fast_work_queue;
495
496 return NULL;
497 }
498 }
499
500 found:
501
502 nxt_debug(&engine->task, "work queue: %s", wq->name);
503
504 return nxt_work_queue_pop(wq, task, obj, data);
505 }
506
507
508 void
nxt_event_engine_start(nxt_event_engine_t * engine)509 nxt_event_engine_start(nxt_event_engine_t *engine)
510 {
511 void *obj, *data;
512 nxt_task_t *task;
513 nxt_msec_t timeout, now;
514 nxt_thread_t *thr;
515 nxt_work_handler_t handler;
516
517 thr = nxt_thread();
518
519 if (engine->fibers) {
520 /*
521 * _setjmp() cannot be wrapped in a function since return from
522 * the function clobbers stack used by future _setjmp() returns.
523 */
524 _setjmp(engine->fibers->fiber.jmp);
525
526 /* A return point from fibers. */
527 }
528
529 thr->log = engine->task.log;
530
531 for ( ;; ) {
532
533 for ( ;; ) {
534 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
535
536 if (handler == NULL) {
537 break;
538 }
539
540 thr->task = task;
541
542 handler(task, obj, data);
543 }
544
545 /* Attach some event engine work queues in preferred order. */
546
547 timeout = nxt_timer_find(engine);
548
549 engine->event.poll(engine, timeout);
550
551 now = nxt_thread_monotonic_time(thr) / 1000000;
552
553 nxt_timer_expire(engine, now);
554 }
555 }
556
557
558 void *
nxt_event_engine_mem_alloc(nxt_event_engine_t * engine,uint8_t * hint,size_t size)559 nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint,
560 size_t size)
561 {
562 uint32_t n;
563 nxt_uint_t items;
564 nxt_array_t *mem_cache;
565 nxt_mem_cache_t *cache;
566 nxt_mem_cache_block_t *block;
567
568 mem_cache = engine->mem_cache;
569 n = *hint;
570
571 if (n == NXT_EVENT_ENGINE_NO_MEM_HINT) {
572
573 if (mem_cache == NULL) {
574 /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
575 items = 3;
576 #if (NXT_INET6)
577 items++;
578 #endif
579 #if (NXT_HAVE_UNIX_DOMAIN)
580 items++;
581 #endif
582
583 mem_cache = nxt_array_create(engine->mem_pool, items,
584 sizeof(nxt_mem_cache_t));
585 if (nxt_slow_path(mem_cache == NULL)) {
586 return mem_cache;
587 }
588
589 engine->mem_cache = mem_cache;
590 }
591
592 cache = mem_cache->elts;
593 for (n = 0; n < mem_cache->nelts; n++) {
594 if (cache[n].size == size) {
595 goto found;
596 }
597 }
598
599 cache = nxt_array_add(mem_cache);
600 if (nxt_slow_path(cache == NULL)) {
601 return cache;
602 }
603
604 cache->free = NULL;
605 cache->size = size;
606 cache->count = 0;
607
608 found:
609
610 if (n < NXT_EVENT_ENGINE_NO_MEM_HINT) {
611 *hint = (uint8_t) n;
612 }
613 }
614
615 cache = mem_cache->elts;
616 cache = cache + n;
617
618 block = cache->free;
619
620 if (block != NULL) {
621 cache->free = block->next;
622 cache->count--;
623 return block;
624 }
625
626 return nxt_mp_alloc(engine->mem_pool, size);
627 }
628
629
630 void
nxt_event_engine_mem_free(nxt_event_engine_t * engine,uint8_t hint,void * p,size_t size)631 nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, void *p,
632 size_t size)
633 {
634 uint32_t n;
635 nxt_array_t *mem_cache;
636 nxt_mem_cache_t *cache;
637 nxt_mem_cache_block_t *block;
638
639 block = p;
640 mem_cache = engine->mem_cache;
641 cache = mem_cache->elts;
642
643 n = hint;
644
645 if (nxt_slow_path(n == NXT_EVENT_ENGINE_NO_MEM_HINT)) {
646
647 if (size != 0) {
648 for (n = 0; n < mem_cache->nelts; n++) {
649 if (cache[n].size == size) {
650 goto found;
651 }
652 }
653
654 nxt_alert(&engine->task,
655 "event engine mem free(%p, %z) not found", p, size);
656 }
657
658 goto done;
659 }
660
661 found:
662
663 cache = cache + n;
664
665 if (cache->count < 16) {
666 cache->count++;
667 block->next = cache->free;
668 cache->free = block;
669
670 return;
671 }
672
673 done:
674
675 nxt_mp_free(engine->mem_pool, p);
676 }
677
678
679 void *
nxt_event_engine_buf_mem_alloc(nxt_event_engine_t * engine,size_t size)680 nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size)
681 {
682 nxt_buf_t *b;
683 uint8_t hint;
684
685 hint = NXT_EVENT_ENGINE_NO_MEM_HINT;
686
687 b = nxt_event_engine_mem_alloc(engine, &hint, NXT_BUF_MEM_SIZE + size);
688 if (nxt_slow_path(b == NULL)) {
689 return NULL;
690 }
691
692 nxt_memzero(b, NXT_BUF_MEM_SIZE);
693
694 b->cache_hint = hint;
695 b->data = engine;
696 b->completion_handler = nxt_event_engine_buf_mem_completion;
697
698 if (size != 0) {
699 b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
700 b->mem.pos = b->mem.start;
701 b->mem.free = b->mem.start;
702 b->mem.end = b->mem.start + size;
703 }
704
705 return b;
706 }
707
708
709 void
nxt_event_engine_buf_mem_free(nxt_event_engine_t * engine,nxt_buf_t * b)710 nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b)
711 {
712 size_t size;
713
714 size = NXT_BUF_MEM_SIZE + nxt_buf_mem_size(&b->mem);
715
716 nxt_event_engine_mem_free(engine, b->cache_hint, b, size);
717 }
718
719
720 void
nxt_event_engine_buf_mem_completion(nxt_task_t * task,void * obj,void * data)721 nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
722 {
723 nxt_buf_t *b, *next, *parent;
724 nxt_event_engine_t *engine;
725
726 b = obj;
727
728 nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
729
730 engine = b->data;
731
732 do {
733 next = b->next;
734 parent = b->parent;
735
736 nxt_event_engine_buf_mem_free(engine, b);
737
738 nxt_buf_parent_completion(task, parent);
739
740 b = next;
741 } while (b != NULL);
742 }
743
744
745 #if (NXT_DEBUG)
746
/*
 * Debug builds only: calls nxt_work_queue_thread_adopt() for every
 * engine work queue, from the fast queue to the close queue.
 */
void
nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
{
    size_t            i;
    nxt_work_queue_t  *queues[] = {
        &engine->fast_work_queue,
        &engine->accept_work_queue,
        &engine->read_work_queue,
        &engine->socket_work_queue,
        &engine->connect_work_queue,
        &engine->write_work_queue,
        &engine->shutdown_work_queue,
        &engine->close_work_queue,
    };

    for (i = 0; i < sizeof(queues) / sizeof(queues[0]); i++) {
        nxt_work_queue_thread_adopt(queues[i]);
    }
}
758
759 #endif
760