1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 11 static nxt_int_t nxt_event_engine_signal_pipe_create( 12 nxt_event_engine_t *engine); 13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 14 void *data); 15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 16 void *data); 17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 18 void *data); 19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 24 nxt_task_t **task, void **obj, void **data); 25 26 27 nxt_event_engine_t * 28 nxt_event_engine_create(nxt_task_t *task, 29 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 30 nxt_uint_t flags, nxt_uint_t batch) 31 { 32 nxt_uint_t events; 33 nxt_thread_t *thread; 34 nxt_event_engine_t *engine; 35 36 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 37 if (engine == NULL) { 38 return NULL; 39 } 40 41 nxt_debug(task, "create engine %p", engine); 42 43 thread = task->thread; 44 45 engine->task.thread = thread; 46 engine->task.log = thread->log; 47 engine->task.ident = nxt_task_next_ident(); 48 49 engine->batch = batch; 50 51 if (flags & NXT_ENGINE_FIBERS) { 52 engine->fibers = nxt_fiber_main_create(engine); 53 if (engine->fibers == NULL) { 54 goto fibers_fail; 55 } 56 } 57 58 engine->current_work_queue = &engine->fast_work_queue; 59 60 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 61 62 engine->fast_work_queue.cache = &engine->work_queue_cache; 63 engine->accept_work_queue.cache = &engine->work_queue_cache; 64 engine->read_work_queue.cache = &engine->work_queue_cache; 65 engine->socket_work_queue.cache = &engine->work_queue_cache; 66 engine->connect_work_queue.cache = &engine->work_queue_cache; 67 engine->write_work_queue.cache = &engine->work_queue_cache; 68 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 69 engine->close_work_queue.cache = &engine->work_queue_cache; 70 71 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 72 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 73 nxt_work_queue_name(&engine->read_work_queue, "read"); 74 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 75 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 76 nxt_work_queue_name(&engine->write_work_queue, "write"); 77 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 78 nxt_work_queue_name(&engine->close_work_queue, "close"); 79 80 if (signals != NULL) { 81 engine->signals = nxt_event_engine_signals(signals); 82 if (engine->signals == NULL) { 83 goto signals_fail; 84 } 85 86 engine->signals->handler = nxt_event_engine_signal_handler; 87 88 if (!interface->signal_support) { 89 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 90 goto signals_fail; 91 } 92 } 93 } 94 95 /* 96 * Number of event set and timers changes should be at least twice 97 * more than number of events to avoid premature flushes of the changes. 98 * Fourfold is for sure. 99 */ 100 events = (batch != 0) ? batch : 32; 101 102 if (interface->create(engine, 4 * events, events) != NXT_OK) { 103 goto event_set_fail; 104 } 105 106 engine->event = *interface; 107 108 if (nxt_event_engine_post_init(engine) != NXT_OK) { 109 goto post_fail; 110 } 111 112 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 113 goto timers_fail; 114 } 115 116 thread = task->thread; 117 118 nxt_thread_time_update(thread); 119 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000; 120 121 engine->max_connections = 0xffffffff; 122 123 nxt_queue_init(&engine->joints); 124 nxt_queue_init(&engine->listen_connections); 125 nxt_queue_init(&engine->idle_connections); 126 127 return engine; 128 129 timers_fail: 130 post_fail: 131 132 interface->free(engine); 133 134 event_set_fail: 135 signals_fail: 136 137 nxt_free(engine->signals); 138 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 139 nxt_free(engine->fibers); 140 141 fibers_fail: 142 143 nxt_free(engine); 144 return NULL; 145 } 146 147 148 static nxt_int_t 149 nxt_event_engine_post_init(nxt_event_engine_t *engine) 150 { 151 if (engine->event.enable_post != NULL) { 152 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 153 } 154 155 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 156 return NXT_ERROR; 157 } 158 159 return NXT_OK; 160 } 161 162 163 static nxt_int_t 164 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 165 { 166 nxt_event_engine_pipe_t *pipe; 167 168 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 169 if (pipe == NULL) { 170 return NXT_ERROR; 171 } 172 173 engine->pipe = pipe; 174 175 /* 176 * An event engine pipe is in blocking mode for writer 177 * and in non-blocking node for reader. 178 */ 179 180 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { 181 nxt_free(pipe); 182 return NXT_ERROR; 183 } 184 185 pipe->event.fd = pipe->fds[0]; 186 pipe->event.task = &engine->task; 187 pipe->event.read_work_queue = &engine->fast_work_queue; 188 pipe->event.read_handler = nxt_event_engine_signal_pipe; 189 pipe->event.write_work_queue = &engine->fast_work_queue; 190 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 191 pipe->event.log = engine->task.log; 192 193 nxt_fd_event_enable_read(engine, &pipe->event); 194 195 return NXT_OK; 196 } 197 198 199 static void 200 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 201 { 202 nxt_event_engine_pipe_t *pipe; 203 204 pipe = engine->pipe; 205 206 if (pipe != NULL) { 207 208 if (pipe->event.read_work_queue != NULL) { 209 nxt_fd_event_close(engine, &pipe->event); 210 nxt_pipe_close(pipe->event.task, pipe->fds); 211 } 212 213 nxt_free(pipe); 214 } 215 } 216 217 218 static void 219 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 220 { 221 nxt_event_engine_pipe_t *pipe; 222 223 pipe = obj; 224 225 nxt_pipe_close(pipe->event.task, pipe->fds); 226 nxt_free(pipe); 227 } 228 229 230 void 231 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 232 { 233 nxt_debug(&engine->task, "event engine post"); 234 235 #if (NXT_DEBUG) 236 if (nxt_slow_path(work->next != NULL)) { 237 nxt_debug(&engine->task, "event engine post multiple works"); 238 } 239 #endif 240 241 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 242 243 nxt_event_engine_signal(engine, 0); 244 } 245 246 247 void 248 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 249 { 250 u_char buf; 251 252 nxt_debug(&engine->task, "event engine signal:%ui", signo); 253 254 /* 255 * A signal number may be sent in a signal context, so the signal 256 * information cannot be passed via a locked work queue. 257 */ 258 259 if (engine->event.signal != NULL) { 260 engine->event.signal(engine, signo); 261 return; 262 } 263 264 buf = (u_char) signo; 265 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 266 } 267 268 269 static void 270 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 271 { 272 int i, n; 273 u_char signo; 274 nxt_bool_t post; 275 nxt_fd_event_t *ev; 276 u_char buf[128]; 277 278 ev = obj; 279 280 nxt_debug(task, "engine signal pipe"); 281 282 post = 0; 283 284 do { 285 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 286 287 for (i = 0; i < n; i++) { 288 signo = buf[i]; 289 290 nxt_debug(task, "engine pipe signo:%d", signo); 291 292 if (signo == 0) { 293 /* A post should be processed only once. */ 294 post = 1; 295 296 } else { 297 nxt_event_engine_signal_handler(task, 298 (void *) (uintptr_t) signo, NULL); 299 } 300 } 301 302 } while (n == sizeof(buf)); 303 304 if (post) { 305 nxt_event_engine_post_handler(task, NULL, NULL); 306 } 307 } 308 309 310 static void 311 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 312 { 313 nxt_thread_t *thread; 314 nxt_event_engine_t *engine; 315 316 thread = task->thread; 317 engine = thread->engine; 318 319 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 320 &engine->fast_work_queue); 321 } 322 323 324 static void 325 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 326 { 327 nxt_event_engine_t *engine; 328 nxt_event_engine_pipe_t *pipe; 329 330 engine = task->thread->engine; 331 pipe = engine->pipe; 332 333 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 334 pipe->fds[0], pipe->fds[1]); 335 336 nxt_fd_event_close(engine, &pipe->event); 337 nxt_pipe_close(pipe->event.task, pipe->fds); 338 } 339 340 341 static void 342 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 343 { 344 uintptr_t signo; 345 const nxt_sig_event_t *sigev; 346 347 signo = (uintptr_t) obj; 348 349 for (sigev = task->thread->engine->signals->sigev; 350 sigev->signo != 0; 351 sigev++) 352 { 353 if (signo == (nxt_uint_t) sigev->signo) { 354 sigev->handler(task, (void *) signo, (void *) sigev->name); 355 return; 356 } 357 } 358 359 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); 360 } 361 362 363 nxt_int_t 364 nxt_event_engine_change(nxt_event_engine_t *engine, 365 const nxt_event_interface_t *interface, nxt_uint_t batch) 366 { 367 nxt_uint_t events; 368 369 engine->batch = batch; 370 371 if (!engine->event.signal_support && interface->signal_support) { 372 /* 373 * Block signal processing if the current event 374 * facility does not support signal processing. 375 */ 376 nxt_event_engine_signals_stop(engine); 377 378 /* 379 * Add to engine fast work queue the signal events possibly 380 * received before the blocking signal processing. 381 */ 382 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL); 383 } 384 385 if (engine->pipe != NULL && interface->enable_post != NULL) { 386 /* 387 * An engine pipe must be closed after all signal events 388 * added above to engine fast work queue will be processed. 389 */ 390 nxt_work_queue_add(&engine->fast_work_queue, 391 nxt_event_engine_signal_pipe_close, 392 &engine->task, engine->pipe, NULL); 393 394 engine->pipe = NULL; 395 } 396 397 engine->event.free(engine); 398 399 events = (batch != 0) ? batch : 32; 400 401 if (interface->create(engine, 4 * events, events) != NXT_OK) { 402 return NXT_ERROR; 403 } 404 405 engine->event = *interface; 406 407 if (nxt_event_engine_post_init(engine) != NXT_OK) { 408 return NXT_ERROR; 409 } 410 411 if (engine->signals != NULL) { 412 413 if (!engine->event.signal_support) { 414 return nxt_event_engine_signals_start(engine); 415 } 416 417 /* 418 * Reset the PID flag to start the signal thread if 419 * some future event facility will not support signals. 420 */ 421 engine->signals->process = 0; 422 } 423 424 return NXT_OK; 425 } 426 427 428 void 429 nxt_event_engine_free(nxt_event_engine_t *engine) 430 { 431 nxt_thread_log_debug("free engine %p", engine); 432 433 nxt_event_engine_signal_pipe_free(engine); 434 nxt_free(engine->signals); 435 436 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 437 438 engine->event.free(engine); 439 440 /* TODO: free timers */ 441 442 nxt_free(engine); 443 } 444 445 446 static nxt_work_handler_t 447 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 448 void **obj, void **data) 449 { 450 nxt_work_queue_t *wq, *last; 451 452 wq = engine->current_work_queue; 453 last = wq; 454 455 if (wq->head == NULL) { 456 wq = &engine->fast_work_queue; 457 458 if (wq->head == NULL) { 459 460 do { 461 engine->current_work_queue++; 462 wq = engine->current_work_queue; 463 464 if (wq > &engine->close_work_queue) { 465 wq = &engine->fast_work_queue; 466 engine->current_work_queue = wq; 467 } 468 469 if (wq->head != NULL) { 470 goto found; 471 } 472 473 } while (wq != last); 474 475 engine->current_work_queue = &engine->fast_work_queue; 476 477 return NULL; 478 } 479 } 480 481 found: 482 483 nxt_debug(&engine->task, "work queue: %s", wq->name); 484 485 return nxt_work_queue_pop(wq, task, obj, data); 486 } 487 488 489 void 490 nxt_event_engine_start(nxt_event_engine_t *engine) 491 { 492 void *obj, *data; 493 nxt_task_t *task; 494 nxt_msec_t timeout, now; 495 nxt_thread_t *thr; 496 nxt_work_handler_t handler; 497 498 thr = nxt_thread(); 499 500 if (engine->fibers) { 501 /* 502 * _setjmp() cannot be wrapped in a function since return from 503 * the function clobbers stack used by future _setjmp() returns. 504 */ 505 _setjmp(engine->fibers->fiber.jmp); 506 507 /* A return point from fibers. */ 508 } 509 510 thr->log = engine->task.log; 511 512 for ( ;; ) { 513 514 for ( ;; ) { 515 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 516 517 if (handler == NULL) { 518 break; 519 } 520 521 thr->task = task; 522 523 handler(task, obj, data); 524 } 525 526 /* Attach some event engine work queues in preferred order. */ 527 528 timeout = nxt_timer_find(engine); 529 530 engine->event.poll(engine, timeout); 531 532 now = nxt_thread_monotonic_time(thr) / 1000000; 533 534 nxt_timer_expire(engine, now); 535 } 536 } 537 538 539 #if (NXT_DEBUG) 540 541 void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) 542 { 543 nxt_work_queue_thread_adopt(&engine->fast_work_queue); 544 nxt_work_queue_thread_adopt(&engine->accept_work_queue); 545 nxt_work_queue_thread_adopt(&engine->read_work_queue); 546 nxt_work_queue_thread_adopt(&engine->socket_work_queue); 547 nxt_work_queue_thread_adopt(&engine->connect_work_queue); 548 nxt_work_queue_thread_adopt(&engine->write_work_queue); 549 nxt_work_queue_thread_adopt(&engine->shutdown_work_queue); 550 nxt_work_queue_thread_adopt(&engine->close_work_queue); 551 } 552 553 #endif 554