1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 11 static nxt_int_t nxt_event_engine_signal_pipe_create( 12 nxt_event_engine_t *engine); 13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 14 void *data); 15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 16 void *data); 17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 18 void *data); 19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 24 nxt_task_t **task, void **obj, void **data); 25 26 27 nxt_event_engine_t * 28 nxt_event_engine_create(nxt_task_t *task, 29 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 30 nxt_uint_t flags, nxt_uint_t batch) 31 { 32 nxt_uint_t events; 33 nxt_thread_t *thread; 34 nxt_event_engine_t *engine; 35 36 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 37 if (engine == NULL) { 38 return NULL; 39 } 40 41 nxt_debug(task, "create engine %p", engine); 42 43 thread = task->thread; 44 45 engine->task.thread = thread; 46 engine->task.log = thread->log; 47 engine->task.ident = nxt_task_next_ident(); 48 49 engine->batch = batch; 50 51 #if 0 52 if (flags & NXT_ENGINE_FIBERS) { 53 engine->fibers = nxt_fiber_main_create(engine); 54 if (engine->fibers == NULL) { 55 goto fibers_fail; 56 } 57 } 58 #endif 59 60 engine->current_work_queue = &engine->fast_work_queue; 61 62 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 63 64 engine->fast_work_queue.cache = &engine->work_queue_cache; 65 engine->accept_work_queue.cache = &engine->work_queue_cache; 66 engine->read_work_queue.cache = &engine->work_queue_cache; 67 engine->socket_work_queue.cache = &engine->work_queue_cache; 68 engine->connect_work_queue.cache = &engine->work_queue_cache; 69 engine->write_work_queue.cache = &engine->work_queue_cache; 70 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 71 engine->close_work_queue.cache = &engine->work_queue_cache; 72 73 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 74 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 75 nxt_work_queue_name(&engine->read_work_queue, "read"); 76 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 77 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 78 nxt_work_queue_name(&engine->write_work_queue, "write"); 79 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 80 nxt_work_queue_name(&engine->close_work_queue, "close"); 81 82 if (signals != NULL) { 83 engine->signals = nxt_event_engine_signals(signals); 84 if (engine->signals == NULL) { 85 goto signals_fail; 86 } 87 88 engine->signals->handler = nxt_event_engine_signal_handler; 89 90 if (!interface->signal_support) { 91 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 92 goto signals_fail; 93 } 94 } 95 } 96 97 /* 98 * Number of event set and timers changes should be at least twice 99 * more than number of events to avoid premature flushes of the changes. 100 * Fourfold is for sure. 101 */ 102 events = (batch != 0) ? batch : 32; 103 104 if (interface->create(engine, 4 * events, events) != NXT_OK) { 105 goto event_set_fail; 106 } 107 108 engine->event = *interface; 109 110 if (nxt_event_engine_post_init(engine) != NXT_OK) { 111 goto post_fail; 112 } 113 114 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 115 goto timers_fail; 116 } 117 118 thread = task->thread; 119 120 nxt_thread_time_update(thread); 121 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000; 122 123 engine->max_connections = 0xffffffff; 124 125 nxt_queue_init(&engine->joints); 126 nxt_queue_init(&engine->listen_connections); 127 nxt_queue_init(&engine->idle_connections); 128 129 return engine; 130 131 timers_fail: 132 post_fail: 133 134 interface->free(engine); 135 136 event_set_fail: 137 signals_fail: 138 139 nxt_free(engine->signals); 140 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 141 nxt_free(engine->fibers); 142 143 #if 0 144 fibers_fail: 145 146 nxt_free(engine); 147 #endif 148 149 return NULL; 150 } 151 152 153 static nxt_int_t 154 nxt_event_engine_post_init(nxt_event_engine_t *engine) 155 { 156 if (engine->event.enable_post != NULL) { 157 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 158 } 159 160 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 161 return NXT_ERROR; 162 } 163 164 return NXT_OK; 165 } 166 167 168 static nxt_int_t 169 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 170 { 171 nxt_event_engine_pipe_t *pipe; 172 173 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 174 if (pipe == NULL) { 175 return NXT_ERROR; 176 } 177 178 engine->pipe = pipe; 179 180 /* 181 * An event engine pipe is in blocking mode for writer 182 * and in non-blocking node for reader. 183 */ 184 185 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { 186 nxt_free(pipe); 187 return NXT_ERROR; 188 } 189 190 pipe->event.fd = pipe->fds[0]; 191 pipe->event.task = &engine->task; 192 pipe->event.read_work_queue = &engine->fast_work_queue; 193 pipe->event.read_handler = nxt_event_engine_signal_pipe; 194 pipe->event.write_work_queue = &engine->fast_work_queue; 195 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 196 pipe->event.log = engine->task.log; 197 198 nxt_fd_event_enable_read(engine, &pipe->event); 199 200 return NXT_OK; 201 } 202 203 204 static void 205 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 206 { 207 nxt_event_engine_pipe_t *pipe; 208 209 pipe = engine->pipe; 210 211 if (pipe != NULL) { 212 213 if (pipe->event.read_work_queue != NULL) { 214 nxt_fd_event_close(engine, &pipe->event); 215 nxt_pipe_close(pipe->event.task, pipe->fds); 216 } 217 218 nxt_free(pipe); 219 } 220 } 221 222 223 static void 224 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 225 { 226 nxt_event_engine_pipe_t *pipe; 227 228 pipe = obj; 229 230 nxt_pipe_close(pipe->event.task, pipe->fds); 231 nxt_free(pipe); 232 } 233 234 235 void 236 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 237 { 238 nxt_debug(&engine->task, "event engine post"); 239 240 #if (NXT_DEBUG) 241 if (nxt_slow_path(work->next != NULL)) { 242 nxt_debug(&engine->task, "event engine post multiple works"); 243 } 244 #endif 245 246 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 247 248 nxt_event_engine_signal(engine, 0); 249 } 250 251 252 void 253 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 254 { 255 u_char buf; 256 257 nxt_debug(&engine->task, "event engine signal:%ui", signo); 258 259 /* 260 * A signal number may be sent in a signal context, so the signal 261 * information cannot be passed via a locked work queue. 262 */ 263 264 if (engine->event.signal != NULL) { 265 engine->event.signal(engine, signo); 266 return; 267 } 268 269 buf = (u_char) signo; 270 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 271 } 272 273 274 static void 275 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 276 { 277 int i, n; 278 u_char signo; 279 nxt_bool_t post; 280 nxt_fd_event_t *ev; 281 u_char buf[128]; 282 283 ev = obj; 284 285 nxt_debug(task, "engine signal pipe"); 286 287 post = 0; 288 289 do { 290 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 291 292 for (i = 0; i < n; i++) { 293 signo = buf[i]; 294 295 nxt_debug(task, "engine pipe signo:%d", signo); 296 297 if (signo == 0) { 298 /* A post should be processed only once. */ 299 post = 1; 300 301 } else { 302 nxt_event_engine_signal_handler(task, 303 (void *) (uintptr_t) signo, NULL); 304 } 305 } 306 307 } while (n == sizeof(buf)); 308 309 if (post) { 310 nxt_event_engine_post_handler(task, NULL, NULL); 311 } 312 } 313 314 315 static void 316 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 317 { 318 nxt_thread_t *thread; 319 nxt_event_engine_t *engine; 320 321 thread = task->thread; 322 engine = thread->engine; 323 324 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 325 &engine->fast_work_queue); 326 } 327 328 329 static void 330 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 331 { 332 nxt_event_engine_t *engine; 333 nxt_event_engine_pipe_t *pipe; 334 335 engine = task->thread->engine; 336 pipe = engine->pipe; 337 338 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 339 pipe->fds[0], pipe->fds[1]); 340 341 nxt_fd_event_close(engine, &pipe->event); 342 nxt_pipe_close(pipe->event.task, pipe->fds); 343 } 344 345 346 static void 347 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 348 { 349 uintptr_t signo; 350 const nxt_sig_event_t *sigev; 351 352 signo = (uintptr_t) obj; 353 354 for (sigev = task->thread->engine->signals->sigev; 355 sigev->signo != 0; 356 sigev++) 357 { 358 if (signo == (nxt_uint_t) sigev->signo) { 359 sigev->handler(task, (void *) signo, (void *) sigev->name); 360 return; 361 } 362 } 363 364 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); 365 } 366 367 368 nxt_int_t 369 nxt_event_engine_change(nxt_event_engine_t *engine, 370 const nxt_event_interface_t *interface, nxt_uint_t batch) 371 { 372 nxt_uint_t events; 373 374 engine->batch = batch; 375 376 if (!engine->event.signal_support && interface->signal_support) { 377 /* 378 * Block signal processing if the current event 379 * facility does not support signal processing. 380 */ 381 nxt_event_engine_signals_stop(engine); 382 383 /* 384 * Add to engine fast work queue the signal events possibly 385 * received before the blocking signal processing. 386 */ 387 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL); 388 } 389 390 if (engine->pipe != NULL && interface->enable_post != NULL) { 391 /* 392 * An engine pipe must be closed after all signal events 393 * added above to engine fast work queue will be processed. 394 */ 395 nxt_work_queue_add(&engine->fast_work_queue, 396 nxt_event_engine_signal_pipe_close, 397 &engine->task, engine->pipe, NULL); 398 399 engine->pipe = NULL; 400 } 401 402 engine->event.free(engine); 403 404 events = (batch != 0) ? batch : 32; 405 406 if (interface->create(engine, 4 * events, events) != NXT_OK) { 407 return NXT_ERROR; 408 } 409 410 engine->event = *interface; 411 412 if (nxt_event_engine_post_init(engine) != NXT_OK) { 413 return NXT_ERROR; 414 } 415 416 if (engine->signals != NULL) { 417 418 if (!engine->event.signal_support) { 419 return nxt_event_engine_signals_start(engine); 420 } 421 422 /* 423 * Reset the PID flag to start the signal thread if 424 * some future event facility will not support signals. 425 */ 426 engine->signals->process = 0; 427 } 428 429 return NXT_OK; 430 } 431 432 433 void 434 nxt_event_engine_free(nxt_event_engine_t *engine) 435 { 436 nxt_thread_log_debug("free engine %p", engine); 437 438 nxt_event_engine_signal_pipe_free(engine); 439 nxt_free(engine->signals); 440 441 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 442 443 engine->event.free(engine); 444 445 /* TODO: free timers */ 446 447 nxt_free(engine); 448 } 449 450 451 static nxt_work_handler_t 452 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 453 void **obj, void **data) 454 { 455 nxt_work_queue_t *wq, *last; 456 457 wq = engine->current_work_queue; 458 last = wq; 459 460 if (wq->head == NULL) { 461 wq = &engine->fast_work_queue; 462 463 if (wq->head == NULL) { 464 465 do { 466 engine->current_work_queue++; 467 wq = engine->current_work_queue; 468 469 if (wq > &engine->close_work_queue) { 470 wq = &engine->fast_work_queue; 471 engine->current_work_queue = wq; 472 } 473 474 if (wq->head != NULL) { 475 goto found; 476 } 477 478 } while (wq != last); 479 480 engine->current_work_queue = &engine->fast_work_queue; 481 482 return NULL; 483 } 484 } 485 486 found: 487 488 nxt_debug(&engine->task, "work queue: %s", wq->name); 489 490 return nxt_work_queue_pop(wq, task, obj, data); 491 } 492 493 494 void 495 nxt_event_engine_start(nxt_event_engine_t *engine) 496 { 497 void *obj, *data; 498 nxt_task_t *task; 499 nxt_msec_t timeout, now; 500 nxt_thread_t *thr; 501 nxt_work_handler_t handler; 502 503 thr = nxt_thread(); 504 505 if (engine->fibers) { 506 /* 507 * _setjmp() cannot be wrapped in a function since return from 508 * the function clobbers stack used by future _setjmp() returns. 509 */ 510 _setjmp(engine->fibers->fiber.jmp); 511 512 /* A return point from fibers. */ 513 } 514 515 thr->log = engine->task.log; 516 517 for ( ;; ) { 518 519 for ( ;; ) { 520 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 521 522 if (handler == NULL) { 523 break; 524 } 525 526 thr->task = task; 527 528 handler(task, obj, data); 529 } 530 531 /* Attach some event engine work queues in preferred order. */ 532 533 timeout = nxt_timer_find(engine); 534 535 engine->event.poll(engine, timeout); 536 537 now = nxt_thread_monotonic_time(thr) / 1000000; 538 539 nxt_timer_expire(engine, now); 540 } 541 } 542 543 544 #if (NXT_DEBUG) 545 546 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) 547 { 548 nxt_work_queue_thread_adopt(&engine->fast_work_queue); 549 nxt_work_queue_thread_adopt(&engine->accept_work_queue); 550 nxt_work_queue_thread_adopt(&engine->read_work_queue); 551 nxt_work_queue_thread_adopt(&engine->socket_work_queue); 552 nxt_work_queue_thread_adopt(&engine->connect_work_queue); 553 nxt_work_queue_thread_adopt(&engine->write_work_queue); 554 nxt_work_queue_thread_adopt(&engine->shutdown_work_queue); 555 nxt_work_queue_thread_adopt(&engine->close_work_queue); 556 } 557 558 #endif 559