1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 11 static nxt_int_t nxt_event_engine_signal_pipe_create( 12 nxt_event_engine_t *engine); 13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 14 void *data); 15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 16 void *data); 17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 18 void *data); 19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 24 nxt_task_t **task, void **obj, void **data); 25 26 27 nxt_event_engine_t * 28 nxt_event_engine_create(nxt_thread_t *thr, 29 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 30 nxt_uint_t flags, nxt_uint_t batch) 31 { 32 nxt_uint_t events; 33 nxt_event_engine_t *engine; 34 35 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 36 if (engine == NULL) { 37 return NULL; 38 } 39 40 engine->task.thread = thr; 41 engine->task.log = thr->log; 42 engine->task.ident = nxt_task_next_ident(); 43 44 thr->engine = engine; 45 thr->fiber = &engine->fibers->fiber; 46 47 engine->batch = batch; 48 49 if (flags & NXT_ENGINE_FIBERS) { 50 engine->fibers = nxt_fiber_main_create(engine); 51 if (engine->fibers == NULL) { 52 goto fibers_fail; 53 } 54 } 55 56 engine->current_work_queue = &engine->fast_work_queue; 57 58 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 59 60 engine->fast_work_queue.cache = &engine->work_queue_cache; 61 engine->accept_work_queue.cache = &engine->work_queue_cache; 62 engine->read_work_queue.cache = &engine->work_queue_cache; 63 engine->socket_work_queue.cache = &engine->work_queue_cache; 64 engine->connect_work_queue.cache = &engine->work_queue_cache; 65 engine->write_work_queue.cache = &engine->work_queue_cache; 66 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 67 engine->close_work_queue.cache = &engine->work_queue_cache; 68 69 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 70 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 71 nxt_work_queue_name(&engine->read_work_queue, "read"); 72 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 73 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 74 nxt_work_queue_name(&engine->write_work_queue, "write"); 75 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 76 nxt_work_queue_name(&engine->close_work_queue, "close"); 77 78 if (signals != NULL) { 79 engine->signals = nxt_event_engine_signals(signals); 80 if (engine->signals == NULL) { 81 goto signals_fail; 82 } 83 84 engine->signals->handler = nxt_event_engine_signal_handler; 85 86 if (!interface->signal_support) { 87 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 88 goto signals_fail; 89 } 90 } 91 } 92 93 /* 94 * Number of event set and timers changes should be at least twice 95 * more than number of events to avoid premature flushes of the changes. 96 * Fourfold is for sure. 97 */ 98 events = (batch != 0) ? batch : 32; 99 100 if (interface->create(engine, 4 * events, events) != NXT_OK) { 101 goto event_set_fail; 102 } 103 104 engine->event = *interface; 105 106 if (nxt_event_engine_post_init(engine) != NXT_OK) { 107 goto post_fail; 108 } 109 110 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 111 goto timers_fail; 112 } 113 114 nxt_thread_time_update(thr); 115 engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000; 116 117 engine->max_connections = 0xffffffff; 118 119 nxt_queue_init(&engine->listen_connections); 120 nxt_queue_init(&engine->idle_connections); 121 122 #if !(NXT_THREADS) 123 124 if (interface->signal_support) { 125 thr->time.signal = -1; 126 } 127 128 #endif 129 130 return engine; 131 132 timers_fail: 133 post_fail: 134 135 interface->free(engine); 136 137 event_set_fail: 138 signals_fail: 139 140 nxt_free(engine->signals); 141 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 142 nxt_free(engine->fibers); 143 144 fibers_fail: 145 146 nxt_free(engine); 147 return NULL; 148 } 149 150 151 static nxt_int_t 152 nxt_event_engine_post_init(nxt_event_engine_t *engine) 153 { 154 if (engine->event.enable_post != NULL) { 155 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 156 } 157 158 #if !(NXT_THREADS) 159 160 /* Only signals may are posted in single-threaded mode. */ 161 162 if (engine->event->signal_support) { 163 return NXT_OK; 164 } 165 166 #endif 167 168 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 169 return NXT_ERROR; 170 } 171 172 return NXT_OK; 173 } 174 175 176 static nxt_int_t 177 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 178 { 179 nxt_event_engine_pipe_t *pipe; 180 181 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 182 if (pipe == NULL) { 183 return NXT_ERROR; 184 } 185 186 engine->pipe = pipe; 187 188 /* 189 * An event engine pipe is in blocking mode for writer 190 * and in non-blocking node for reader. 191 */ 192 193 if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) { 194 nxt_free(pipe); 195 return NXT_ERROR; 196 } 197 198 pipe->event.fd = pipe->fds[0]; 199 pipe->event.task = &engine->task; 200 pipe->event.read_work_queue = &engine->fast_work_queue; 201 pipe->event.read_handler = nxt_event_engine_signal_pipe; 202 pipe->event.write_work_queue = &engine->fast_work_queue; 203 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 204 pipe->event.log = engine->task.log; 205 206 nxt_fd_event_enable_read(engine, &pipe->event); 207 208 return NXT_OK; 209 } 210 211 212 static void 213 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 214 { 215 nxt_event_engine_pipe_t *pipe; 216 217 pipe = engine->pipe; 218 219 if (pipe != NULL) { 220 221 if (pipe->event.read_work_queue != NULL) { 222 nxt_fd_event_close(engine, &pipe->event); 223 nxt_pipe_close(pipe->fds); 224 } 225 226 nxt_free(pipe); 227 } 228 } 229 230 231 static void 232 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 233 { 234 nxt_event_engine_pipe_t *pipe; 235 236 pipe = obj; 237 238 nxt_pipe_close(pipe->fds); 239 nxt_free(pipe); 240 } 241 242 243 void 244 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 245 { 246 nxt_debug(&engine->task, "event engine post"); 247 248 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 249 250 nxt_event_engine_signal(engine, 0); 251 } 252 253 254 void 255 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 256 { 257 u_char buf; 258 259 nxt_debug(&engine->task, "event engine signal:%ui", signo); 260 261 /* 262 * A signal number may be sent in a signal context, so the signal 263 * information cannot be passed via a locked work queue. 264 */ 265 266 if (engine->event.signal != NULL) { 267 engine->event.signal(engine, signo); 268 return; 269 } 270 271 buf = (u_char) signo; 272 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 273 } 274 275 276 static void 277 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 278 { 279 int i, n; 280 u_char signo; 281 nxt_bool_t post; 282 nxt_fd_event_t *ev; 283 u_char buf[128]; 284 285 ev = obj; 286 287 nxt_debug(task, "engine signal pipe"); 288 289 post = 0; 290 291 do { 292 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 293 294 for (i = 0; i < n; i++) { 295 signo = buf[i]; 296 297 nxt_debug(task, "engine pipe signo:%d", signo); 298 299 if (signo == 0) { 300 /* A post should be processed only once. */ 301 post = 1; 302 303 } else { 304 nxt_event_engine_signal_handler(task, 305 (void *) (uintptr_t) signo, NULL); 306 } 307 } 308 309 } while (n == sizeof(buf)); 310 311 if (post) { 312 nxt_event_engine_post_handler(task, NULL, NULL); 313 } 314 } 315 316 317 static void 318 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 319 { 320 nxt_thread_t *thread; 321 nxt_event_engine_t *engine; 322 323 thread = task->thread; 324 engine = thread->engine; 325 326 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 327 &engine->fast_work_queue); 328 } 329 330 331 static void 332 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 333 { 334 nxt_event_engine_t *engine; 335 336 engine = task->thread->engine; 337 338 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 339 engine->pipe->fds[0], engine->pipe->fds[1]); 340 341 nxt_fd_event_close(engine, &engine->pipe->event); 342 nxt_pipe_close(engine->pipe->fds); 343 } 344 345 346 static void 347 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 348 { 349 uintptr_t signo; 350 const nxt_sig_event_t *sigev; 351 352 signo = (uintptr_t) obj; 353 354 for (sigev = task->thread->engine->signals->sigev; 355 sigev->signo != 0; 356 sigev++) 357 { 358 if (signo == (nxt_uint_t) sigev->signo) { 359 sigev->handler(task, (void *) signo, (void *) sigev->name); 360 return; 361 } 362 } 363 364 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); 365 } 366 367 368 nxt_int_t 369 nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, 370 const nxt_event_interface_t *interface, nxt_uint_t batch) 371 { 372 nxt_uint_t events; 373 nxt_event_engine_t *engine; 374 375 engine = thr->engine; 376 engine->batch = batch; 377 378 if (!engine->event.signal_support && interface->signal_support) { 379 /* 380 * Block signal processing if the current event 381 * facility does not support signal processing. 382 */ 383 nxt_event_engine_signals_stop(engine); 384 385 /* 386 * Add to engine fast work queue the signal events possibly 387 * received before the blocking signal processing. 388 */ 389 nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); 390 } 391 392 if (engine->pipe != NULL && interface->enable_post != NULL) { 393 /* 394 * An engine pipe must be closed after all signal events 395 * added above to engine fast work queue will be processed. 396 */ 397 nxt_work_queue_add(&engine->fast_work_queue, 398 nxt_event_engine_signal_pipe_close, 399 &engine->task, engine->pipe, NULL); 400 401 engine->pipe = NULL; 402 } 403 404 engine->event.free(engine); 405 406 events = (batch != 0) ? batch : 32; 407 408 if (interface->create(engine, 4 * events, events) != NXT_OK) { 409 return NXT_ERROR; 410 } 411 412 engine->event = *interface; 413 414 if (nxt_event_engine_post_init(engine) != NXT_OK) { 415 return NXT_ERROR; 416 } 417 418 if (engine->signals != NULL) { 419 420 if (!engine->event.signal_support) { 421 return nxt_event_engine_signals_start(engine); 422 } 423 424 #if (NXT_THREADS) 425 /* 426 * Reset the PID flag to start the signal thread if 427 * some future event facility will not support signals. 428 */ 429 engine->signals->process = 0; 430 #endif 431 } 432 433 return NXT_OK; 434 } 435 436 437 void 438 nxt_event_engine_free(nxt_event_engine_t *engine) 439 { 440 nxt_event_engine_signal_pipe_free(engine); 441 nxt_free(engine->signals); 442 443 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 444 445 engine->event.free(engine); 446 447 /* TODO: free timers */ 448 449 nxt_free(engine); 450 } 451 452 453 static nxt_work_handler_t 454 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 455 void **obj, void **data) 456 { 457 nxt_work_queue_t *wq, *last; 458 459 wq = engine->current_work_queue; 460 last = wq; 461 462 if (wq->head == NULL) { 463 wq = &engine->fast_work_queue; 464 465 if (wq->head == NULL) { 466 467 do { 468 engine->current_work_queue++; 469 wq = engine->current_work_queue; 470 471 if (wq > &engine->close_work_queue) { 472 wq = &engine->fast_work_queue; 473 engine->current_work_queue = wq; 474 } 475 476 if (wq->head != NULL) { 477 goto found; 478 } 479 480 } while (wq != last); 481 482 engine->current_work_queue = &engine->fast_work_queue; 483 484 return NULL; 485 } 486 } 487 488 found: 489 490 nxt_debug(&engine->task, "work queue: %s", wq->name); 491 492 return nxt_work_queue_pop(wq, task, obj, data); 493 } 494 495 496 void 497 nxt_event_engine_start(nxt_event_engine_t *engine) 498 { 499 void *obj, *data; 500 nxt_task_t *task; 501 nxt_msec_t timeout, now; 502 nxt_thread_t *thr; 503 nxt_work_handler_t handler; 504 505 thr = nxt_thread(); 506 507 if (engine->fibers) { 508 /* 509 * _setjmp() cannot be wrapped in a function since return from 510 * the function clobbers stack used by future _setjmp() returns. 511 */ 512 _setjmp(engine->fibers->fiber.jmp); 513 514 /* A return point from fibers. */ 515 } 516 517 thr->log = engine->task.log; 518 519 for ( ;; ) { 520 521 for ( ;; ) { 522 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 523 524 if (handler == NULL) { 525 break; 526 } 527 528 handler(task, obj, data); 529 } 530 531 /* Attach some event engine work queues in preferred order. */ 532 533 timeout = nxt_timer_find(engine); 534 535 engine->event.poll(engine, timeout); 536 537 now = nxt_thread_monotonic_time(thr) / 1000000; 538 539 nxt_timer_expire(engine, now); 540 } 541 } 542