/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
static nxt_int_t nxt_event_engine_signal_pipe_create(
    nxt_event_engine_t *engine);
static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
    nxt_task_t **task, void **obj, void **data);


nxt_event_engine_t *
nxt_event_engine_create(nxt_task_t *task,
    const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
    nxt_uint_t flags, nxt_uint_t batch)
{
    nxt_uint_t          events;
    nxt_thread_t        *thread;
    nxt_event_engine_t  *engine;

    engine = nxt_zalloc(sizeof(nxt_event_engine_t));
    if (engine == NULL) {
        return NULL;
    }

    nxt_debug(task, "create engine %p", engine);

    thread = task->thread;

    engine->task.thread = thread;
    engine->task.log = thread->log;
    engine->task.ident = nxt_task_next_ident();

    engine->batch = batch;

    if (flags & NXT_ENGINE_FIBERS) {
        engine->fibers = nxt_fiber_main_create(engine);
        if (engine->fibers == NULL) {
            goto fibers_fail;
        }
    }

    engine->current_work_queue = &engine->fast_work_queue;

    nxt_work_queue_cache_create(&engine->work_queue_cache, 0);

    engine->fast_work_queue.cache = &engine->work_queue_cache;
    engine->accept_work_queue.cache = &engine->work_queue_cache;
    engine->read_work_queue.cache = &engine->work_queue_cache;
    engine->socket_work_queue.cache = &engine->work_queue_cache;
    engine->connect_work_queue.cache = &engine->work_queue_cache;
    engine->write_work_queue.cache = &engine->work_queue_cache;
    engine->shutdown_work_queue.cache = &engine->work_queue_cache;
    engine->close_work_queue.cache = &engine->work_queue_cache;

    nxt_work_queue_name(&engine->fast_work_queue, "fast");
    nxt_work_queue_name(&engine->accept_work_queue, "accept");
    nxt_work_queue_name(&engine->read_work_queue, "read");
    nxt_work_queue_name(&engine->socket_work_queue, "socket");
    nxt_work_queue_name(&engine->connect_work_queue, "connect");
    nxt_work_queue_name(&engine->write_work_queue, "write");
    nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
    nxt_work_queue_name(&engine->close_work_queue, "close");

    if (signals != NULL) {
        engine->signals = nxt_event_engine_signals(signals);
        if (engine->signals == NULL) {
            goto signals_fail;
        }

        engine->signals->handler = nxt_event_engine_signal_handler;

        if (!interface->signal_support) {
            if (nxt_event_engine_signals_start(engine) != NXT_OK) {
                goto signals_fail;
            }
        }
    }

    /*
     * The number of event set and timer changes should be at least twice
     * the number of events to avoid premature flushes of the changes.
     * Fourfold is surely enough.
     */
    events = (batch != 0) ? batch : 32;

    if (interface->create(engine, 4 * events, events) != NXT_OK) {
        goto event_set_fail;
    }

    engine->event = *interface;

    if (nxt_event_engine_post_init(engine) != NXT_OK) {
        goto post_fail;
    }

    if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
        goto timers_fail;
    }

    thread = task->thread;

    nxt_thread_time_update(thread);
    engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;

    engine->max_connections = 0xffffffff;

    nxt_queue_init(&engine->joints);
    nxt_queue_init(&engine->listen_connections);
    nxt_queue_init(&engine->idle_connections);

#if !(NXT_THREADS)

    if (interface->signal_support) {
        thread->time.signal = -1;
    }

#endif

    return engine;

timers_fail:
post_fail:

    interface->free(engine);

event_set_fail:
signals_fail:

    nxt_free(engine->signals);
    nxt_work_queue_cache_destroy(&engine->work_queue_cache);
    nxt_free(engine->fibers);

fibers_fail:

    nxt_free(engine);
    return NULL;
}
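

/*
 * Illustrative sketch, not part of the build: how a worker might create
 * and run an engine.  The example_* name is hypothetical; in real callers
 * the event interface and any signal table come from the runtime.
 * A minimal sketch only, assuming no signals, no fibers and the default
 * batch size.
 */
#if 0

static void
example_engine_run(nxt_task_t *task, const nxt_event_interface_t *interface)
{
    nxt_event_engine_t  *engine;

    /* No signals, no fibers, default change/event sizing (batch == 0). */
    engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
    if (engine == NULL) {
        return;
    }

    /* Drains work queues, polls events and expires timers; never returns. */
    nxt_event_engine_start(engine);
}

#endif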


static nxt_int_t
nxt_event_engine_post_init(nxt_event_engine_t *engine)
{
    if (engine->event.enable_post != NULL) {
        return engine->event.enable_post(engine, nxt_event_engine_post_handler);
    }

#if !(NXT_THREADS)

    /* Only signals may be posted in single-threaded mode. */

    if (engine->event.signal_support) {
        return NXT_OK;
    }

#endif

    if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
        return NXT_ERROR;
    }

    return NXT_OK;
}


static nxt_int_t
nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
    if (pipe == NULL) {
        return NXT_ERROR;
    }

    engine->pipe = pipe;

    /*
     * An event engine pipe is in blocking mode for the writer
     * and in non-blocking mode for the reader.
     */

    if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
        nxt_free(pipe);
        return NXT_ERROR;
    }

    pipe->event.fd = pipe->fds[0];
    pipe->event.task = &engine->task;
    pipe->event.read_work_queue = &engine->fast_work_queue;
    pipe->event.read_handler = nxt_event_engine_signal_pipe;
    pipe->event.write_work_queue = &engine->fast_work_queue;
    pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
    pipe->event.log = engine->task.log;

    nxt_fd_event_enable_read(engine, &pipe->event);

    return NXT_OK;
}
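

/*
 * Note on the pipe protocol (added commentary): each wakeup written to the
 * pipe is a single byte.  nxt_event_engine_signal() below writes 0 to mean
 * "posted work is pending in the locked work queue" and a non-zero value to
 * carry a signal number; nxt_event_engine_signal_pipe() reads the bytes back
 * and dispatches them accordingly.
 */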


static void
nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = engine->pipe;

    if (pipe != NULL) {

        if (pipe->event.read_work_queue != NULL) {
            nxt_fd_event_close(engine, &pipe->event);
            nxt_pipe_close(pipe->event.task, pipe->fds);
        }

        nxt_free(pipe);
    }
}


static void
nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = obj;

    nxt_pipe_close(pipe->event.task, pipe->fds);
    nxt_free(pipe);
}


void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
    nxt_debug(&engine->task, "event engine post");

    nxt_locked_work_queue_add(&engine->locked_work_queue, work);

    nxt_event_engine_signal(engine, 0);
}


void
nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    u_char  buf;

    nxt_debug(&engine->task, "event engine signal:%ui", signo);

    /*
     * A signal number may be sent in a signal context, so the signal
     * information cannot be passed via a locked work queue.
     */

    if (engine->event.signal != NULL) {
        engine->event.signal(engine, signo);
        return;
    }

    buf = (u_char) signo;
    (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
}


static void
nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
{
    int             i, n;
    u_char          signo;
    nxt_bool_t      post;
    nxt_fd_event_t  *ev;
    u_char          buf[128];

    ev = obj;

    nxt_debug(task, "engine signal pipe");

    post = 0;

    do {
        n = nxt_fd_read(ev->fd, buf, sizeof(buf));

        for (i = 0; i < n; i++) {
            signo = buf[i];

            nxt_debug(task, "engine pipe signo:%d", signo);

            if (signo == 0) {
                /* A post should be processed only once. */
                post = 1;

            } else {
                nxt_event_engine_signal_handler(task,
                                           (void *) (uintptr_t) signo, NULL);
            }
        }

    } while (n == sizeof(buf));

    if (post) {
        nxt_event_engine_post_handler(task, NULL, NULL);
    }
}
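

/*
 * Illustrative sketch, not compiled: posting a handler to this engine from
 * another thread via nxt_event_engine_post().  The nxt_work_t field names
 * (next, handler, task, obj, data) are an assumption here, inferred from how
 * work queues are used in this file; the work structure must stay valid
 * until the target engine has run the handler.
 */
#if 0

static void
example_post_work(nxt_event_engine_t *engine, nxt_work_t *work,
    nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
{
    work->next = NULL;
    work->handler = handler;
    work->task = task;
    work->obj = obj;
    work->data = data;

    /* Adds the work to the locked queue and wakes the engine up. */
    nxt_event_engine_post(engine, work);
}

#endif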


static void
nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_t        *thread;
    nxt_event_engine_t  *engine;

    thread = task->thread;
    engine = thread->engine;

    nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
                               &engine->fast_work_queue);
}


static void
nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t       *engine;
    nxt_event_engine_pipe_t  *pipe;

    engine = task->thread->engine;
    pipe = engine->pipe;

    nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
            pipe->fds[0], pipe->fds[1]);

    nxt_fd_event_close(engine, &pipe->event);
    nxt_pipe_close(pipe->event.task, pipe->fds);
}


static void
nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
{
    uintptr_t              signo;
    const nxt_sig_event_t  *sigev;

    signo = (uintptr_t) obj;

    for (sigev = task->thread->engine->signals->sigev;
         sigev->signo != 0;
         sigev++)
    {
        if (signo == (nxt_uint_t) sigev->signo) {
            sigev->handler(task, (void *) signo, (void *) sigev->name);
            return;
        }
    }

    nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
}
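

/*
 * Illustrative sketch, not compiled: the table walked by
 * nxt_event_engine_signal_handler() above is an array of nxt_sig_event_t
 * terminated by a zero signo.  The initializer layout (signo, handler, name)
 * and the example_* names are assumptions for illustration only; the real
 * tables are defined by the signal module.
 */
#if 0

static void example_sigterm_handler(nxt_task_t *task, void *obj, void *data);

static const nxt_sig_event_t  example_signals[] = {
    { SIGTERM, example_sigterm_handler, "SIGTERM" },
    { 0, NULL, NULL },
};

#endif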


nxt_int_t
nxt_event_engine_change(nxt_event_engine_t *engine,
    const nxt_event_interface_t *interface, nxt_uint_t batch)
{
    nxt_uint_t  events;

    engine->batch = batch;

    if (!engine->event.signal_support && interface->signal_support) {
        /*
         * Block signal processing if the current event
         * facility does not support signal processing.
         */
        nxt_event_engine_signals_stop(engine);

        /*
         * Add the signal events possibly received before signal
         * processing was blocked to the engine's fast work queue.
         */
        nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
    }

    if (engine->pipe != NULL && interface->enable_post != NULL) {
        /*
         * The engine pipe must be closed only after all signal events
         * added above to the engine's fast work queue have been processed.
         */
        nxt_work_queue_add(&engine->fast_work_queue,
                           nxt_event_engine_signal_pipe_close,
                           &engine->task, engine->pipe, NULL);

        engine->pipe = NULL;
    }

    engine->event.free(engine);

    events = (batch != 0) ? batch : 32;

    if (interface->create(engine, 4 * events, events) != NXT_OK) {
        return NXT_ERROR;
    }

    engine->event = *interface;

    if (nxt_event_engine_post_init(engine) != NXT_OK) {
        return NXT_ERROR;
    }

    if (engine->signals != NULL) {

        if (!engine->event.signal_support) {
            return nxt_event_engine_signals_start(engine);
        }

#if (NXT_THREADS)
        /*
         * Reset the PID flag so that the signal thread can be started
         * again if some future event facility does not support signals.
         */
        engine->signals->process = 0;
#endif
    }

    return NXT_OK;
}


void
nxt_event_engine_free(nxt_event_engine_t *engine)
{
    nxt_thread_log_debug("free engine %p", engine);

    nxt_event_engine_signal_pipe_free(engine);
    nxt_free(engine->signals);

    nxt_work_queue_cache_destroy(&engine->work_queue_cache);

    engine->event.free(engine);

    /* TODO: free timers */

    nxt_free(engine);
}


static nxt_work_handler_t
nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
    void **obj, void **data)
{
    nxt_work_queue_t  *wq, *last;

    wq = engine->current_work_queue;
    last = wq;

    if (wq->head == NULL) {
        wq = &engine->fast_work_queue;

        if (wq->head == NULL) {

            do {
                engine->current_work_queue++;
                wq = engine->current_work_queue;

                if (wq > &engine->close_work_queue) {
                    wq = &engine->fast_work_queue;
                    engine->current_work_queue = wq;
                }

                if (wq->head != NULL) {
                    goto found;
                }

            } while (wq != last);

            engine->current_work_queue = &engine->fast_work_queue;

            return NULL;
        }
    }

found:

    nxt_debug(&engine->task, "work queue: %s", wq->name);

    return nxt_work_queue_pop(wq, task, obj, data);
}
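

/*
 * Engine loop overview (added commentary): nxt_event_engine_start() below
 * first drains the work queues using nxt_event_engine_queue_pop() above
 * (the current queue, then the fast queue, then the remaining queues scanned
 * round-robin), then polls the event facility with a timeout taken from the
 * nearest timer, and finally expires timers against the updated monotonic
 * time.
 */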


void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
    void                *obj, *data;
    nxt_task_t          *task;
    nxt_msec_t          timeout, now;
    nxt_thread_t        *thr;
    nxt_work_handler_t  handler;

    thr = nxt_thread();

    if (engine->fibers) {
        /*
         * _setjmp() cannot be wrapped in a function since return from
         * the function clobbers stack used by future _setjmp() returns.
         */
        _setjmp(engine->fibers->fiber.jmp);

        /* A return point from fibers. */
    }

    thr->log = engine->task.log;

    for ( ;; ) {

        for ( ;; ) {
            handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);

            if (handler == NULL) {
                break;
            }

            thr->task = task;

            handler(task, obj, data);
        }

        /* Attach some event engine work queues in preferred order. */

        timeout = nxt_timer_find(engine);

        engine->event.poll(engine, timeout);

        now = nxt_thread_monotonic_time(thr) / 1000000;

        nxt_timer_expire(engine, now);
    }
}


static nxt_int_t
nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}

static const nxt_lvlhsh_proto_t  lvlhsh_req_conn_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_req_conn_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


void
nxt_event_engine_request_add(nxt_event_engine_t *engine,
    nxt_req_conn_link_t *rc)
{
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
    lhq.key.length = sizeof(rc->req_id);
    lhq.key.start = (u_char *) &rc->req_id;
    lhq.proto = &lvlhsh_req_conn_proto;
    lhq.replace = 0;
    lhq.value = rc;
    lhq.pool = engine->mem_pool;

    switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) {

    case NXT_OK:
        break;

    default:
        nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed",
                             rc->req_id);
        break;
    }
}


nxt_req_conn_link_t *
nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id)
{
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
    lhq.key.length = sizeof(req_id);
    lhq.key.start = (u_char *) &req_id;
    lhq.proto = &lvlhsh_req_conn_proto;

    if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) {
        return lhq.value;
    }

    return NULL;
}


void
nxt_event_engine_request_remove(nxt_event_engine_t *engine,
    nxt_req_conn_link_t *rc)
{
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
    lhq.key.length = sizeof(rc->req_id);
    lhq.key.start = (u_char *) &rc->req_id;
    lhq.proto = &lvlhsh_req_conn_proto;
    lhq.pool = engine->mem_pool;

    switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {

    case NXT_OK:
        break;

    default:
        nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
                             rc->req_id);
        break;
    }
}


nxt_req_conn_link_t *
nxt_event_engine_request_find_remove(nxt_event_engine_t *engine,
    nxt_req_id_t req_id)
{
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
    lhq.key.length = sizeof(req_id);
    lhq.key.start = (u_char *) &req_id;
    lhq.proto = &lvlhsh_req_conn_proto;
    lhq.pool = engine->mem_pool;

    switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {

    case NXT_OK:
        return lhq.value;

    default:
        nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
                             req_id);
        break;
    }

    return NULL;
}
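

/*
 * Illustrative sketch, not compiled: typical pairing of the request registry
 * helpers above.  The example assumes only that nxt_req_conn_link_t carries
 * the req_id key used for hashing; a link is added when a request is bound
 * to a connection and found/removed once the reply arrives on the engine.
 */
#if 0

static void
example_request_reply(nxt_event_engine_t *engine, nxt_req_id_t req_id)
{
    nxt_req_conn_link_t  *rc;

    rc = nxt_event_engine_request_find_remove(engine, req_id);

    if (rc == NULL) {
        /* Unknown or already completed request. */
        return;
    }

    /* ... deliver the reply over the connection linked by rc ... */
}

#endif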