1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 11 static nxt_int_t nxt_event_engine_signal_pipe_create( 12 nxt_event_engine_t *engine); 13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 14 void *data); 15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 16 void *data); 17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 18 void *data); 19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 24 nxt_task_t **task, void **obj, void **data); 25 26 27 nxt_event_engine_t * 28 nxt_event_engine_create(nxt_task_t *task, 29 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 30 nxt_uint_t flags, nxt_uint_t batch) 31 { 32 nxt_uint_t events; 33 nxt_thread_t *thread; 34 nxt_event_engine_t *engine; 35 36 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 37 if (engine == NULL) { 38 return NULL; 39 } 40 41 thread = task->thread; 42 43 engine->task.thread = thread; 44 engine->task.log = thread->log; 45 engine->task.ident = nxt_task_next_ident(); 46 47 thread->engine = engine; 48 thread->fiber = &engine->fibers->fiber; 49 50 engine->batch = batch; 51 52 if (flags & NXT_ENGINE_FIBERS) { 53 engine->fibers = nxt_fiber_main_create(engine); 54 if (engine->fibers == NULL) { 55 goto fibers_fail; 56 } 57 } 58 59 engine->current_work_queue = &engine->fast_work_queue; 60 61 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 62 63 engine->fast_work_queue.cache = &engine->work_queue_cache; 64 engine->accept_work_queue.cache = &engine->work_queue_cache; 65 engine->read_work_queue.cache = &engine->work_queue_cache; 66 engine->socket_work_queue.cache = &engine->work_queue_cache; 67 engine->connect_work_queue.cache = &engine->work_queue_cache; 68 engine->write_work_queue.cache = &engine->work_queue_cache; 69 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 70 engine->close_work_queue.cache = &engine->work_queue_cache; 71 72 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 73 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 74 nxt_work_queue_name(&engine->read_work_queue, "read"); 75 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 76 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 77 nxt_work_queue_name(&engine->write_work_queue, "write"); 78 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 79 nxt_work_queue_name(&engine->close_work_queue, "close"); 80 81 if (signals != NULL) { 82 engine->signals = nxt_event_engine_signals(signals); 83 if (engine->signals == NULL) { 84 goto signals_fail; 85 } 86 87 engine->signals->handler = nxt_event_engine_signal_handler; 88 89 if (!interface->signal_support) { 90 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 91 goto signals_fail; 92 } 93 } 94 } 95 96 /* 97 * Number of event set and timers changes should be at least twice 98 * more than number of events to avoid premature flushes of the changes. 99 * Fourfold is for sure. 100 */ 101 events = (batch != 0) ? batch : 32; 102 103 if (interface->create(engine, 4 * events, events) != NXT_OK) { 104 goto event_set_fail; 105 } 106 107 engine->event = *interface; 108 109 if (nxt_event_engine_post_init(engine) != NXT_OK) { 110 goto post_fail; 111 } 112 113 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 114 goto timers_fail; 115 } 116 117 thread = task->thread; 118 119 nxt_thread_time_update(thread); 120 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000; 121 122 engine->max_connections = 0xffffffff; 123 124 nxt_queue_init(&engine->listen_connections); 125 nxt_queue_init(&engine->idle_connections); 126 127 #if !(NXT_THREADS) 128 129 if (interface->signal_support) { 130 thread->time.signal = -1; 131 } 132 133 #endif 134 135 return engine; 136 137 timers_fail: 138 post_fail: 139 140 interface->free(engine); 141 142 event_set_fail: 143 signals_fail: 144 145 nxt_free(engine->signals); 146 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 147 nxt_free(engine->fibers); 148 149 fibers_fail: 150 151 nxt_free(engine); 152 return NULL; 153 } 154 155 156 static nxt_int_t 157 nxt_event_engine_post_init(nxt_event_engine_t *engine) 158 { 159 if (engine->event.enable_post != NULL) { 160 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 161 } 162 163 #if !(NXT_THREADS) 164 165 /* Only signals may are posted in single-threaded mode. */ 166 167 if (engine->event->signal_support) { 168 return NXT_OK; 169 } 170 171 #endif 172 173 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 174 return NXT_ERROR; 175 } 176 177 return NXT_OK; 178 } 179 180 181 static nxt_int_t 182 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 183 { 184 nxt_event_engine_pipe_t *pipe; 185 186 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 187 if (pipe == NULL) { 188 return NXT_ERROR; 189 } 190 191 engine->pipe = pipe; 192 193 /* 194 * An event engine pipe is in blocking mode for writer 195 * and in non-blocking node for reader. 196 */ 197 198 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { 199 nxt_free(pipe); 200 return NXT_ERROR; 201 } 202 203 pipe->event.fd = pipe->fds[0]; 204 pipe->event.task = &engine->task; 205 pipe->event.read_work_queue = &engine->fast_work_queue; 206 pipe->event.read_handler = nxt_event_engine_signal_pipe; 207 pipe->event.write_work_queue = &engine->fast_work_queue; 208 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 209 pipe->event.log = engine->task.log; 210 211 nxt_fd_event_enable_read(engine, &pipe->event); 212 213 return NXT_OK; 214 } 215 216 217 static void 218 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 219 { 220 nxt_event_engine_pipe_t *pipe; 221 222 pipe = engine->pipe; 223 224 if (pipe != NULL) { 225 226 if (pipe->event.read_work_queue != NULL) { 227 nxt_fd_event_close(engine, &pipe->event); 228 nxt_pipe_close(pipe->event.task, pipe->fds); 229 } 230 231 nxt_free(pipe); 232 } 233 } 234 235 236 static void 237 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 238 { 239 nxt_event_engine_pipe_t *pipe; 240 241 pipe = obj; 242 243 nxt_pipe_close(pipe->event.task, pipe->fds); 244 nxt_free(pipe); 245 } 246 247 248 void 249 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 250 { 251 nxt_debug(&engine->task, "event engine post"); 252 253 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 254 255 nxt_event_engine_signal(engine, 0); 256 } 257 258 259 void 260 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 261 { 262 u_char buf; 263 264 nxt_debug(&engine->task, "event engine signal:%ui", signo); 265 266 /* 267 * A signal number may be sent in a signal context, so the signal 268 * information cannot be passed via a locked work queue. 269 */ 270 271 if (engine->event.signal != NULL) { 272 engine->event.signal(engine, signo); 273 return; 274 } 275 276 buf = (u_char) signo; 277 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 278 } 279 280 281 static void 282 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 283 { 284 int i, n; 285 u_char signo; 286 nxt_bool_t post; 287 nxt_fd_event_t *ev; 288 u_char buf[128]; 289 290 ev = obj; 291 292 nxt_debug(task, "engine signal pipe"); 293 294 post = 0; 295 296 do { 297 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 298 299 for (i = 0; i < n; i++) { 300 signo = buf[i]; 301 302 nxt_debug(task, "engine pipe signo:%d", signo); 303 304 if (signo == 0) { 305 /* A post should be processed only once. */ 306 post = 1; 307 308 } else { 309 nxt_event_engine_signal_handler(task, 310 (void *) (uintptr_t) signo, NULL); 311 } 312 } 313 314 } while (n == sizeof(buf)); 315 316 if (post) { 317 nxt_event_engine_post_handler(task, NULL, NULL); 318 } 319 } 320 321 322 static void 323 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 324 { 325 nxt_thread_t *thread; 326 nxt_event_engine_t *engine; 327 328 thread = task->thread; 329 engine = thread->engine; 330 331 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 332 &engine->fast_work_queue); 333 } 334 335 336 static void 337 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 338 { 339 nxt_event_engine_t *engine; 340 nxt_event_engine_pipe_t *pipe; 341 342 engine = task->thread->engine; 343 pipe = engine->pipe; 344 345 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 346 pipe->fds[0], pipe->fds[1]); 347 348 nxt_fd_event_close(engine, &pipe->event); 349 nxt_pipe_close(pipe->event.task, pipe->fds); 350 } 351 352 353 static void 354 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 355 { 356 uintptr_t signo; 357 const nxt_sig_event_t *sigev; 358 359 signo = (uintptr_t) obj; 360 361 for (sigev = task->thread->engine->signals->sigev; 362 sigev->signo != 0; 363 sigev++) 364 { 365 if (signo == (nxt_uint_t) sigev->signo) { 366 sigev->handler(task, (void *) signo, (void *) sigev->name); 367 return; 368 } 369 } 370 371 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); 372 } 373 374 375 nxt_int_t 376 nxt_event_engine_change(nxt_event_engine_t *engine, 377 const nxt_event_interface_t *interface, nxt_uint_t batch) 378 { 379 nxt_uint_t events; 380 381 engine->batch = batch; 382 383 if (!engine->event.signal_support && interface->signal_support) { 384 /* 385 * Block signal processing if the current event 386 * facility does not support signal processing. 387 */ 388 nxt_event_engine_signals_stop(engine); 389 390 /* 391 * Add to engine fast work queue the signal events possibly 392 * received before the blocking signal processing. 393 */ 394 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL); 395 } 396 397 if (engine->pipe != NULL && interface->enable_post != NULL) { 398 /* 399 * An engine pipe must be closed after all signal events 400 * added above to engine fast work queue will be processed. 401 */ 402 nxt_work_queue_add(&engine->fast_work_queue, 403 nxt_event_engine_signal_pipe_close, 404 &engine->task, engine->pipe, NULL); 405 406 engine->pipe = NULL; 407 } 408 409 engine->event.free(engine); 410 411 events = (batch != 0) ? batch : 32; 412 413 if (interface->create(engine, 4 * events, events) != NXT_OK) { 414 return NXT_ERROR; 415 } 416 417 engine->event = *interface; 418 419 if (nxt_event_engine_post_init(engine) != NXT_OK) { 420 return NXT_ERROR; 421 } 422 423 if (engine->signals != NULL) { 424 425 if (!engine->event.signal_support) { 426 return nxt_event_engine_signals_start(engine); 427 } 428 429 #if (NXT_THREADS) 430 /* 431 * Reset the PID flag to start the signal thread if 432 * some future event facility will not support signals. 433 */ 434 engine->signals->process = 0; 435 #endif 436 } 437 438 return NXT_OK; 439 } 440 441 442 void 443 nxt_event_engine_free(nxt_event_engine_t *engine) 444 { 445 nxt_event_engine_signal_pipe_free(engine); 446 nxt_free(engine->signals); 447 448 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 449 450 engine->event.free(engine); 451 452 /* TODO: free timers */ 453 454 nxt_free(engine); 455 } 456 457 458 static nxt_work_handler_t 459 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 460 void **obj, void **data) 461 { 462 nxt_work_queue_t *wq, *last; 463 464 wq = engine->current_work_queue; 465 last = wq; 466 467 if (wq->head == NULL) { 468 wq = &engine->fast_work_queue; 469 470 if (wq->head == NULL) { 471 472 do { 473 engine->current_work_queue++; 474 wq = engine->current_work_queue; 475 476 if (wq > &engine->close_work_queue) { 477 wq = &engine->fast_work_queue; 478 engine->current_work_queue = wq; 479 } 480 481 if (wq->head != NULL) { 482 goto found; 483 } 484 485 } while (wq != last); 486 487 engine->current_work_queue = &engine->fast_work_queue; 488 489 return NULL; 490 } 491 } 492 493 found: 494 495 nxt_debug(&engine->task, "work queue: %s", wq->name); 496 497 return nxt_work_queue_pop(wq, task, obj, data); 498 } 499 500 501 void 502 nxt_event_engine_start(nxt_event_engine_t *engine) 503 { 504 void *obj, *data; 505 nxt_task_t *task; 506 nxt_msec_t timeout, now; 507 nxt_thread_t *thr; 508 nxt_work_handler_t handler; 509 510 thr = nxt_thread(); 511 512 if (engine->fibers) { 513 /* 514 * _setjmp() cannot be wrapped in a function since return from 515 * the function clobbers stack used by future _setjmp() returns. 516 */ 517 _setjmp(engine->fibers->fiber.jmp); 518 519 /* A return point from fibers. */ 520 } 521 522 thr->log = engine->task.log; 523 524 for ( ;; ) { 525 526 for ( ;; ) { 527 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 528 529 if (handler == NULL) { 530 break; 531 } 532 533 handler(task, obj, data); 534 } 535 536 /* Attach some event engine work queues in preferred order. */ 537 538 timeout = nxt_timer_find(engine); 539 540 engine->event.poll(engine, timeout); 541 542 now = nxt_thread_monotonic_time(thr) / 1000000; 543 544 nxt_timer_expire(engine, now); 545 } 546 } 547