1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 11 static nxt_int_t nxt_event_engine_signal_pipe_create( 12 nxt_event_engine_t *engine); 13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 14 void *data); 15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 16 void *data); 17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 18 void *data); 19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 24 nxt_task_t **task, void **obj, void **data); 25 26 27 nxt_event_engine_t * 28 nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, 29 const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch) 30 { 31 nxt_uint_t events; 32 nxt_event_engine_t *engine; 33 34 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 35 if (engine == NULL) { 36 return NULL; 37 } 38 39 engine->task.thread = thr; 40 engine->task.log = thr->log; 41 engine->task.ident = nxt_task_next_ident(); 42 43 thr->engine = engine; 44 thr->fiber = &engine->fibers->fiber; 45 46 engine->batch = batch; 47 48 if (flags & NXT_ENGINE_FIBERS) { 49 engine->fibers = nxt_fiber_main_create(engine); 50 if (engine->fibers == NULL) { 51 goto fibers_fail; 52 } 53 } 54 55 engine->current_work_queue = &engine->fast_work_queue; 56 57 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 58 59 engine->fast_work_queue.cache = &engine->work_queue_cache; 60 engine->accept_work_queue.cache = &engine->work_queue_cache; 61 engine->read_work_queue.cache = &engine->work_queue_cache; 62 engine->socket_work_queue.cache = &engine->work_queue_cache; 63 engine->connect_work_queue.cache = &engine->work_queue_cache; 64 engine->write_work_queue.cache = &engine->work_queue_cache; 65 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 66 engine->close_work_queue.cache = &engine->work_queue_cache; 67 engine->final_work_queue.cache = &engine->work_queue_cache; 68 69 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 70 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 71 nxt_work_queue_name(&engine->read_work_queue, "read"); 72 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 73 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 74 nxt_work_queue_name(&engine->write_work_queue, "write"); 75 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 76 nxt_work_queue_name(&engine->close_work_queue, "close"); 77 nxt_work_queue_name(&engine->final_work_queue, "final"); 78 79 if (signals != NULL) { 80 engine->signals = nxt_event_engine_signals(signals); 81 if (engine->signals == NULL) { 82 goto signals_fail; 83 } 84 85 engine->signals->handler = nxt_event_engine_signal_handler; 86 87 if (!event_set->signal_support) { 88 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 89 goto signals_fail; 90 } 91 } 92 } 93 94 /* 95 * Number of event set and timers changes should be at least twice 96 * more than number of events to avoid premature flushes of the changes. 97 * Fourfold is for sure. 98 */ 99 events = (batch != 0) ? batch : 32; 100 101 engine->event_set = event_set->create(engine->signals, 4 * events, events); 102 if (engine->event_set == NULL) { 103 goto event_set_fail; 104 } 105 106 engine->event = event_set; 107 108 if (nxt_event_engine_post_init(engine) != NXT_OK) { 109 goto post_fail; 110 } 111 112 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 113 goto timers_fail; 114 } 115 116 nxt_thread_time_update(thr); 117 engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000; 118 119 engine->max_connections = 0xffffffff; 120 121 nxt_queue_init(&engine->listen_connections); 122 nxt_queue_init(&engine->idle_connections); 123 124 engine->thread = thr; 125 126 #if !(NXT_THREADS) 127 128 if (engine->event->signal_support) { 129 thr->time.signal = -1; 130 } 131 132 #endif 133 134 return engine; 135 136 timers_fail: 137 post_fail: 138 139 event_set->free(engine->event_set); 140 141 event_set_fail: 142 signals_fail: 143 144 nxt_free(engine->signals); 145 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 146 nxt_free(engine->fibers); 147 148 fibers_fail: 149 150 nxt_free(engine); 151 return NULL; 152 } 153 154 155 static nxt_int_t 156 nxt_event_engine_post_init(nxt_event_engine_t *engine) 157 { 158 if (engine->event->enable_post != NULL) { 159 return engine->event->enable_post(engine->event_set, 160 nxt_event_engine_post_handler); 161 } 162 163 #if !(NXT_THREADS) 164 165 /* Only signals may are posted in single-threaded mode. */ 166 167 if (engine->event->signal_support) { 168 return NXT_OK; 169 } 170 171 #endif 172 173 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 174 return NXT_ERROR; 175 } 176 177 return NXT_OK; 178 } 179 180 181 static nxt_int_t 182 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 183 { 184 nxt_event_engine_pipe_t *pipe; 185 186 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 187 if (pipe == NULL) { 188 return NXT_ERROR; 189 } 190 191 engine->pipe = pipe; 192 193 /* 194 * An event engine pipe is in blocking mode for writer 195 * and in non-blocking node for reader. 196 */ 197 198 if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) { 199 nxt_free(pipe); 200 return NXT_ERROR; 201 } 202 203 pipe->event.fd = pipe->fds[0]; 204 pipe->event.read_work_queue = &engine->fast_work_queue; 205 pipe->event.read_handler = nxt_event_engine_signal_pipe; 206 pipe->event.write_work_queue = &engine->fast_work_queue; 207 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 208 pipe->event.log = &nxt_main_log; 209 210 nxt_event_fd_enable_read(engine, &pipe->event); 211 212 return NXT_OK; 213 } 214 215 216 static void 217 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 218 { 219 nxt_event_engine_pipe_t *pipe; 220 221 pipe = engine->pipe; 222 223 if (pipe != NULL) { 224 225 if (pipe->event.read_work_queue != NULL) { 226 nxt_event_fd_close(engine, &pipe->event); 227 nxt_pipe_close(pipe->fds); 228 } 229 230 nxt_free(pipe); 231 } 232 } 233 234 235 static void 236 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 237 { 238 nxt_event_engine_pipe_t *pipe; 239 240 pipe = obj; 241 242 nxt_pipe_close(pipe->fds); 243 nxt_free(pipe); 244 } 245 246 247 void 248 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 249 { 250 nxt_thread_log_debug("event engine post"); 251 252 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 253 254 nxt_event_engine_signal(engine, 0); 255 } 256 257 258 void 259 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 260 { 261 u_char buf; 262 263 nxt_thread_log_debug("event engine signal:%ui", signo); 264 265 /* 266 * A signal number may be sent in a signal context, so the signal 267 * information cannot be passed via a locked work queue. 268 */ 269 270 if (engine->event->signal != NULL) { 271 engine->event->signal(engine->event_set, signo); 272 return; 273 } 274 275 buf = (u_char) signo; 276 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 277 } 278 279 280 static void 281 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 282 { 283 int i, n; 284 u_char signo; 285 nxt_bool_t post; 286 nxt_event_fd_t *ev; 287 u_char buf[128]; 288 289 ev = obj; 290 291 nxt_debug(task, "engine signal pipe"); 292 293 post = 0; 294 295 do { 296 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 297 298 for (i = 0; i < n; i++) { 299 signo = buf[i]; 300 301 nxt_debug(task, "engine pipe signo:%d", signo); 302 303 if (signo == 0) { 304 /* A post should be processed only once. */ 305 post = 1; 306 307 } else { 308 nxt_event_engine_signal_handler(task, 309 (void *) (uintptr_t) signo, NULL); 310 } 311 } 312 313 } while (n == sizeof(buf)); 314 315 if (post) { 316 nxt_event_engine_post_handler(task, NULL, NULL); 317 } 318 } 319 320 321 static void 322 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 323 { 324 nxt_thread_t *thread; 325 nxt_event_engine_t *engine; 326 327 thread = task->thread; 328 engine = thread->engine; 329 330 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 331 &engine->fast_work_queue); 332 } 333 334 335 static void 336 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 337 { 338 nxt_event_engine_t *engine; 339 340 engine = task->thread->engine; 341 342 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 343 engine->pipe->fds[0], engine->pipe->fds[1]); 344 345 nxt_event_fd_close(engine, &engine->pipe->event); 346 nxt_pipe_close(engine->pipe->fds); 347 } 348 349 350 static void 351 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 352 { 353 uintptr_t signo; 354 const nxt_event_sig_t *sigev; 355 356 signo = (uintptr_t) obj; 357 358 for (sigev = task->thread->engine->signals->sigev; 359 sigev->signo != 0; 360 sigev++) 361 { 362 if (signo == (nxt_uint_t) sigev->signo) { 363 sigev->handler(task, (void *) signo, (void *) sigev->name); 364 return; 365 } 366 } 367 368 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); 369 } 370 371 372 nxt_int_t 373 nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, 374 const nxt_event_set_ops_t *event_set, nxt_uint_t batch) 375 { 376 nxt_uint_t events; 377 nxt_event_engine_t *engine; 378 379 engine = thr->engine; 380 engine->batch = batch; 381 382 if (!engine->event->signal_support && event_set->signal_support) { 383 /* 384 * Block signal processing if the current event 385 * facility does not support signal processing. 386 */ 387 nxt_event_engine_signals_stop(engine); 388 389 /* 390 * Add to engine fast work queue the signal events possibly 391 * received before the blocking signal processing. 392 */ 393 nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); 394 } 395 396 if (engine->pipe != NULL && event_set->enable_post != NULL) { 397 /* 398 * An engine pipe must be closed after all signal events 399 * added above to engine fast work queue will be processed. 400 */ 401 nxt_work_queue_add(&engine->final_work_queue, 402 nxt_event_engine_signal_pipe_close, 403 &engine->task, engine->pipe, NULL); 404 405 engine->pipe = NULL; 406 } 407 408 engine->event->free(engine->event_set); 409 410 events = (batch != 0) ? batch : 32; 411 412 engine->event_set = event_set->create(engine->signals, 4 * events, events); 413 if (engine->event_set == NULL) { 414 return NXT_ERROR; 415 } 416 417 engine->event = event_set; 418 419 if (nxt_event_engine_post_init(engine) != NXT_OK) { 420 return NXT_ERROR; 421 } 422 423 if (engine->signals != NULL) { 424 425 if (!engine->event->signal_support) { 426 return nxt_event_engine_signals_start(engine); 427 } 428 429 #if (NXT_THREADS) 430 /* 431 * Reset the PID flag to start the signal thread if 432 * some future event facility will not support signals. 433 */ 434 engine->signals->process = 0; 435 #endif 436 } 437 438 return NXT_OK; 439 } 440 441 442 void 443 nxt_event_engine_free(nxt_event_engine_t *engine) 444 { 445 nxt_event_engine_signal_pipe_free(engine); 446 nxt_free(engine->signals); 447 448 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 449 450 engine->event->free(engine->event_set); 451 452 /* TODO: free timers */ 453 454 nxt_free(engine); 455 } 456 457 458 static nxt_work_handler_t 459 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 460 void **obj, void **data) 461 { 462 nxt_work_queue_t *wq; 463 464 wq = engine->current_work_queue; 465 466 if (wq->head == NULL) { 467 wq = &engine->fast_work_queue; 468 469 while (wq->head == NULL) { 470 engine->current_work_queue++; 471 wq = engine->current_work_queue; 472 473 if (wq > &engine->final_work_queue) { 474 engine->current_work_queue = &engine->fast_work_queue; 475 476 return NULL; 477 } 478 } 479 } 480 481 nxt_debug(&engine->task, "work queue: %s", wq->name); 482 483 return nxt_work_queue_pop(wq, task, obj, data); 484 } 485 486 487 void 488 nxt_event_engine_start(nxt_event_engine_t *engine) 489 { 490 void *obj, *data; 491 nxt_task_t *task; 492 nxt_msec_t timeout, now; 493 nxt_thread_t *thr; 494 nxt_work_handler_t handler; 495 496 thr = nxt_thread(); 497 498 if (engine->fibers) { 499 /* 500 * _setjmp() cannot be wrapped in a function since return from 501 * the function clobbers stack used by future _setjmp() returns. 502 */ 503 _setjmp(engine->fibers->fiber.jmp); 504 505 /* A return point from fibers. */ 506 } 507 508 thr->log = &nxt_main_log; 509 510 for ( ;; ) { 511 512 for ( ;; ) { 513 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 514 515 if (handler == NULL) { 516 break; 517 } 518 519 handler(task, obj, data); 520 } 521 522 /* Attach some event engine work queues in preferred order. */ 523 524 timeout = nxt_timer_find(engine); 525 526 engine->event->poll(&engine->task, engine->event_set, timeout); 527 528 /* 529 * Look up expired timers only if a new zero timer has been 530 * just added before the event poll or if the event poll slept 531 * at least 1 millisecond, because all old eligible timers were 532 * processed in the previous iterations. 533 */ 534 535 now = nxt_thread_monotonic_time(thr) / 1000000; 536 537 if (timeout == 0 || now != engine->timers.now) { 538 nxt_timer_expire(thr, now); 539 } 540 } 541 } 542