1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_event_engine_post_init(nxt_thread_t *thr, 11 nxt_event_engine_t *engine); 12 static nxt_int_t nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, 13 nxt_event_engine_t *engine); 14 static void nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, 15 void *data); 16 static void nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, 17 void *data); 18 static void nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, 19 void *data); 20 static void nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, 21 void *data); 22 static void nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, 23 void *data); 24 static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_thread_t *thr, 25 nxt_uint_t signo); 26 27 28 nxt_event_engine_t * 29 nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, 30 const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch) 31 { 32 nxt_uint_t events; 33 nxt_event_engine_t *engine; 34 35 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 36 if (engine == NULL) { 37 return NULL; 38 } 39 40 engine->batch = batch; 41 42 if (flags & NXT_ENGINE_FIBERS) { 43 engine->fibers = nxt_fiber_main_create(engine); 44 if (engine->fibers == NULL) { 45 goto fibers_fail; 46 } 47 } 48 49 nxt_thread_work_queue_create(thr, 0); 50 51 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 52 nxt_work_queue_name(&engine->read_work_queue, "read"); 53 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 54 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 55 nxt_work_queue_name(&engine->write_work_queue, "write"); 56 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 57 nxt_work_queue_name(&engine->close_work_queue, "close"); 58 59 #if (NXT_THREADS) 60 61 nxt_locked_work_queue_create(&engine->work_queue, 7); 62 63 #endif 64 65 if (signals != NULL) { 66 engine->signals = nxt_event_engine_signals(signals); 67 if (engine->signals == NULL) { 68 goto signals_fail; 69 } 70 71 engine->signals->handler = nxt_event_engine_signal_handler; 72 73 if (!event_set->signal_support) { 74 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 75 goto signals_fail; 76 } 77 } 78 } 79 80 /* 81 * Number of event set and timers changes should be at least twice 82 * more than number of events to avoid premature flushes of the changes. 83 * Fourfold is for sure. 84 */ 85 events = (batch != 0) ? batch : 32; 86 87 engine->event_set = event_set->create(engine->signals, 4 * events, events); 88 if (engine->event_set == NULL) { 89 goto event_set_fail; 90 } 91 92 engine->event = event_set; 93 94 if (nxt_event_engine_post_init(thr, engine) != NXT_OK) { 95 goto post_fail; 96 } 97 98 if (nxt_event_timers_init(&engine->timers, 4 * events) != NXT_OK) { 99 goto timers_fail; 100 } 101 102 nxt_thread_time_update(thr); 103 engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000; 104 105 engine->max_connections = 0xffffffff; 106 107 nxt_queue_init(&engine->listen_connections); 108 nxt_queue_init(&engine->idle_connections); 109 110 thr->engine = engine; 111 thr->fiber = &engine->fibers->fiber; 112 113 #if !(NXT_THREADS) 114 115 if (engine->event->signal_support) { 116 thr->time.signal = -1; 117 } 118 119 #endif 120 121 return engine; 122 123 timers_fail: 124 post_fail: 125 126 event_set->free(engine->event_set); 127 128 event_set_fail: 129 signals_fail: 130 131 nxt_free(engine->signals); 132 nxt_thread_work_queue_destroy(thr); 133 nxt_free(engine->fibers); 134 135 fibers_fail: 136 137 nxt_free(engine); 138 return NULL; 139 } 140 141 142 static nxt_int_t 143 nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine) 144 { 145 if (engine->event->enable_post != NULL) { 146 return engine->event->enable_post(engine->event_set, 147 nxt_event_engine_post_handler); 148 } 149 150 #if !(NXT_THREADS) 151 152 /* Only signals may are posted in single-threaded mode. */ 153 154 if (engine->event->signal_support) { 155 return NXT_OK; 156 } 157 158 #endif 159 160 if (nxt_event_engine_signal_pipe_create(thr, engine) != NXT_OK) { 161 return NXT_ERROR; 162 } 163 164 return NXT_OK; 165 } 166 167 168 static nxt_int_t 169 nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, 170 nxt_event_engine_t *engine) 171 { 172 nxt_event_engine_pipe_t *pipe; 173 174 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 175 if (pipe == NULL) { 176 return NXT_ERROR; 177 } 178 179 engine->pipe = pipe; 180 181 /* 182 * An event engine pipe is in blocking mode for writer 183 * and in non-blocking node for reader. 184 */ 185 186 if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) { 187 nxt_free(pipe); 188 return NXT_ERROR; 189 } 190 191 pipe->event.fd = pipe->fds[0]; 192 pipe->event.read_work_queue = &thr->work_queue.main; 193 pipe->event.read_handler = nxt_event_engine_signal_pipe; 194 pipe->event.write_work_queue = &thr->work_queue.main; 195 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 196 pipe->event.log = &nxt_main_log; 197 198 nxt_event_fd_enable_read(engine, &pipe->event); 199 200 return NXT_OK; 201 } 202 203 204 static void 205 nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 206 { 207 nxt_event_engine_pipe_t *pipe; 208 209 pipe = engine->pipe; 210 211 if (pipe != NULL) { 212 213 if (pipe->event.read_work_queue != NULL) { 214 nxt_event_fd_close(engine, &pipe->event); 215 nxt_pipe_close(pipe->fds); 216 } 217 218 nxt_free(pipe); 219 } 220 } 221 222 223 static void 224 nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data) 225 { 226 nxt_event_engine_pipe_t *pipe; 227 228 pipe = obj; 229 230 nxt_pipe_close(pipe->fds); 231 nxt_free(pipe); 232 } 233 234 235 void 236 nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler, 237 void *obj, void *data, nxt_log_t *log) 238 { 239 nxt_thread_log_debug("event engine post"); 240 241 nxt_locked_work_queue_add(&engine->work_queue, handler, obj, data, log); 242 243 nxt_event_engine_signal(engine, 0); 244 } 245 246 247 void 248 nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 249 { 250 u_char buf; 251 252 nxt_thread_log_debug("event engine signal:%ui", signo); 253 254 /* 255 * A signal number may be sent in a signal context, so the signal 256 * information cannot be passed via a locked work queue. 257 */ 258 259 if (engine->event->signal != NULL) { 260 engine->event->signal(engine->event_set, signo); 261 return; 262 } 263 264 buf = (u_char) signo; 265 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 266 } 267 268 269 static void 270 nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data) 271 { 272 int i, n; 273 u_char signo; 274 nxt_bool_t post; 275 nxt_event_fd_t *ev; 276 const nxt_event_sig_t *sigev; 277 u_char buf[128]; 278 279 ev = obj; 280 281 nxt_log_debug(thr->log, "engine signal pipe"); 282 283 post = 0; 284 285 do { 286 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 287 288 for (i = 0; i < n; i++) { 289 signo = buf[i]; 290 291 nxt_log_debug(thr->log, "engine pipe signo:%d", signo); 292 293 if (signo == 0) { 294 /* A post should be processed only once. */ 295 post = 1; 296 297 } else { 298 sigev = nxt_event_engine_signal_find(thr, signo); 299 300 if (nxt_fast_path(sigev != NULL)) { 301 sigev->handler(thr, (void *) (uintptr_t) signo, 302 (void *) sigev->name); 303 } 304 } 305 } 306 307 } while (n == sizeof(buf)); 308 309 if (post) { 310 nxt_event_engine_post_handler(thr, NULL, NULL); 311 } 312 } 313 314 315 static void 316 nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, void *data) 317 { 318 nxt_locked_work_queue_move(thr, &thr->engine->work_queue, 319 &thr->work_queue.main); 320 } 321 322 323 static void 324 nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, void *data) 325 { 326 nxt_event_fd_t *ev; 327 328 ev = obj; 329 330 nxt_log_alert(ev->log, "engine pipe(%FD:%FD) event error", 331 thr->engine->pipe->fds[0], thr->engine->pipe->fds[1]); 332 333 nxt_event_fd_close(thr->engine, &thr->engine->pipe->event); 334 nxt_pipe_close(thr->engine->pipe->fds); 335 } 336 337 338 static void 339 nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, void *data) 340 { 341 uintptr_t signo; 342 const nxt_event_sig_t *sigev; 343 344 signo = (uintptr_t) obj; 345 346 sigev = nxt_event_engine_signal_find(thr, signo); 347 348 if (nxt_fast_path(sigev != NULL)) { 349 sigev->handler(thr, (void *) (uintptr_t) signo, (void *) sigev->name); 350 } 351 } 352 353 354 static const nxt_event_sig_t * 355 nxt_event_engine_signal_find(nxt_thread_t *thr, nxt_uint_t signo) 356 { 357 const nxt_event_sig_t *sigev; 358 359 for (sigev = thr->engine->signals->sigev; sigev->signo != 0; sigev++) { 360 if (signo == (nxt_uint_t) sigev->signo) { 361 return sigev; 362 } 363 } 364 365 nxt_log_alert(thr->log, "signal %ui handler not found", signo); 366 367 return NULL; 368 } 369 370 371 nxt_int_t 372 nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, 373 nxt_uint_t batch) 374 { 375 nxt_uint_t events; 376 nxt_event_engine_t *engine; 377 378 engine = thr->engine; 379 engine->batch = batch; 380 381 if (!engine->event->signal_support && event_set->signal_support) { 382 /* 383 * Block signal processing if the current event 384 * facility does not support signal processing. 385 */ 386 nxt_event_engine_signals_stop(engine); 387 388 /* 389 * Add to thread main work queue the signal events possibly 390 * received before the blocking signal processing. 391 */ 392 nxt_event_engine_signal_pipe(thr, &engine->pipe->event, NULL); 393 } 394 395 if (engine->pipe != NULL && event_set->enable_post != NULL) { 396 /* 397 * An engine pipe must be closed after all signal events 398 * added above to thread main work queue will be processed. 399 */ 400 nxt_thread_work_queue_add(thr, &thr->work_queue.main, 401 nxt_event_engine_signal_pipe_close, 402 engine->pipe, NULL, &nxt_main_log); 403 404 engine->pipe = NULL; 405 } 406 407 engine->event->free(engine->event_set); 408 409 events = (batch != 0) ? batch : 32; 410 411 engine->event_set = event_set->create(engine->signals, 4 * events, events); 412 if (engine->event_set == NULL) { 413 return NXT_ERROR; 414 } 415 416 engine->event = event_set; 417 418 if (nxt_event_engine_post_init(thr, engine) != NXT_OK) { 419 return NXT_ERROR; 420 } 421 422 if (engine->signals != NULL) { 423 424 if (!engine->event->signal_support) { 425 return nxt_event_engine_signals_start(engine); 426 } 427 428 #if (NXT_THREADS) 429 /* 430 * Reset the PID flag to start the signal thread if 431 * some future event facility will not support signals. 432 */ 433 engine->signals->process = 0; 434 #endif 435 } 436 437 return NXT_OK; 438 } 439 440 441 void 442 nxt_event_engine_free(nxt_event_engine_t *engine) 443 { 444 nxt_event_engine_signal_pipe_free(engine); 445 nxt_free(engine->signals); 446 447 nxt_locked_work_queue_destroy(&engine->work_queue); 448 nxt_thread_work_queue_destroy(nxt_thread()); 449 450 engine->event->free(engine->event_set); 451 452 /* TODO: free timers */ 453 454 nxt_free(engine); 455 } 456 457 458 void 459 nxt_event_engine_start(nxt_event_engine_t *engine) 460 { 461 void *obj, *data; 462 nxt_msec_t timeout, now; 463 nxt_thread_t *thr; 464 nxt_work_handler_t handler; 465 466 thr = nxt_thread(); 467 468 if (engine->fibers) { 469 /* 470 * _setjmp() cannot be wrapped in a function since return from 471 * the function clobbers stack used by future _setjmp() returns. 472 */ 473 _setjmp(engine->fibers->fiber.jmp); 474 475 /* A return point from fibers. */ 476 } 477 478 for ( ;; ) { 479 480 for ( ;; ) { 481 handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log); 482 483 if (handler == NULL) { 484 break; 485 } 486 487 handler(thr, obj, data); 488 489 thr->log = &nxt_main_log; 490 } 491 492 for ( ;; ) { 493 handler = nxt_thread_last_work_queue_pop(thr, &obj, &data, 494 &thr->log); 495 if (handler == NULL) { 496 break; 497 } 498 499 handler(thr, obj, data); 500 501 thr->log = &nxt_main_log; 502 } 503 504 /* Attach some event engine work queues in preferred order. */ 505 506 nxt_work_queue_attach(thr, &engine->accept_work_queue); 507 nxt_work_queue_attach(thr, &engine->read_work_queue); 508 509 timeout = nxt_event_timer_find(engine); 510 511 engine->event->poll(thr, engine->event_set, timeout); 512 513 /* 514 * Look up expired timers only if a new zero timer has been 515 * just added before the event poll or if the event poll slept 516 * at least 1 millisecond, because all old eligible timers were 517 * processed in the previous iterations. 518 */ 519 520 now = nxt_thread_monotonic_time(thr) / 1000000; 521 522 if (timeout == 0 || now != engine->timers.now) { 523 nxt_event_timer_expire(thr, now); 524 } 525 } 526 } 527