1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 11 static nxt_int_t nxt_event_engine_signal_pipe_create( 12 nxt_event_engine_t *engine); 13 static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 14 void *data); 15 static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 16 void *data); 17 static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 18 void *data); 19 static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 20 void *data); 21 static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 22 void *data); 23 static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 24 nxt_task_t **task, void **obj, void **data); 25 26 27 nxt_event_engine_t * 28 nxt_event_engine_create(nxt_thread_t *thr, 29 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 30 nxt_uint_t flags, nxt_uint_t batch) 31 { 32 nxt_uint_t events; 33 nxt_event_engine_t *engine; 34 35 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 36 if (engine == NULL) { 37 return NULL; 38 } 39 40 engine->task.thread = thr; 41 engine->task.log = thr->log; 42 engine->task.ident = nxt_task_next_ident(); 43 44 thr->engine = engine; 45 thr->fiber = &engine->fibers->fiber; 46 47 engine->batch0 = batch; 48 49 if (flags & NXT_ENGINE_FIBERS) { 50 engine->fibers = nxt_fiber_main_create(engine); 51 if (engine->fibers == NULL) { 52 goto fibers_fail; 53 } 54 } 55 56 engine->current_work_queue = &engine->fast_work_queue; 57 58 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 59 60 engine->fast_work_queue.cache = &engine->work_queue_cache; 61 engine->accept_work_queue.cache = &engine->work_queue_cache; 62 engine->read_work_queue.cache = &engine->work_queue_cache; 63 engine->socket_work_queue.cache = &engine->work_queue_cache; 64 engine->connect_work_queue.cache = &engine->work_queue_cache; 65 engine->write_work_queue.cache = &engine->work_queue_cache; 66 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 67 engine->close_work_queue.cache = &engine->work_queue_cache; 68 69 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 70 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 71 nxt_work_queue_name(&engine->read_work_queue, "read"); 72 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 73 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 74 nxt_work_queue_name(&engine->write_work_queue, "write"); 75 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 76 nxt_work_queue_name(&engine->close_work_queue, "close"); 77 78 if (signals != NULL) { 79 engine->signals = nxt_event_engine_signals(signals); 80 if (engine->signals == NULL) { 81 goto signals_fail; 82 } 83 84 engine->signals->handler = nxt_event_engine_signal_handler; 85 86 if (!interface->signal_support) { 87 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 88 goto signals_fail; 89 } 90 } 91 } 92 93 /* 94 * Number of event set and timers changes should be at least twice 95 * more than number of events to avoid premature flushes of the changes. 96 * Fourfold is for sure. 97 */ 98 events = (batch != 0) ? batch : 32; 99 100 if (interface->create(engine, 4 * events, events) != NXT_OK) { 101 goto event_set_fail; 102 } 103 104 engine->event = *interface; 105 106 if (nxt_event_engine_post_init(engine) != NXT_OK) { 107 goto post_fail; 108 } 109 110 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 111 goto timers_fail; 112 } 113 114 nxt_thread_time_update(thr); 115 engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000; 116 117 engine->max_connections = 0xffffffff; 118 119 nxt_queue_init(&engine->listen_connections); 120 nxt_queue_init(&engine->idle_connections); 121 122 #if !(NXT_THREADS) 123 124 if (interface->signal_support) { 125 thr->time.signal = -1; 126 } 127 128 #endif 129 130 return engine; 131 132 timers_fail: 133 post_fail: 134 135 interface->free(engine); 136 137 event_set_fail: 138 signals_fail: 139 140 nxt_free(engine->signals); 141 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 142 nxt_free(engine->fibers); 143 144 fibers_fail: 145 146 nxt_free(engine); 147 return NULL; 148 } 149 150 151 static nxt_int_t 152 nxt_event_engine_post_init(nxt_event_engine_t *engine) 153 { 154 if (engine->event.enable_post != NULL) { 155 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 156 } 157 158 #if !(NXT_THREADS) 159 160 /* Only signals may are posted in single-threaded mode. */ 161 162 if (engine->event->signal_support) { 163 return NXT_OK; 164 } 165 166 #endif 167 168 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 169 return NXT_ERROR; 170 } 171 172 return NXT_OK; 173 } 174 175 176 static nxt_int_t 177 nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 178 { 179 nxt_event_engine_pipe_t *pipe; 180 181 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 182 if (pipe == NULL) { 183 return NXT_ERROR; 184 } 185 186 engine->pipe = pipe; 187 188 /* 189 * An event engine pipe is in blocking mode for writer 190 * and in non-blocking node for reader. 191 */ 192 193 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { 194 nxt_free(pipe); 195 return NXT_ERROR; 196 } 197 198 pipe->event.fd = pipe->fds[0]; 199 pipe->event.task = &engine->task; 200 pipe->event.read_work_queue = &engine->fast_work_queue; 201 pipe->event.read_handler = nxt_event_engine_signal_pipe; 202 pipe->event.write_work_queue = &engine->fast_work_queue; 203 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 204 pipe->event.log = engine->task.log; 205 206 nxt_fd_event_enable_read(engine, &pipe->event); 207 208 return NXT_OK; 209 } 210 211 212 static void 213 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 214 { 215 nxt_event_engine_pipe_t *pipe; 216 217 pipe = engine->pipe; 218 219 if (pipe != NULL) { 220 221 if (pipe->event.read_work_queue != NULL) { 222 nxt_fd_event_close(engine, &pipe->event); 223 nxt_pipe_close(pipe->event.task, pipe->fds); 224 } 225 226 nxt_free(pipe); 227 } 228 } 229 230 231 static void 232 nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 233 { 234 nxt_event_engine_pipe_t *pipe; 235 236 pipe = obj; 237 238 nxt_pipe_close(pipe->event.task, pipe->fds); 239 nxt_free(pipe); 240 } 241 242 243 void 244 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 245 { 246 nxt_debug(&engine->task, "event engine post"); 247 248 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 249 250 nxt_event_engine_signal(engine, 0); 251 } 252 253 254 void 255 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 256 { 257 u_char buf; 258 259 nxt_debug(&engine->task, "event engine signal:%ui", signo); 260 261 /* 262 * A signal number may be sent in a signal context, so the signal 263 * information cannot be passed via a locked work queue. 264 */ 265 266 if (engine->event.signal != NULL) { 267 engine->event.signal(engine, signo); 268 return; 269 } 270 271 buf = (u_char) signo; 272 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 273 } 274 275 276 static void 277 nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 278 { 279 int i, n; 280 u_char signo; 281 nxt_bool_t post; 282 nxt_fd_event_t *ev; 283 u_char buf[128]; 284 285 ev = obj; 286 287 nxt_debug(task, "engine signal pipe"); 288 289 post = 0; 290 291 do { 292 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 293 294 for (i = 0; i < n; i++) { 295 signo = buf[i]; 296 297 nxt_debug(task, "engine pipe signo:%d", signo); 298 299 if (signo == 0) { 300 /* A post should be processed only once. */ 301 post = 1; 302 303 } else { 304 nxt_event_engine_signal_handler(task, 305 (void *) (uintptr_t) signo, NULL); 306 } 307 } 308 309 } while (n == sizeof(buf)); 310 311 if (post) { 312 nxt_event_engine_post_handler(task, NULL, NULL); 313 } 314 } 315 316 317 static void 318 nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 319 { 320 nxt_thread_t *thread; 321 nxt_event_engine_t *engine; 322 323 thread = task->thread; 324 engine = thread->engine; 325 326 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 327 &engine->fast_work_queue); 328 } 329 330 331 static void 332 nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 333 { 334 nxt_event_engine_t *engine; 335 nxt_event_engine_pipe_t *pipe; 336 337 engine = task->thread->engine; 338 pipe = engine->pipe; 339 340 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 341 pipe->fds[0], pipe->fds[1]); 342 343 nxt_fd_event_close(engine, &pipe->event); 344 nxt_pipe_close(pipe->event.task, pipe->fds); 345 } 346 347 348 static void 349 nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 350 { 351 uintptr_t signo; 352 const nxt_sig_event_t *sigev; 353 354 signo = (uintptr_t) obj; 355 356 for (sigev = task->thread->engine->signals->sigev; 357 sigev->signo != 0; 358 sigev++) 359 { 360 if (signo == (nxt_uint_t) sigev->signo) { 361 sigev->handler(task, (void *) signo, (void *) sigev->name); 362 return; 363 } 364 } 365 366 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); 367 } 368 369 370 nxt_int_t 371 nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, 372 const nxt_event_interface_t *interface, nxt_uint_t batch) 373 { 374 nxt_uint_t events; 375 nxt_event_engine_t *engine; 376 377 engine = thr->engine; 378 engine->batch0 = batch; 379 380 if (!engine->event.signal_support && interface->signal_support) { 381 /* 382 * Block signal processing if the current event 383 * facility does not support signal processing. 384 */ 385 nxt_event_engine_signals_stop(engine); 386 387 /* 388 * Add to engine fast work queue the signal events possibly 389 * received before the blocking signal processing. 390 */ 391 nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); 392 } 393 394 if (engine->pipe != NULL && interface->enable_post != NULL) { 395 /* 396 * An engine pipe must be closed after all signal events 397 * added above to engine fast work queue will be processed. 398 */ 399 nxt_work_queue_add(&engine->fast_work_queue, 400 nxt_event_engine_signal_pipe_close, 401 &engine->task, engine->pipe, NULL); 402 403 engine->pipe = NULL; 404 } 405 406 engine->event.free(engine); 407 408 events = (batch != 0) ? batch : 32; 409 410 if (interface->create(engine, 4 * events, events) != NXT_OK) { 411 return NXT_ERROR; 412 } 413 414 engine->event = *interface; 415 416 if (nxt_event_engine_post_init(engine) != NXT_OK) { 417 return NXT_ERROR; 418 } 419 420 if (engine->signals != NULL) { 421 422 if (!engine->event.signal_support) { 423 return nxt_event_engine_signals_start(engine); 424 } 425 426 #if (NXT_THREADS) 427 /* 428 * Reset the PID flag to start the signal thread if 429 * some future event facility will not support signals. 430 */ 431 engine->signals->process = 0; 432 #endif 433 } 434 435 return NXT_OK; 436 } 437 438 439 void 440 nxt_event_engine_free(nxt_event_engine_t *engine) 441 { 442 nxt_event_engine_signal_pipe_free(engine); 443 nxt_free(engine->signals); 444 445 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 446 447 engine->event.free(engine); 448 449 /* TODO: free timers */ 450 451 nxt_free(engine); 452 } 453 454 455 static nxt_work_handler_t 456 nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 457 void **obj, void **data) 458 { 459 nxt_work_queue_t *wq, *last; 460 461 wq = engine->current_work_queue; 462 last = wq; 463 464 if (wq->head == NULL) { 465 wq = &engine->fast_work_queue; 466 467 if (wq->head == NULL) { 468 469 do { 470 engine->current_work_queue++; 471 wq = engine->current_work_queue; 472 473 if (wq > &engine->close_work_queue) { 474 wq = &engine->fast_work_queue; 475 engine->current_work_queue = wq; 476 } 477 478 if (wq->head != NULL) { 479 goto found; 480 } 481 482 } while (wq != last); 483 484 engine->current_work_queue = &engine->fast_work_queue; 485 486 return NULL; 487 } 488 } 489 490 found: 491 492 nxt_debug(&engine->task, "work queue: %s", wq->name); 493 494 return nxt_work_queue_pop(wq, task, obj, data); 495 } 496 497 498 void 499 nxt_event_engine_start(nxt_event_engine_t *engine) 500 { 501 void *obj, *data; 502 nxt_task_t *task; 503 nxt_msec_t timeout, now; 504 nxt_thread_t *thr; 505 nxt_work_handler_t handler; 506 507 thr = nxt_thread(); 508 509 if (engine->fibers) { 510 /* 511 * _setjmp() cannot be wrapped in a function since return from 512 * the function clobbers stack used by future _setjmp() returns. 513 */ 514 _setjmp(engine->fibers->fiber.jmp); 515 516 /* A return point from fibers. */ 517 } 518 519 thr->log = engine->task.log; 520 521 for ( ;; ) { 522 523 for ( ;; ) { 524 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 525 526 if (handler == NULL) { 527 break; 528 } 529 530 handler(task, obj, data); 531 } 532 533 /* Attach some event engine work queues in preferred order. */ 534 535 timeout = nxt_timer_find(engine); 536 537 engine->event.poll(engine, timeout); 538 539 now = nxt_thread_monotonic_time(thr) / 1000000; 540 541 nxt_timer_expire(engine, now); 542 } 543 } 544