/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


typedef struct nxt_mem_cache_block_s  nxt_mem_cache_block_t;

struct nxt_mem_cache_block_s {
    nxt_mem_cache_block_t  *next;
};


typedef struct {
    nxt_mem_cache_block_t  *free;
    uint32_t               size;
    uint32_t               count;
} nxt_mem_cache_t;


static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
static nxt_int_t nxt_event_engine_signal_pipe_create(
    nxt_event_engine_t *engine);
static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
    nxt_task_t **task, void **obj, void **data);


nxt_event_engine_t *
nxt_event_engine_create(nxt_task_t *task,
    const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
    nxt_uint_t flags, nxt_uint_t batch)
{
    nxt_uint_t          events;
    nxt_thread_t        *thread;
    nxt_event_engine_t  *engine;

    engine = nxt_zalloc(sizeof(nxt_event_engine_t));
    if (engine == NULL) {
        return NULL;
    }

    nxt_debug(task, "create engine %p", engine);

    thread = task->thread;

    engine->task.thread = thread;
    engine->task.log = thread->log;
    engine->task.ident = nxt_task_next_ident();

    engine->batch = batch;

#if 0
    if (flags & NXT_ENGINE_FIBERS) {
        engine->fibers = nxt_fiber_main_create(engine);
        if (engine->fibers == NULL) {
            goto fibers_fail;
        }
    }
#endif

    engine->current_work_queue = &engine->fast_work_queue;

    nxt_work_queue_cache_create(&engine->work_queue_cache, 0);

    engine->fast_work_queue.cache = &engine->work_queue_cache;
    engine->accept_work_queue.cache = &engine->work_queue_cache;
    engine->read_work_queue.cache = &engine->work_queue_cache;
    engine->socket_work_queue.cache = &engine->work_queue_cache;
    engine->connect_work_queue.cache = &engine->work_queue_cache;
    engine->write_work_queue.cache = &engine->work_queue_cache;
    engine->shutdown_work_queue.cache = &engine->work_queue_cache;
    engine->close_work_queue.cache = &engine->work_queue_cache;

    nxt_work_queue_name(&engine->fast_work_queue, "fast");
    nxt_work_queue_name(&engine->accept_work_queue, "accept");
    nxt_work_queue_name(&engine->read_work_queue, "read");
    nxt_work_queue_name(&engine->socket_work_queue, "socket");
    nxt_work_queue_name(&engine->connect_work_queue, "connect");
    nxt_work_queue_name(&engine->write_work_queue, "write");
    nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
    nxt_work_queue_name(&engine->close_work_queue, "close");

    if (signals != NULL) {
        engine->signals = nxt_event_engine_signals(signals);
        if (engine->signals == NULL) {
            goto signals_fail;
        }

        engine->signals->handler = nxt_event_engine_signal_handler;

        if (!interface->signal_support) {
            if (nxt_event_engine_signals_start(engine) != NXT_OK) {
                goto signals_fail;
            }
        }
    }

    /*
     * The number of event set and timer changes should be at least
     * twice the number of events to avoid premature flushes of the
     * changes.  Fourfold is chosen to be on the safe side.
     */
    events = (batch != 0) ? batch : 32;

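    /*
     * With the default batch of 0 this requests room for 32 events
     * and 128 buffered changes from the event facility.
     */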
    if (interface->create(engine, 4 * events, events) != NXT_OK) {
        goto event_set_fail;
    }

    engine->event = *interface;

    if (nxt_event_engine_post_init(engine) != NXT_OK) {
        goto post_fail;
    }

    if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
        goto timers_fail;
    }

    thread = task->thread;

    nxt_thread_time_update(thread);
    engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;

    engine->max_connections = 0xFFFFFFFF;

    nxt_queue_init(&engine->joints);
    nxt_queue_init(&engine->listen_connections);
    nxt_queue_init(&engine->idle_connections);

    return engine;

timers_fail:
post_fail:

    interface->free(engine);

event_set_fail:
signals_fail:

    nxt_free(engine->signals);
    nxt_work_queue_cache_destroy(&engine->work_queue_cache);
    nxt_free(engine->fibers);

#if 0
fibers_fail:

    nxt_free(engine);
#endif

    return NULL;
}


static nxt_int_t
nxt_event_engine_post_init(nxt_event_engine_t *engine)
{
    if (engine->event.enable_post != NULL) {
        return engine->event.enable_post(engine, nxt_event_engine_post_handler);
    }

    if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
        return NXT_ERROR;
    }

    return NXT_OK;
}


static nxt_int_t
nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
    if (pipe == NULL) {
        return NXT_ERROR;
    }

    engine->pipe = pipe;

    /*
     * An event engine pipe is in blocking mode for writer
     * and in non-blocking mode for reader.
     */
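    /*
     * Presumably, the blocking writer ensures that a posted signal byte
     * is never dropped when the pipe is full, while the non-blocking
     * reader lets nxt_event_engine_signal_pipe() drain the pipe without
     * stalling the event loop.
     */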

    if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
        nxt_free(pipe);
        return NXT_ERROR;
    }

    pipe->event.fd = pipe->fds[0];
    pipe->event.task = &engine->task;
    pipe->event.read_work_queue = &engine->fast_work_queue;
    pipe->event.read_handler = nxt_event_engine_signal_pipe;
    pipe->event.write_work_queue = &engine->fast_work_queue;
    pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
    pipe->event.log = engine->task.log;

    nxt_fd_event_enable_read(engine, &pipe->event);

    return NXT_OK;
}


static void
nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = engine->pipe;

    if (pipe != NULL) {

        if (pipe->event.read_work_queue != NULL) {
            nxt_fd_event_close(engine, &pipe->event);
            nxt_pipe_close(pipe->event.task, pipe->fds);
        }

        nxt_free(pipe);
    }
}


static void
nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = obj;

    nxt_pipe_close(pipe->event.task, pipe->fds);
    nxt_free(pipe);
}


void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
    nxt_debug(&engine->task, "event engine post");

#if (NXT_DEBUG)
    if (nxt_slow_path(work->next != NULL)) {
        nxt_debug(&engine->task, "event engine post multiple works");
    }
#endif

    nxt_locked_work_queue_add(&engine->locked_work_queue, work);

    nxt_event_engine_signal(engine, 0);
}


void
nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    u_char  buf;

    nxt_debug(&engine->task, "event engine signal:%ui", signo);

    /*
     * A signal number may be sent in a signal context, so the signal
     * information cannot be passed via a locked work queue.
     */

    if (engine->event.signal != NULL) {
        engine->event.signal(engine, signo);
        return;
    }

    buf = (u_char) signo;
    (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
}


static void
nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
{
    int             i, n;
    u_char          signo;
    nxt_bool_t      post;
    nxt_fd_event_t  *ev;
    u_char          buf[128];

    ev = obj;

    nxt_debug(task, "engine signal pipe");

    post = 0;

    do {
        n = nxt_fd_read(ev->fd, buf, sizeof(buf));

        for (i = 0; i < n; i++) {
            signo = buf[i];

            nxt_debug(task, "engine pipe signo:%d", signo);

            if (signo == 0) {
                /* A post should be processed only once. */
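                /*
                 * Several post notifications may have accumulated in
                 * the pipe; the locked work queue is drained only
                 * once, after the whole pipe has been read.
                 */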
                post = 1;

            } else {
                nxt_event_engine_signal_handler(task,
                                             (void *) (uintptr_t) signo, NULL);
            }
        }

    } while (n == sizeof(buf));

    if (post) {
        nxt_event_engine_post_handler(task, NULL, NULL);
    }
}


static void
nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_t        *thread;
    nxt_event_engine_t  *engine;

    thread = task->thread;
    engine = thread->engine;

    nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
                               &engine->fast_work_queue);
}


static void
nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t       *engine;
    nxt_event_engine_pipe_t  *pipe;

    engine = task->thread->engine;
    pipe = engine->pipe;

    nxt_alert(task, "engine pipe(%FD:%FD) event error",
              pipe->fds[0], pipe->fds[1]);

    nxt_fd_event_close(engine, &pipe->event);
    nxt_pipe_close(pipe->event.task, pipe->fds);
}


static void
nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
{
    uintptr_t              signo;
    const nxt_sig_event_t  *sigev;

    signo = (uintptr_t) obj;

    for (sigev = task->thread->engine->signals->sigev;
         sigev->signo != 0;
         sigev++)
    {
        if (signo == (nxt_uint_t) sigev->signo) {
            sigev->handler(task, (void *) signo, (void *) sigev->name);
            return;
        }
    }

    nxt_alert(task, "signal %ui handler not found", (nxt_uint_t) signo);
}


nxt_int_t
nxt_event_engine_change(nxt_event_engine_t *engine,
    const nxt_event_interface_t *interface, nxt_uint_t batch)
{
    nxt_uint_t  events;

    engine->batch = batch;

    if (!engine->event.signal_support && interface->signal_support) {
        /*
         * Block signal processing if the current event
         * facility does not support signal processing.
         */
        nxt_event_engine_signals_stop(engine);

        /*
         * Add to the engine fast work queue any signal events that may
         * have been received before signal processing was blocked.
         */
        nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
    }

    if (engine->pipe != NULL && interface->enable_post != NULL) {
        /*
         * The engine pipe must be closed only after all signal events
         * added above to the engine fast work queue have been processed.
         */
        nxt_work_queue_add(&engine->fast_work_queue,
                           nxt_event_engine_signal_pipe_close,
                           &engine->task, engine->pipe, NULL);

        engine->pipe = NULL;
    }

    engine->event.free(engine);

    events = (batch != 0) ? batch : 32;

    if (interface->create(engine, 4 * events, events) != NXT_OK) {
        return NXT_ERROR;
    }

    engine->event = *interface;

    if (nxt_event_engine_post_init(engine) != NXT_OK) {
        return NXT_ERROR;
    }

    if (engine->signals != NULL) {

        if (!engine->event.signal_support) {
            return nxt_event_engine_signals_start(engine);
        }

        /*
         * Reset the PID flag so that the signal thread can be started
         * again if a future event facility does not support signals.
         */
        engine->signals->process = 0;
    }

    return NXT_OK;
}


void
nxt_event_engine_free(nxt_event_engine_t *engine)
{
    nxt_thread_log_debug("free engine %p", engine);

    nxt_event_engine_signal_pipe_free(engine);
    nxt_free(engine->signals);

    nxt_work_queue_cache_destroy(&engine->work_queue_cache);

    engine->event.free(engine);

    /* TODO: free timers */

    nxt_free(engine);
}


/*
 * Pop the next work item: first from the current work queue, then from
 * the fast work queue, and then round robin over the remaining engine
 * work queues.  NULL is returned when all of them are empty.
 */
static nxt_work_handler_t
nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
    void **obj, void **data)
{
    nxt_work_queue_t  *wq, *last;

    wq = engine->current_work_queue;
    last = wq;

    if (wq->head == NULL) {
        wq = &engine->fast_work_queue;

        if (wq->head == NULL) {

            do {
                engine->current_work_queue++;
                wq = engine->current_work_queue;

                if (wq > &engine->close_work_queue) {
                    wq = &engine->fast_work_queue;
                    engine->current_work_queue = wq;
                }

                if (wq->head != NULL) {
                    goto found;
                }

            } while (wq != last);

            engine->current_work_queue = &engine->fast_work_queue;

            return NULL;
        }
    }

found:

    nxt_debug(&engine->task, "work queue: %s", wq->name);

    return nxt_work_queue_pop(wq, task, obj, data);
}


void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
    void                *obj, *data;
    nxt_task_t          *task;
    nxt_msec_t          timeout, now;
    nxt_thread_t        *thr;
    nxt_work_handler_t  handler;

    thr = nxt_thread();

    if (engine->fibers) {
        /*
         * _setjmp() cannot be wrapped in a function since return from
         * the function clobbers stack used by future _setjmp() returns.
         */
        _setjmp(engine->fibers->fiber.jmp);

        /* A return point from fibers. */
    }

    thr->log = engine->task.log;

    for ( ;; ) {

        for ( ;; ) {
            handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);

            if (handler == NULL) {
                break;
            }

            thr->task = task;

            handler(task, obj, data);
        }

        /* Attach some event engine work queues in preferred order. */

        timeout = nxt_timer_find(engine);

        engine->event.poll(engine, timeout);

        now = nxt_thread_monotonic_time(thr) / 1000000;

        nxt_timer_expire(engine, now);
    }
}


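/*
 * Engine memory cache allocation.  The hint caches the index of the
 * matching per-size free list, so subsequent allocations of the same
 * size can skip the linear search over the cache entries.
 */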
void *
nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint,
    size_t size)
{
    uint32_t               n;
    nxt_uint_t             items;
    nxt_array_t            *mem_cache;
    nxt_mem_cache_t        *cache;
    nxt_mem_cache_block_t  *block;

    mem_cache = engine->mem_cache;
    n = *hint;

    if (n == NXT_EVENT_ENGINE_NO_MEM_HINT) {

        if (mem_cache == NULL) {
            /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
            items = 3;
#if (NXT_INET6)
            items++;
#endif
#if (NXT_HAVE_UNIX_DOMAIN)
            items++;
#endif

            mem_cache = nxt_array_create(engine->mem_pool, items,
                                         sizeof(nxt_mem_cache_t));
            if (nxt_slow_path(mem_cache == NULL)) {
                return mem_cache;
            }

            engine->mem_cache = mem_cache;
        }

        cache = mem_cache->elts;
        for (n = 0; n < mem_cache->nelts; n++) {
            if (cache[n].size == size) {
                goto found;
            }
        }

        cache = nxt_array_add(mem_cache);
        if (nxt_slow_path(cache == NULL)) {
            return cache;
        }

        cache->free = NULL;
        cache->size = size;
        cache->count = 0;

    found:

        if (n < NXT_EVENT_ENGINE_NO_MEM_HINT) {
            *hint = (uint8_t) n;
        }
    }

    cache = mem_cache->elts;
    cache = cache + n;

    block = cache->free;

    if (block != NULL) {
        cache->free = block->next;
        cache->count--;
        return block;
    }

    return nxt_mp_alloc(engine->mem_pool, size);
}


void
nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, void *p,
    size_t size)
{
    uint32_t               n;
    nxt_array_t            *mem_cache;
    nxt_mem_cache_t        *cache;
    nxt_mem_cache_block_t  *block;

    block = p;
    mem_cache = engine->mem_cache;
    cache = mem_cache->elts;

    n = hint;

    if (nxt_slow_path(n == NXT_EVENT_ENGINE_NO_MEM_HINT)) {

        if (size != 0) {
            for (n = 0; n < mem_cache->nelts; n++) {
                if (cache[n].size == size) {
                    goto found;
                }
            }

            nxt_alert(&engine->task,
                      "event engine mem free(%p, %z) not found", p, size);
        }

        goto done;
    }

found:

    cache = cache + n;

    if (cache->count < 16) {
        cache->count++;
        block->next = cache->free;
        cache->free = block;

        return;
    }

done:

    nxt_mp_free(engine->mem_pool, p);
}


void *
nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size)
{
    nxt_buf_t  *b;
    uint8_t    hint;

    hint = NXT_EVENT_ENGINE_NO_MEM_HINT;

    b = nxt_event_engine_mem_alloc(engine, &hint, NXT_BUF_MEM_SIZE + size);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    nxt_memzero(b, NXT_BUF_MEM_SIZE);

    b->cache_hint = hint;
    b->data = engine;
    b->completion_handler = nxt_event_engine_buf_mem_completion;

    if (size != 0) {
        b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
        b->mem.pos = b->mem.start;
        b->mem.free = b->mem.start;
        b->mem.end = b->mem.start + size;
    }

    return b;
}


void
nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b)
{
    size_t  size;

    size = NXT_BUF_MEM_SIZE + nxt_buf_mem_size(&b->mem);

    nxt_event_engine_mem_free(engine, b->cache_hint, b, size);
}


void
nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t  *engine;
    nxt_buf_t           *b, *next, *parent;

    b = obj;
    parent = data;

    nxt_debug(task, "buf completion: %p %p", b, b->mem.start);

    engine = b->data;

    do {
        next = b->next;
        parent = b->parent;

        nxt_event_engine_buf_mem_free(engine, b);

        nxt_buf_parent_completion(task, parent);

        b = next;
    } while (b != NULL);
}


#if (NXT_DEBUG)

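/*
 * Presumably re-binds the debug-only thread ownership bookkeeping of
 * every engine work queue to the calling thread after the engine has
 * been handed over to it.
 */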
void
nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
{
    nxt_work_queue_thread_adopt(&engine->fast_work_queue);
    nxt_work_queue_thread_adopt(&engine->accept_work_queue);
    nxt_work_queue_thread_adopt(&engine->read_work_queue);
    nxt_work_queue_thread_adopt(&engine->socket_work_queue);
    nxt_work_queue_thread_adopt(&engine->connect_work_queue);
    nxt_work_queue_thread_adopt(&engine->write_work_queue);
    nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
    nxt_work_queue_thread_adopt(&engine->close_work_queue);
}

#endif