1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * The first epoll version has been introduced in Linux 2.5.44. The 12 * interface was changed several times since then and the final version 13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has 14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. 15 * 16 * EPOLLET mode did not work reliable in early implementaions and in 17 * Linux 2.4 backport. 18 * 19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3. 20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8. 21 * epoll_pwait() Linux 2.6.19, glibc 2.6. 22 * signalfd() Linux 2.6.22, glibc 2.7. 23 * eventfd() Linux 2.6.22, glibc 2.7. 24 * timerfd_create() Linux 2.6.25, glibc 2.8. 25 * epoll_create1() Linux 2.6.27, glibc 2.9. 26 * signalfd4() Linux 2.6.27, glibc 2.9. 27 * eventfd2() Linux 2.6.27, glibc 2.9. 28 * accept4() Linux 2.6.28, glibc 2.10. 29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. 30 * EPOLLEXCLUSIVE Linux 4.5. 31 */ 32 33 34 #if (NXT_HAVE_EPOLL_EDGE) 35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37 #endif 38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, 42 uint32_t mode); 43 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 44 nxt_event_conn_io_t *io); 45 static void nxt_epoll_free(nxt_event_engine_t *engine); 46 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 49 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 50 nxt_fd_event_t *ev); 51 static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 52 nxt_fd_event_t *ev); 53 static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 54 nxt_fd_event_t *ev); 55 static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 56 nxt_fd_event_t *ev); 57 static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 58 nxt_fd_event_t *ev); 59 static void nxt_epoll_block_read(nxt_event_engine_t *engine, 60 nxt_fd_event_t *ev); 61 static void nxt_epoll_block_write(nxt_event_engine_t *engine, 62 nxt_fd_event_t *ev); 63 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 64 nxt_fd_event_t *ev); 65 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 66 nxt_fd_event_t *ev); 67 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 68 nxt_fd_event_t *ev); 69 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 70 int op, uint32_t events); 71 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 72 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 73 #if (NXT_HAVE_SIGNALFD) 74 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 75 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 76 #endif 77 #if (NXT_HAVE_EVENTFD) 78 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 79 nxt_work_handler_t handler); 80 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 81 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 82 #endif 83 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 84 85 #if (NXT_HAVE_ACCEPT4) 86 static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, 87 void *data); 88 #endif 89 90 91 #if (NXT_HAVE_EPOLL_EDGE) 92 93 static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, 94 void *data); 95 static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, 96 void *data); 97 static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, 98 nxt_buf_t *b); 99 100 101 static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { 102 nxt_epoll_edge_event_conn_io_connect, 103 nxt_event_conn_io_accept, 104 105 nxt_event_conn_io_read, 106 nxt_epoll_edge_event_conn_io_recvbuf, 107 nxt_event_conn_io_recv, 108 109 nxt_conn_io_write, 110 nxt_event_conn_io_write_chunk, 111 112 #if (NXT_HAVE_LINUX_SENDFILE) 113 nxt_linux_event_conn_io_sendfile, 114 #else 115 nxt_event_conn_io_sendbuf, 116 #endif 117 118 nxt_event_conn_io_writev, 119 nxt_event_conn_io_send, 120 121 nxt_event_conn_io_shutdown, 122 }; 123 124 125 const nxt_event_interface_t nxt_epoll_edge_engine = { 126 "epoll_edge", 127 nxt_epoll_edge_create, 128 nxt_epoll_free, 129 nxt_epoll_enable, 130 nxt_epoll_disable, 131 nxt_epoll_delete, 132 nxt_epoll_close, 133 nxt_epoll_enable_read, 134 nxt_epoll_enable_write, 135 nxt_epoll_disable_read, 136 nxt_epoll_disable_write, 137 nxt_epoll_block_read, 138 nxt_epoll_block_write, 139 nxt_epoll_oneshot_read, 140 nxt_epoll_oneshot_write, 141 nxt_epoll_enable_accept, 142 NULL, 143 NULL, 144 #if (NXT_HAVE_EVENTFD) 145 nxt_epoll_enable_post, 146 nxt_epoll_signal, 147 #else 148 NULL, 149 NULL, 150 #endif 151 nxt_epoll_poll, 152 153 &nxt_epoll_edge_event_conn_io, 154 155 #if (NXT_HAVE_INOTIFY) 156 NXT_FILE_EVENTS, 157 #else 158 NXT_NO_FILE_EVENTS, 159 #endif 160 161 #if (NXT_HAVE_SIGNALFD) 162 NXT_SIGNAL_EVENTS, 163 #else 164 NXT_NO_SIGNAL_EVENTS, 165 #endif 166 }; 167 168 #endif 169 170 171 const nxt_event_interface_t nxt_epoll_level_engine = { 172 "epoll_level", 173 nxt_epoll_level_create, 174 nxt_epoll_free, 175 nxt_epoll_enable, 176 nxt_epoll_disable, 177 nxt_epoll_delete, 178 nxt_epoll_close, 179 nxt_epoll_enable_read, 180 nxt_epoll_enable_write, 181 nxt_epoll_disable_read, 182 nxt_epoll_disable_write, 183 nxt_epoll_block_read, 184 nxt_epoll_block_write, 185 nxt_epoll_oneshot_read, 186 nxt_epoll_oneshot_write, 187 nxt_epoll_enable_accept, 188 NULL, 189 NULL, 190 #if (NXT_HAVE_EVENTFD) 191 nxt_epoll_enable_post, 192 nxt_epoll_signal, 193 #else 194 NULL, 195 NULL, 196 #endif 197 nxt_epoll_poll, 198 199 &nxt_unix_event_conn_io, 200 201 #if (NXT_HAVE_INOTIFY) 202 NXT_FILE_EVENTS, 203 #else 204 NXT_NO_FILE_EVENTS, 205 #endif 206 207 #if (NXT_HAVE_SIGNALFD) 208 NXT_SIGNAL_EVENTS, 209 #else 210 NXT_NO_SIGNAL_EVENTS, 211 #endif 212 }; 213 214 215 #if (NXT_HAVE_EPOLL_EDGE) 216 217 static nxt_int_t 218 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 219 nxt_uint_t mevents) 220 { 221 return nxt_epoll_create(engine, mchanges, mevents, 222 &nxt_epoll_edge_event_conn_io, 223 EPOLLET | EPOLLRDHUP); 224 } 225 226 #endif 227 228 229 static nxt_int_t 230 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 231 nxt_uint_t mevents) 232 { 233 return nxt_epoll_create(engine, mchanges, mevents, 234 &nxt_unix_event_conn_io, 0); 235 } 236 237 238 static nxt_int_t 239 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 240 nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) 241 { 242 engine->u.epoll.fd = -1; 243 engine->u.epoll.mode = mode; 244 engine->u.epoll.mchanges = mchanges; 245 engine->u.epoll.mevents = mevents; 246 #if (NXT_HAVE_SIGNALFD) 247 engine->u.epoll.signalfd.fd = -1; 248 #endif 249 250 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 251 if (engine->u.epoll.changes == NULL) { 252 goto fail; 253 } 254 255 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 256 if (engine->u.epoll.events == NULL) { 257 goto fail; 258 } 259 260 engine->u.epoll.fd = epoll_create(1); 261 if (engine->u.epoll.fd == -1) { 262 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E", 263 nxt_errno); 264 goto fail; 265 } 266 267 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 268 269 if (engine->signals != NULL) { 270 271 #if (NXT_HAVE_SIGNALFD) 272 273 if (nxt_epoll_add_signal(engine) != NXT_OK) { 274 goto fail; 275 } 276 277 #endif 278 279 nxt_epoll_test_accept4(engine, io); 280 } 281 282 return NXT_OK; 283 284 fail: 285 286 nxt_epoll_free(engine); 287 288 return NXT_ERROR; 289 } 290 291 292 static void 293 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) 294 { 295 static nxt_work_handler_t handler; 296 297 if (handler == NULL) { 298 299 handler = io->accept; 300 301 #if (NXT_HAVE_ACCEPT4) 302 303 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 304 305 if (nxt_errno != NXT_ENOSYS) { 306 handler = nxt_epoll_event_conn_io_accept4; 307 308 } else { 309 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 310 NXT_ENOSYS); 311 } 312 313 #endif 314 } 315 316 io->accept = handler; 317 } 318 319 320 static void 321 nxt_epoll_free(nxt_event_engine_t *engine) 322 { 323 int fd; 324 325 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 326 327 #if (NXT_HAVE_SIGNALFD) 328 329 fd = engine->u.epoll.signalfd.fd; 330 331 if (fd != -1 && close(fd) != 0) { 332 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E", 333 fd, nxt_errno); 334 } 335 336 #endif 337 338 #if (NXT_HAVE_EVENTFD) 339 340 fd = engine->u.epoll.eventfd.fd; 341 342 if (fd != -1 && close(fd) != 0) { 343 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E", 344 fd, nxt_errno); 345 } 346 347 #endif 348 349 fd = engine->u.epoll.fd; 350 351 if (fd != -1 && close(fd) != 0) { 352 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E", 353 fd, nxt_errno); 354 } 355 356 nxt_free(engine->u.epoll.events); 357 358 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 359 } 360 361 362 static void 363 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 364 { 365 ev->read = NXT_EVENT_ACTIVE; 366 ev->write = NXT_EVENT_ACTIVE; 367 368 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 369 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 370 } 371 372 373 static void 374 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 375 { 376 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 377 378 ev->read = NXT_EVENT_INACTIVE; 379 ev->write = NXT_EVENT_INACTIVE; 380 381 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 382 } 383 } 384 385 386 static void 387 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 388 { 389 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 390 391 ev->read = NXT_EVENT_INACTIVE; 392 ev->write = NXT_EVENT_INACTIVE; 393 394 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 395 } 396 } 397 398 399 /* 400 * Although calling close() on a file descriptor will remove any epoll 401 * events that reference the descriptor, in this case the close() acquires 402 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 403 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 404 * only in one epoll set. Thus removing events explicitly before closing 405 * eliminates possible lock contention. 406 */ 407 408 static nxt_bool_t 409 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 410 { 411 nxt_epoll_delete(engine, ev); 412 413 return ev->changing; 414 } 415 416 417 static void 418 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 419 { 420 int op; 421 uint32_t events; 422 423 if (ev->read != NXT_EVENT_BLOCKED) { 424 425 op = EPOLL_CTL_MOD; 426 events = EPOLLIN | engine->u.epoll.mode; 427 428 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 429 op = EPOLL_CTL_ADD; 430 431 } else if (ev->write >= NXT_EVENT_BLOCKED) { 432 events |= EPOLLOUT; 433 } 434 435 nxt_epoll_change(engine, ev, op, events); 436 } 437 438 ev->read = NXT_EVENT_ACTIVE; 439 } 440 441 442 static void 443 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 444 { 445 int op; 446 uint32_t events; 447 448 if (ev->write != NXT_EVENT_BLOCKED) { 449 450 op = EPOLL_CTL_MOD; 451 events = EPOLLOUT | engine->u.epoll.mode; 452 453 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 454 op = EPOLL_CTL_ADD; 455 456 } else if (ev->read >= NXT_EVENT_BLOCKED) { 457 events |= EPOLLIN; 458 } 459 460 nxt_epoll_change(engine, ev, op, events); 461 } 462 463 ev->write = NXT_EVENT_ACTIVE; 464 } 465 466 467 static void 468 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 469 { 470 int op; 471 uint32_t events; 472 473 ev->read = NXT_EVENT_INACTIVE; 474 475 if (ev->write <= NXT_EVENT_DISABLED) { 476 ev->write = NXT_EVENT_INACTIVE; 477 op = EPOLL_CTL_DEL; 478 events = 0; 479 480 } else { 481 op = EPOLL_CTL_MOD; 482 events = EPOLLOUT | engine->u.epoll.mode; 483 } 484 485 nxt_epoll_change(engine, ev, op, events); 486 } 487 488 489 static void 490 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 491 { 492 int op; 493 uint32_t events; 494 495 ev->write = NXT_EVENT_INACTIVE; 496 497 if (ev->read <= NXT_EVENT_DISABLED) { 498 ev->write = NXT_EVENT_INACTIVE; 499 op = EPOLL_CTL_DEL; 500 events = 0; 501 502 } else { 503 op = EPOLL_CTL_MOD; 504 events = EPOLLIN | engine->u.epoll.mode; 505 } 506 507 nxt_epoll_change(engine, ev, op, events); 508 } 509 510 511 static void 512 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 513 { 514 if (ev->read != NXT_EVENT_INACTIVE) { 515 ev->read = NXT_EVENT_BLOCKED; 516 } 517 } 518 519 520 static void 521 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 522 { 523 if (ev->write != NXT_EVENT_INACTIVE) { 524 ev->write = NXT_EVENT_BLOCKED; 525 } 526 } 527 528 529 /* 530 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 531 * event should be added or modified, epoll_ctl(2): 532 * 533 * EPOLLONESHOT (since Linux 2.6.2) 534 * Sets the one-shot behavior for the associated file descriptor. 535 * This means that after an event is pulled out with epoll_wait(2) 536 * the associated file descriptor is internally disabled and no 537 * other events will be reported by the epoll interface. The user 538 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 539 * descriptor with a new event mask. 540 */ 541 542 static void 543 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 544 { 545 int op; 546 547 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 548 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 549 550 ev->read = NXT_EVENT_ONESHOT; 551 ev->write = NXT_EVENT_INACTIVE; 552 553 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 554 } 555 556 557 static void 558 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 559 { 560 int op; 561 562 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 563 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 564 565 ev->read = NXT_EVENT_INACTIVE; 566 ev->write = NXT_EVENT_ONESHOT; 567 568 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 569 } 570 571 572 static void 573 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 574 { 575 ev->read = NXT_EVENT_ACTIVE; 576 577 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN); 578 } 579 580 581 /* 582 * epoll changes are batched to improve instruction and data cache 583 * locality of several epoll_ctl() calls followed by epoll_wait() call. 584 */ 585 586 static void 587 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 588 uint32_t events) 589 { 590 nxt_epoll_change_t *change; 591 592 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 593 engine->u.epoll.fd, ev->fd, op, events); 594 595 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 596 (void) nxt_epoll_commit_changes(engine); 597 } 598 599 ev->changing = 1; 600 601 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 602 change->op = op; 603 change->event.events = events; 604 change->event.data.ptr = ev; 605 } 606 607 608 static nxt_int_t 609 nxt_epoll_commit_changes(nxt_event_engine_t *engine) 610 { 611 int ret; 612 nxt_int_t retval; 613 nxt_fd_event_t *ev; 614 nxt_epoll_change_t *change, *end; 615 616 nxt_debug(&engine->task, "epoll %d changes:%ui", 617 engine->u.epoll.fd, engine->u.epoll.nchanges); 618 619 retval = NXT_OK; 620 change = engine->u.epoll.changes; 621 end = change + engine->u.epoll.nchanges; 622 623 do { 624 ev = change->event.data.ptr; 625 ev->changing = 0; 626 627 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 628 engine->u.epoll.fd, ev->fd, change->op, 629 change->event.events); 630 631 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 632 633 if (nxt_slow_path(ret != 0)) { 634 nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 635 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 636 637 nxt_work_queue_add(&engine->fast_work_queue, 638 nxt_epoll_error_handler, ev->task, ev, ev->data); 639 640 retval = NXT_ERROR; 641 } 642 643 change++; 644 645 } while (change < end); 646 647 engine->u.epoll.nchanges = 0; 648 649 return retval; 650 } 651 652 653 static void 654 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 655 { 656 nxt_fd_event_t *ev; 657 658 ev = obj; 659 660 ev->read = NXT_EVENT_INACTIVE; 661 ev->write = NXT_EVENT_INACTIVE; 662 663 ev->error_handler(ev->task, ev, data); 664 } 665 666 667 #if (NXT_HAVE_SIGNALFD) 668 669 static nxt_int_t 670 nxt_epoll_add_signal(nxt_event_engine_t *engine) 671 { 672 int fd; 673 struct epoll_event ee; 674 675 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 676 nxt_log(&engine->task, NXT_LOG_CRIT, 677 "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 678 return NXT_ERROR; 679 } 680 681 /* 682 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 683 * and 2.8 signalfd() wrappers call the original signalfd() syscall 684 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 685 * tries to call signalfd4() syscall and if it fails then calls the 686 * original signalfd() syscall. For this reason the non-blocking mode 687 * is set separately. 688 */ 689 690 fd = signalfd(-1, &engine->signals->sigmask, 0); 691 692 if (fd == -1) { 693 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E", 694 engine->u.epoll.signalfd.fd, nxt_errno); 695 return NXT_ERROR; 696 } 697 698 engine->u.epoll.signalfd.fd = fd; 699 700 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 701 return NXT_ERROR; 702 } 703 704 nxt_debug(&engine->task, "signalfd(): %d", fd); 705 706 engine->u.epoll.signalfd.data = engine->signals->handler; 707 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 708 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 709 engine->u.epoll.signalfd.log = engine->task.log; 710 engine->u.epoll.signalfd.task = &engine->task; 711 712 ee.events = EPOLLIN; 713 ee.data.ptr = &engine->u.epoll.signalfd; 714 715 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 716 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 717 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 718 719 return NXT_ERROR; 720 } 721 722 return NXT_OK; 723 } 724 725 726 static void 727 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 728 { 729 int n; 730 nxt_fd_event_t *ev; 731 nxt_work_handler_t handler; 732 struct signalfd_siginfo sfd; 733 734 ev = obj; 735 handler = data; 736 737 nxt_debug(task, "signalfd handler"); 738 739 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 740 741 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 742 743 if (n != sizeof(struct signalfd_siginfo)) { 744 nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E", 745 ev->fd, nxt_errno); 746 } 747 748 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 749 750 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 751 } 752 753 #endif 754 755 756 #if (NXT_HAVE_EVENTFD) 757 758 static nxt_int_t 759 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 760 { 761 int ret; 762 struct epoll_event ee; 763 764 engine->u.epoll.post_handler = handler; 765 766 /* 767 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 768 * and 2.8 eventfd() wrappers call the original eventfd() syscall 769 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 770 * tries to call eventfd2() syscall and if it fails then calls the 771 * original eventfd() syscall. For this reason the non-blocking mode 772 * is set separately. 773 */ 774 775 engine->u.epoll.eventfd.fd = eventfd(0, 0); 776 777 if (engine->u.epoll.eventfd.fd == -1) { 778 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno); 779 return NXT_ERROR; 780 } 781 782 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 783 if (nxt_slow_path(ret != NXT_OK)) { 784 return NXT_ERROR; 785 } 786 787 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 788 789 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 790 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 791 engine->u.epoll.eventfd.data = engine; 792 engine->u.epoll.eventfd.log = engine->task.log; 793 engine->u.epoll.eventfd.task = &engine->task; 794 795 ee.events = EPOLLIN | EPOLLET; 796 ee.data.ptr = &engine->u.epoll.eventfd; 797 798 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 799 engine->u.epoll.eventfd.fd, &ee); 800 801 if (nxt_fast_path(ret == 0)) { 802 return NXT_OK; 803 } 804 805 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 806 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 807 nxt_errno); 808 809 return NXT_ERROR; 810 } 811 812 813 static void 814 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 815 { 816 int n; 817 uint64_t events; 818 nxt_event_engine_t *engine; 819 820 engine = data; 821 822 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 823 824 /* 825 * The maximum value after write() to a eventfd() descriptor will 826 * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor 827 * can be read once per many notifications, for example, once per 828 * 2^32-2 noticifcations. Since the eventfd() file descriptor is 829 * always registered in EPOLLET mode, epoll returns event about 830 * only the latest write() to the descriptor. 831 */ 832 833 if (engine->u.epoll.neventfd++ >= 0xfffffffe) { 834 engine->u.epoll.neventfd = 0; 835 836 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 837 838 nxt_debug(task, "read(%d): %d events:%uL", 839 engine->u.epoll.eventfd.fd, n, events); 840 841 if (n != sizeof(uint64_t)) { 842 nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", 843 engine->u.epoll.eventfd.fd, nxt_errno); 844 } 845 } 846 847 engine->u.epoll.post_handler(task, NULL, NULL); 848 } 849 850 851 static void 852 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 853 { 854 size_t ret; 855 uint64_t event; 856 857 /* 858 * eventfd() presents along with signalfd(), so the function 859 * is used only to post events and the signo argument is ignored. 860 */ 861 862 event = 1; 863 864 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 865 866 if (nxt_slow_path(ret != sizeof(uint64_t))) { 867 nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E", 868 engine->u.epoll.eventfd.fd, nxt_errno); 869 } 870 } 871 872 #endif 873 874 875 static void 876 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 877 { 878 int nevents; 879 uint32_t events; 880 nxt_int_t i; 881 nxt_err_t err; 882 nxt_bool_t error; 883 nxt_uint_t level; 884 nxt_fd_event_t *ev; 885 struct epoll_event *event; 886 887 if (engine->u.epoll.nchanges != 0) { 888 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 889 /* Error handlers have been enqueued on failure. */ 890 timeout = 0; 891 } 892 } 893 894 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 895 engine->u.epoll.fd, timeout); 896 897 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 898 engine->u.epoll.mevents, timeout); 899 900 err = (nevents == -1) ? nxt_errno : 0; 901 902 nxt_thread_time_update(engine->task.thread); 903 904 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 905 906 if (nevents == -1) { 907 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 908 909 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 910 engine->u.epoll.fd, err); 911 912 return; 913 } 914 915 for (i = 0; i < nevents; i++) { 916 917 event = &engine->u.epoll.events[i]; 918 events = event->events; 919 ev = event->data.ptr; 920 921 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 922 ev->fd, events, ev, ev->read, ev->write); 923 924 /* 925 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 926 * EPOLLOUT, so the "error" variable enqueues only one active handler. 927 */ 928 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 929 ev->epoll_error = error; 930 931 #if (NXT_HAVE_EPOLL_EDGE) 932 933 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 934 935 #endif 936 937 if ((events & EPOLLIN) || error) { 938 ev->read_ready = 1; 939 940 if (ev->read != NXT_EVENT_BLOCKED) { 941 942 if (ev->read == NXT_EVENT_ONESHOT) { 943 ev->read = NXT_EVENT_DISABLED; 944 } 945 946 error = 0; 947 948 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 949 ev->task, ev, ev->data); 950 951 } else if (engine->u.epoll.mode == 0) { 952 /* Level-triggered mode. */ 953 nxt_epoll_disable_read(engine, ev); 954 } 955 } 956 957 if ((events & EPOLLOUT) || error) { 958 ev->write_ready = 1; 959 960 if (ev->write != NXT_EVENT_BLOCKED) { 961 962 if (ev->write == NXT_EVENT_ONESHOT) { 963 ev->write = NXT_EVENT_DISABLED; 964 } 965 966 error = 0; 967 968 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 969 ev->task, ev, ev->data); 970 971 } else if (engine->u.epoll.mode == 0) { 972 /* Level-triggered mode. */ 973 nxt_epoll_disable_write(engine, ev); 974 } 975 } 976 977 if (error) { 978 ev->read_ready = 1; 979 ev->write_ready = 1; 980 } 981 } 982 } 983 984 985 #if (NXT_HAVE_ACCEPT4) 986 987 static void 988 nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 989 { 990 socklen_t len; 991 nxt_socket_t s; 992 struct sockaddr *sa; 993 nxt_event_conn_t *c; 994 nxt_event_conn_listen_t *cls; 995 996 cls = obj; 997 c = cls->next; 998 999 cls->ready--; 1000 cls->socket.read_ready = (cls->ready != 0); 1001 1002 len = c->remote->socklen; 1003 1004 if (len >= sizeof(struct sockaddr)) { 1005 sa = &c->remote->u.sockaddr; 1006 1007 } else { 1008 sa = NULL; 1009 len = 0; 1010 } 1011 1012 s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK); 1013 1014 if (s != -1) { 1015 c->socket.fd = s; 1016 1017 nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s); 1018 1019 nxt_event_conn_accept(task, cls, c); 1020 return; 1021 } 1022 1023 nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno); 1024 } 1025 1026 #endif 1027 1028 1029 #if (NXT_HAVE_EPOLL_EDGE) 1030 1031 /* 1032 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1033 * syscall to test pending connect() error. Although this special 1034 * interface can work in both edge-triggered and level-triggered 1035 * modes it is enabled only for the former mode because this mode is 1036 * available in all modern Linux distributions. For the latter mode 1037 * it is required to create additional nxt_epoll_level_event_conn_io 1038 * with single non-generic connect() interface. 1039 */ 1040 1041 static void 1042 nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1043 { 1044 nxt_event_conn_t *c; 1045 nxt_event_engine_t *engine; 1046 nxt_work_handler_t handler; 1047 const nxt_event_conn_state_t *state; 1048 1049 c = obj; 1050 1051 state = c->write_state; 1052 1053 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1054 1055 case NXT_OK: 1056 c->socket.write_ready = 1; 1057 handler = state->ready_handler; 1058 break; 1059 1060 case NXT_AGAIN: 1061 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1062 c->socket.error_handler = nxt_event_conn_connect_error; 1063 1064 engine = task->thread->engine; 1065 nxt_event_conn_timer(engine, c, state, &c->write_timer); 1066 1067 nxt_epoll_enable(engine, &c->socket); 1068 c->socket.read = NXT_EVENT_BLOCKED; 1069 return; 1070 1071 #if 0 1072 case NXT_AGAIN: 1073 nxt_event_conn_timer(engine, c, state, &c->write_timer); 1074 1075 /* Fall through. */ 1076 1077 case NXT_OK: 1078 /* 1079 * Mark both read and write directions as ready and try to perform 1080 * I/O operations before receiving readiness notifications. 1081 * On unconnected socket Linux send() and recv() return EAGAIN 1082 * instead of ENOTCONN. 1083 */ 1084 c->socket.read_ready = 1; 1085 c->socket.write_ready = 1; 1086 /* 1087 * Enabling both read and write notifications on a getting 1088 * connected socket eliminates one epoll_ctl() syscall. 1089 */ 1090 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1091 c->socket.error_handler = state->error_handler; 1092 1093 nxt_epoll_enable(engine, &c->socket); 1094 c->socket.read = NXT_EVENT_BLOCKED; 1095 1096 handler = state->ready_handler; 1097 break; 1098 #endif 1099 1100 case NXT_ERROR: 1101 handler = state->error_handler; 1102 break; 1103 1104 default: /* NXT_DECLINED: connection refused. */ 1105 handler = state->close_handler; 1106 break; 1107 } 1108 1109 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1110 } 1111 1112 1113 static void 1114 nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) 1115 { 1116 nxt_event_conn_t *c; 1117 1118 c = obj; 1119 1120 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1121 1122 if (!c->socket.epoll_error) { 1123 c->socket.write = NXT_EVENT_BLOCKED; 1124 1125 if (c->write_state->timer_autoreset) { 1126 nxt_timer_disable(task->thread->engine, &c->write_timer); 1127 } 1128 1129 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1130 task, c, data); 1131 return; 1132 } 1133 1134 nxt_event_conn_connect_test(task, c, data); 1135 } 1136 1137 1138 /* 1139 * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around 1140 * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF 1141 * in edge-triggered mode. 1142 */ 1143 1144 static ssize_t 1145 nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) 1146 { 1147 ssize_t n; 1148 1149 n = nxt_event_conn_io_recvbuf(c, b); 1150 1151 if (n > 0 && c->socket.epoll_eof) { 1152 c->socket.read_ready = 1; 1153 } 1154 1155 return n; 1156 } 1157 1158 #endif 1159