1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10/* 11 * The first epoll version has been introduced in Linux 2.5.44. The 12 * interface was changed several times since then and the final version 13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has 14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. 15 * 16 * EPOLLET mode did not work reliable in early implementaions and in 17 * Linux 2.4 backport. 18 * 19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3. 20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8. 21 * epoll_pwait() Linux 2.6.19, glibc 2.6. 22 * signalfd() Linux 2.6.22, glibc 2.7. 23 * eventfd() Linux 2.6.22, glibc 2.7. 24 * timerfd_create() Linux 2.6.25, glibc 2.8. 25 * epoll_create1() Linux 2.6.27, glibc 2.9. 26 * signalfd4() Linux 2.6.27, glibc 2.9. 27 * eventfd2() Linux 2.6.27, glibc 2.9. 28 * accept4() Linux 2.6.28, glibc 2.10. 29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. 30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24. 31 */ 32 33 34#if (NXT_HAVE_EPOLL_EDGE) 35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37#endif 38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); 42static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 43 nxt_conn_io_t *io); 44static void nxt_epoll_free(nxt_event_engine_t *engine); 45static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 53 nxt_fd_event_t *ev); 54static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 55 nxt_fd_event_t *ev); 56static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 57 nxt_fd_event_t *ev); 58static void nxt_epoll_block_read(nxt_event_engine_t *engine, 59 nxt_fd_event_t *ev); 60static void nxt_epoll_block_write(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 69 int op, uint32_t events); 70static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 71static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 72#if (NXT_HAVE_SIGNALFD) 73static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 74static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 75#endif 76#if (NXT_HAVE_EVENTFD) 77static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81#endif 82static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84#if (NXT_HAVE_ACCEPT4) 85static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, 86 void *data); 87#endif 88 89 90#if (NXT_HAVE_EPOLL_EDGE) 91 92static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, 93 void *data); 94static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, 95 void *data); 96static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 97 98 99static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, 102 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110#if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112#else 113 nxt_event_conn_io_sendbuf, 114#endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 119 nxt_conn_io_shutdown, 120}; 121 122 123const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, 128 nxt_epoll_disable, 129 nxt_epoll_delete, 130 nxt_epoll_close, 131 nxt_epoll_enable_read, 132 nxt_epoll_enable_write, 133 nxt_epoll_disable_read, 134 nxt_epoll_disable_write, 135 nxt_epoll_block_read, 136 nxt_epoll_block_write, 137 nxt_epoll_oneshot_read, 138 nxt_epoll_oneshot_write, 139 nxt_epoll_enable_accept, 140 NULL, 141 NULL, 142#if (NXT_HAVE_EVENTFD) 143 nxt_epoll_enable_post, 144 nxt_epoll_signal, 145#else 146 NULL, 147 NULL, 148#endif 149 nxt_epoll_poll, 150 151 &nxt_epoll_edge_conn_io, 152 153#if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155#else 156 NXT_NO_FILE_EVENTS, 157#endif 158 159#if (NXT_HAVE_SIGNALFD) 160 NXT_SIGNAL_EVENTS, 161#else 162 NXT_NO_SIGNAL_EVENTS, 163#endif 164}; 165 166#endif 167 168 169const nxt_event_interface_t nxt_epoll_level_engine = { 170 "epoll_level", 171 nxt_epoll_level_create, 172 nxt_epoll_free, 173 nxt_epoll_enable, 174 nxt_epoll_disable, 175 nxt_epoll_delete, 176 nxt_epoll_close, 177 nxt_epoll_enable_read, 178 nxt_epoll_enable_write, 179 nxt_epoll_disable_read, 180 nxt_epoll_disable_write, 181 nxt_epoll_block_read, 182 nxt_epoll_block_write, 183 nxt_epoll_oneshot_read, 184 nxt_epoll_oneshot_write, 185 nxt_epoll_enable_accept, 186 NULL, 187 NULL, 188#if (NXT_HAVE_EVENTFD) 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191#else 192 NULL, 193 NULL, 194#endif 195 nxt_epoll_poll, 196 197 &nxt_unix_conn_io, 198 199#if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201#else 202 NXT_NO_FILE_EVENTS, 203#endif 204 205#if (NXT_HAVE_SIGNALFD) 206 NXT_SIGNAL_EVENTS, 207#else 208 NXT_NO_SIGNAL_EVENTS, 209#endif 210}; 211 212 213#if (NXT_HAVE_EPOLL_EDGE) 214 215static nxt_int_t 216nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218{ 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, 220 EPOLLET | EPOLLRDHUP); 221} 222 223#endif 224 225 226static nxt_int_t 227nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229{ 230 return nxt_epoll_create(engine, mchanges, mevents, 231 &nxt_unix_conn_io, 0); 232} 233 234 235static nxt_int_t 236nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) 238{ 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = mchanges; 242 engine->u.epoll.mevents = mevents; 243#if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245#endif 246 247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 248 if (engine->u.epoll.changes == NULL) { 249 goto fail; 250 } 251 252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 253 if (engine->u.epoll.events == NULL) { 254 goto fail; 255 } 256 257 engine->u.epoll.fd = epoll_create(1); 258 if (engine->u.epoll.fd == -1) { 259 nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno); 260 goto fail; 261 } 262 263 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 264 265 if (engine->signals != NULL) { 266 267#if (NXT_HAVE_SIGNALFD) 268 269 if (nxt_epoll_add_signal(engine) != NXT_OK) { 270 goto fail; 271 } 272 273#endif 274 275 nxt_epoll_test_accept4(engine, io); 276 } 277 278 return NXT_OK; 279 280fail: 281 282 nxt_epoll_free(engine); 283 284 return NXT_ERROR; 285} 286 287 288static void 289nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) 290{ 291 static nxt_work_handler_t handler; 292 293 if (handler == NULL) { 294 295 handler = io->accept; 296 297#if (NXT_HAVE_ACCEPT4) 298 299 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 300 301 if (nxt_errno != NXT_ENOSYS) { 302 handler = nxt_epoll_conn_io_accept4; 303 304 } else { 305 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 306 NXT_ENOSYS); 307 } 308 309#endif 310 } 311 312 io->accept = handler; 313} 314 315 316static void 317nxt_epoll_free(nxt_event_engine_t *engine) 318{ 319 int fd; 320 321 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 322 323#if (NXT_HAVE_SIGNALFD) 324 325 fd = engine->u.epoll.signalfd.fd; 326 327 if (fd != -1 && close(fd) != 0) { 328 nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno); 329 } 330 331#endif 332 333#if (NXT_HAVE_EVENTFD) 334 335 fd = engine->u.epoll.eventfd.fd; 336 337 if (fd != -1 && close(fd) != 0) { 338 nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno); 339 } 340 341#endif 342 343 fd = engine->u.epoll.fd; 344 345 if (fd != -1 && close(fd) != 0) { 346 nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno); 347 } 348 349 nxt_free(engine->u.epoll.events); 350 nxt_free(engine->u.epoll.changes); 351 352 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 353} 354 355 356static void 357nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 358{ 359 ev->read = NXT_EVENT_ACTIVE; 360 ev->write = NXT_EVENT_ACTIVE; 361 362 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 363 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 364} 365 366 367static void 368nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 369{ 370 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 371 372 ev->read = NXT_EVENT_INACTIVE; 373 ev->write = NXT_EVENT_INACTIVE; 374 375 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 376 } 377} 378 379 380static void 381nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 382{ 383 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 384 385 ev->read = NXT_EVENT_INACTIVE; 386 ev->write = NXT_EVENT_INACTIVE; 387 388 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 389 } 390} 391 392 393/* 394 * Although calling close() on a file descriptor will remove any epoll 395 * events that reference the descriptor, in this case the close() acquires 396 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 397 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 398 * only in one epoll set. Thus removing events explicitly before closing 399 * eliminates possible lock contention. 400 */ 401 402static nxt_bool_t 403nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 404{ 405 nxt_epoll_delete(engine, ev); 406 407 return ev->changing; 408} 409 410 411static void 412nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 413{ 414 int op; 415 uint32_t events; 416 417 if (ev->read != NXT_EVENT_BLOCKED) { 418 419 op = EPOLL_CTL_MOD; 420 events = EPOLLIN | engine->u.epoll.mode; 421 422 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 423 op = EPOLL_CTL_ADD; 424 425 } else if (ev->write >= NXT_EVENT_BLOCKED) { 426 events |= EPOLLOUT; 427 } 428 429 nxt_epoll_change(engine, ev, op, events); 430 } 431 432 ev->read = NXT_EVENT_ACTIVE; 433} 434 435 436static void 437nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 438{ 439 int op; 440 uint32_t events; 441 442 if (ev->write != NXT_EVENT_BLOCKED) { 443 444 op = EPOLL_CTL_MOD; 445 events = EPOLLOUT | engine->u.epoll.mode; 446 447 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 448 op = EPOLL_CTL_ADD; 449 450 } else if (ev->read >= NXT_EVENT_BLOCKED) { 451 events |= EPOLLIN; 452 } 453 454 nxt_epoll_change(engine, ev, op, events); 455 } 456 457 ev->write = NXT_EVENT_ACTIVE; 458} 459 460 461static void 462nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 463{ 464 int op; 465 uint32_t events; 466 467 ev->read = NXT_EVENT_INACTIVE; 468 469 if (ev->write <= NXT_EVENT_DISABLED) { 470 ev->write = NXT_EVENT_INACTIVE; 471 op = EPOLL_CTL_DEL; 472 events = 0; 473 474 } else { 475 op = EPOLL_CTL_MOD; 476 events = EPOLLOUT | engine->u.epoll.mode; 477 } 478 479 nxt_epoll_change(engine, ev, op, events); 480} 481 482 483static void 484nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 485{ 486 int op; 487 uint32_t events; 488 489 ev->write = NXT_EVENT_INACTIVE; 490 491 if (ev->read <= NXT_EVENT_DISABLED) { 492 ev->write = NXT_EVENT_INACTIVE; 493 op = EPOLL_CTL_DEL; 494 events = 0; 495 496 } else { 497 op = EPOLL_CTL_MOD; 498 events = EPOLLIN | engine->u.epoll.mode; 499 } 500 501 nxt_epoll_change(engine, ev, op, events); 502} 503 504 505static void 506nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 507{ 508 if (ev->read != NXT_EVENT_INACTIVE) { 509 ev->read = NXT_EVENT_BLOCKED; 510 } 511} 512 513 514static void 515nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 516{ 517 if (ev->write != NXT_EVENT_INACTIVE) { 518 ev->write = NXT_EVENT_BLOCKED; 519 } 520} 521 522 523/* 524 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 525 * event should be added or modified, epoll_ctl(2): 526 * 527 * EPOLLONESHOT (since Linux 2.6.2) 528 * Sets the one-shot behavior for the associated file descriptor. 529 * This means that after an event is pulled out with epoll_wait(2) 530 * the associated file descriptor is internally disabled and no 531 * other events will be reported by the epoll interface. The user 532 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 533 * descriptor with a new event mask. 534 */ 535 536static void 537nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 538{ 539 int op; 540 541 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 542 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 543 544 ev->read = NXT_EVENT_ONESHOT; 545 ev->write = NXT_EVENT_INACTIVE; 546 547 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 548} 549 550 551static void 552nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 553{ 554 int op; 555 556 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 557 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 558 559 ev->read = NXT_EVENT_INACTIVE; 560 ev->write = NXT_EVENT_ONESHOT; 561 562 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 563} 564 565 566static void 567nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 568{ 569 uint32_t events; 570 571 ev->read = NXT_EVENT_ACTIVE; 572 573 events = EPOLLIN; 574 575#ifdef EPOLLEXCLUSIVE 576 events |= EPOLLEXCLUSIVE; 577#endif 578 579 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); 580} 581 582 583/* 584 * epoll changes are batched to improve instruction and data cache 585 * locality of several epoll_ctl() calls followed by epoll_wait() call. 586 */ 587 588static void 589nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 590 uint32_t events) 591{ 592 nxt_epoll_change_t *change; 593 594 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 595 engine->u.epoll.fd, ev->fd, op, events); 596 597 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 598 (void) nxt_epoll_commit_changes(engine); 599 } 600 601 ev->changing = 1; 602 603 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 604 change->op = op; 605 change->event.events = events; 606 change->event.data.ptr = ev; 607} 608 609 610static nxt_int_t 611nxt_epoll_commit_changes(nxt_event_engine_t *engine) 612{ 613 int ret; 614 nxt_int_t retval; 615 nxt_fd_event_t *ev; 616 nxt_epoll_change_t *change, *end; 617 618 nxt_debug(&engine->task, "epoll %d changes:%ui", 619 engine->u.epoll.fd, engine->u.epoll.nchanges); 620 621 retval = NXT_OK; 622 change = engine->u.epoll.changes; 623 end = change + engine->u.epoll.nchanges; 624 625 do { 626 ev = change->event.data.ptr; 627 ev->changing = 0; 628 629 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 630 engine->u.epoll.fd, ev->fd, change->op, 631 change->event.events); 632 633 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 634 635 if (nxt_slow_path(ret != 0)) { 636 nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E", 637 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 638 639 nxt_work_queue_add(&engine->fast_work_queue, 640 nxt_epoll_error_handler, ev->task, ev, ev->data); 641 642 retval = NXT_ERROR; 643 } 644 645 change++; 646 647 } while (change < end); 648 649 engine->u.epoll.nchanges = 0; 650 651 return retval; 652} 653 654 655static void 656nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 657{ 658 nxt_fd_event_t *ev; 659 660 ev = obj; 661 662 ev->read = NXT_EVENT_INACTIVE; 663 ev->write = NXT_EVENT_INACTIVE; 664 665 ev->error_handler(ev->task, ev, data); 666} 667 668 669#if (NXT_HAVE_SIGNALFD) 670 671static nxt_int_t 672nxt_epoll_add_signal(nxt_event_engine_t *engine) 673{ 674 int fd; 675 struct epoll_event ee; 676 677 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 678 nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 679 return NXT_ERROR; 680 } 681 682 /* 683 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 684 * and 2.8 signalfd() wrappers call the original signalfd() syscall 685 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 686 * tries to call signalfd4() syscall and if it fails then calls the 687 * original signalfd() syscall. For this reason the non-blocking mode 688 * is set separately. 689 */ 690 691 fd = signalfd(-1, &engine->signals->sigmask, 0); 692 693 if (fd == -1) { 694 nxt_alert(&engine->task, "signalfd(%d) failed %E", 695 engine->u.epoll.signalfd.fd, nxt_errno); 696 return NXT_ERROR; 697 } 698 699 engine->u.epoll.signalfd.fd = fd; 700 701 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 702 return NXT_ERROR; 703 } 704 705 nxt_debug(&engine->task, "signalfd(): %d", fd); 706 707 engine->u.epoll.signalfd.data = engine->signals->handler; 708 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 709 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 710 engine->u.epoll.signalfd.log = engine->task.log; 711 engine->u.epoll.signalfd.task = &engine->task; 712 713 ee.events = EPOLLIN; 714 ee.data.ptr = &engine->u.epoll.signalfd; 715 716 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 717 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", 718 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 719 720 return NXT_ERROR; 721 } 722 723 return NXT_OK; 724} 725 726 727static void 728nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 729{ 730 int n; 731 nxt_fd_event_t *ev; 732 nxt_work_handler_t handler; 733 struct signalfd_siginfo sfd; 734 735 ev = obj; 736 handler = data; 737 738 nxt_debug(task, "signalfd handler"); 739 740 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 741 742 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 743 744 if (n != sizeof(struct signalfd_siginfo)) { 745 nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno); 746 return; 747 } 748 749 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 750 751 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 752} 753 754#endif 755 756 757#if (NXT_HAVE_EVENTFD) 758 759static nxt_int_t 760nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 761{ 762 int ret; 763 struct epoll_event ee; 764 765 engine->u.epoll.post_handler = handler; 766 767 /* 768 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 769 * and 2.8 eventfd() wrappers call the original eventfd() syscall 770 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 771 * tries to call eventfd2() syscall and if it fails then calls the 772 * original eventfd() syscall. For this reason the non-blocking mode 773 * is set separately. 774 */ 775 776 engine->u.epoll.eventfd.fd = eventfd(0, 0); 777 778 if (engine->u.epoll.eventfd.fd == -1) { 779 nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno); 780 return NXT_ERROR; 781 } 782 783 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 784 if (nxt_slow_path(ret != NXT_OK)) { 785 return NXT_ERROR; 786 } 787 788 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 789 790 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 791 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 792 engine->u.epoll.eventfd.data = engine; 793 engine->u.epoll.eventfd.log = engine->task.log; 794 engine->u.epoll.eventfd.task = &engine->task; 795 796 ee.events = EPOLLIN | EPOLLET; 797 ee.data.ptr = &engine->u.epoll.eventfd; 798 799 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 800 engine->u.epoll.eventfd.fd, &ee); 801 802 if (nxt_fast_path(ret == 0)) { 803 return NXT_OK; 804 } 805 806 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", 807 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 808 nxt_errno); 809 810 return NXT_ERROR; 811} 812 813 814static void 815nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 816{ 817 int n; 818 uint64_t events; 819 nxt_event_engine_t *engine; 820 821 engine = data; 822 823 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 824 825 /* 826 * The maximum value after write() to a eventfd() descriptor will
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10/* 11 * The first epoll version has been introduced in Linux 2.5.44. The 12 * interface was changed several times since then and the final version 13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has 14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. 15 * 16 * EPOLLET mode did not work reliable in early implementaions and in 17 * Linux 2.4 backport. 18 * 19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3. 20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8. 21 * epoll_pwait() Linux 2.6.19, glibc 2.6. 22 * signalfd() Linux 2.6.22, glibc 2.7. 23 * eventfd() Linux 2.6.22, glibc 2.7. 24 * timerfd_create() Linux 2.6.25, glibc 2.8. 25 * epoll_create1() Linux 2.6.27, glibc 2.9. 26 * signalfd4() Linux 2.6.27, glibc 2.9. 27 * eventfd2() Linux 2.6.27, glibc 2.9. 28 * accept4() Linux 2.6.28, glibc 2.10. 29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. 30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24. 31 */ 32 33 34#if (NXT_HAVE_EPOLL_EDGE) 35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37#endif 38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); 42static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 43 nxt_conn_io_t *io); 44static void nxt_epoll_free(nxt_event_engine_t *engine); 45static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 53 nxt_fd_event_t *ev); 54static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 55 nxt_fd_event_t *ev); 56static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 57 nxt_fd_event_t *ev); 58static void nxt_epoll_block_read(nxt_event_engine_t *engine, 59 nxt_fd_event_t *ev); 60static void nxt_epoll_block_write(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 69 int op, uint32_t events); 70static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 71static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 72#if (NXT_HAVE_SIGNALFD) 73static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 74static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 75#endif 76#if (NXT_HAVE_EVENTFD) 77static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81#endif 82static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84#if (NXT_HAVE_ACCEPT4) 85static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, 86 void *data); 87#endif 88 89 90#if (NXT_HAVE_EPOLL_EDGE) 91 92static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, 93 void *data); 94static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, 95 void *data); 96static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 97 98 99static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, 102 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110#if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112#else 113 nxt_event_conn_io_sendbuf, 114#endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 119 nxt_conn_io_shutdown, 120}; 121 122 123const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, 128 nxt_epoll_disable, 129 nxt_epoll_delete, 130 nxt_epoll_close, 131 nxt_epoll_enable_read, 132 nxt_epoll_enable_write, 133 nxt_epoll_disable_read, 134 nxt_epoll_disable_write, 135 nxt_epoll_block_read, 136 nxt_epoll_block_write, 137 nxt_epoll_oneshot_read, 138 nxt_epoll_oneshot_write, 139 nxt_epoll_enable_accept, 140 NULL, 141 NULL, 142#if (NXT_HAVE_EVENTFD) 143 nxt_epoll_enable_post, 144 nxt_epoll_signal, 145#else 146 NULL, 147 NULL, 148#endif 149 nxt_epoll_poll, 150 151 &nxt_epoll_edge_conn_io, 152 153#if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155#else 156 NXT_NO_FILE_EVENTS, 157#endif 158 159#if (NXT_HAVE_SIGNALFD) 160 NXT_SIGNAL_EVENTS, 161#else 162 NXT_NO_SIGNAL_EVENTS, 163#endif 164}; 165 166#endif 167 168 169const nxt_event_interface_t nxt_epoll_level_engine = { 170 "epoll_level", 171 nxt_epoll_level_create, 172 nxt_epoll_free, 173 nxt_epoll_enable, 174 nxt_epoll_disable, 175 nxt_epoll_delete, 176 nxt_epoll_close, 177 nxt_epoll_enable_read, 178 nxt_epoll_enable_write, 179 nxt_epoll_disable_read, 180 nxt_epoll_disable_write, 181 nxt_epoll_block_read, 182 nxt_epoll_block_write, 183 nxt_epoll_oneshot_read, 184 nxt_epoll_oneshot_write, 185 nxt_epoll_enable_accept, 186 NULL, 187 NULL, 188#if (NXT_HAVE_EVENTFD) 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191#else 192 NULL, 193 NULL, 194#endif 195 nxt_epoll_poll, 196 197 &nxt_unix_conn_io, 198 199#if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201#else 202 NXT_NO_FILE_EVENTS, 203#endif 204 205#if (NXT_HAVE_SIGNALFD) 206 NXT_SIGNAL_EVENTS, 207#else 208 NXT_NO_SIGNAL_EVENTS, 209#endif 210}; 211 212 213#if (NXT_HAVE_EPOLL_EDGE) 214 215static nxt_int_t 216nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218{ 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, 220 EPOLLET | EPOLLRDHUP); 221} 222 223#endif 224 225 226static nxt_int_t 227nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229{ 230 return nxt_epoll_create(engine, mchanges, mevents, 231 &nxt_unix_conn_io, 0); 232} 233 234 235static nxt_int_t 236nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) 238{ 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = mchanges; 242 engine->u.epoll.mevents = mevents; 243#if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245#endif 246 247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 248 if (engine->u.epoll.changes == NULL) { 249 goto fail; 250 } 251 252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 253 if (engine->u.epoll.events == NULL) { 254 goto fail; 255 } 256 257 engine->u.epoll.fd = epoll_create(1); 258 if (engine->u.epoll.fd == -1) { 259 nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno); 260 goto fail; 261 } 262 263 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 264 265 if (engine->signals != NULL) { 266 267#if (NXT_HAVE_SIGNALFD) 268 269 if (nxt_epoll_add_signal(engine) != NXT_OK) { 270 goto fail; 271 } 272 273#endif 274 275 nxt_epoll_test_accept4(engine, io); 276 } 277 278 return NXT_OK; 279 280fail: 281 282 nxt_epoll_free(engine); 283 284 return NXT_ERROR; 285} 286 287 288static void 289nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) 290{ 291 static nxt_work_handler_t handler; 292 293 if (handler == NULL) { 294 295 handler = io->accept; 296 297#if (NXT_HAVE_ACCEPT4) 298 299 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 300 301 if (nxt_errno != NXT_ENOSYS) { 302 handler = nxt_epoll_conn_io_accept4; 303 304 } else { 305 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 306 NXT_ENOSYS); 307 } 308 309#endif 310 } 311 312 io->accept = handler; 313} 314 315 316static void 317nxt_epoll_free(nxt_event_engine_t *engine) 318{ 319 int fd; 320 321 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 322 323#if (NXT_HAVE_SIGNALFD) 324 325 fd = engine->u.epoll.signalfd.fd; 326 327 if (fd != -1 && close(fd) != 0) { 328 nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno); 329 } 330 331#endif 332 333#if (NXT_HAVE_EVENTFD) 334 335 fd = engine->u.epoll.eventfd.fd; 336 337 if (fd != -1 && close(fd) != 0) { 338 nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno); 339 } 340 341#endif 342 343 fd = engine->u.epoll.fd; 344 345 if (fd != -1 && close(fd) != 0) { 346 nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno); 347 } 348 349 nxt_free(engine->u.epoll.events); 350 nxt_free(engine->u.epoll.changes); 351 352 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 353} 354 355 356static void 357nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 358{ 359 ev->read = NXT_EVENT_ACTIVE; 360 ev->write = NXT_EVENT_ACTIVE; 361 362 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 363 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 364} 365 366 367static void 368nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 369{ 370 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 371 372 ev->read = NXT_EVENT_INACTIVE; 373 ev->write = NXT_EVENT_INACTIVE; 374 375 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 376 } 377} 378 379 380static void 381nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 382{ 383 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 384 385 ev->read = NXT_EVENT_INACTIVE; 386 ev->write = NXT_EVENT_INACTIVE; 387 388 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 389 } 390} 391 392 393/* 394 * Although calling close() on a file descriptor will remove any epoll 395 * events that reference the descriptor, in this case the close() acquires 396 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 397 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 398 * only in one epoll set. Thus removing events explicitly before closing 399 * eliminates possible lock contention. 400 */ 401 402static nxt_bool_t 403nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 404{ 405 nxt_epoll_delete(engine, ev); 406 407 return ev->changing; 408} 409 410 411static void 412nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 413{ 414 int op; 415 uint32_t events; 416 417 if (ev->read != NXT_EVENT_BLOCKED) { 418 419 op = EPOLL_CTL_MOD; 420 events = EPOLLIN | engine->u.epoll.mode; 421 422 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 423 op = EPOLL_CTL_ADD; 424 425 } else if (ev->write >= NXT_EVENT_BLOCKED) { 426 events |= EPOLLOUT; 427 } 428 429 nxt_epoll_change(engine, ev, op, events); 430 } 431 432 ev->read = NXT_EVENT_ACTIVE; 433} 434 435 436static void 437nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 438{ 439 int op; 440 uint32_t events; 441 442 if (ev->write != NXT_EVENT_BLOCKED) { 443 444 op = EPOLL_CTL_MOD; 445 events = EPOLLOUT | engine->u.epoll.mode; 446 447 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 448 op = EPOLL_CTL_ADD; 449 450 } else if (ev->read >= NXT_EVENT_BLOCKED) { 451 events |= EPOLLIN; 452 } 453 454 nxt_epoll_change(engine, ev, op, events); 455 } 456 457 ev->write = NXT_EVENT_ACTIVE; 458} 459 460 461static void 462nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 463{ 464 int op; 465 uint32_t events; 466 467 ev->read = NXT_EVENT_INACTIVE; 468 469 if (ev->write <= NXT_EVENT_DISABLED) { 470 ev->write = NXT_EVENT_INACTIVE; 471 op = EPOLL_CTL_DEL; 472 events = 0; 473 474 } else { 475 op = EPOLL_CTL_MOD; 476 events = EPOLLOUT | engine->u.epoll.mode; 477 } 478 479 nxt_epoll_change(engine, ev, op, events); 480} 481 482 483static void 484nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 485{ 486 int op; 487 uint32_t events; 488 489 ev->write = NXT_EVENT_INACTIVE; 490 491 if (ev->read <= NXT_EVENT_DISABLED) { 492 ev->write = NXT_EVENT_INACTIVE; 493 op = EPOLL_CTL_DEL; 494 events = 0; 495 496 } else { 497 op = EPOLL_CTL_MOD; 498 events = EPOLLIN | engine->u.epoll.mode; 499 } 500 501 nxt_epoll_change(engine, ev, op, events); 502} 503 504 505static void 506nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 507{ 508 if (ev->read != NXT_EVENT_INACTIVE) { 509 ev->read = NXT_EVENT_BLOCKED; 510 } 511} 512 513 514static void 515nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 516{ 517 if (ev->write != NXT_EVENT_INACTIVE) { 518 ev->write = NXT_EVENT_BLOCKED; 519 } 520} 521 522 523/* 524 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 525 * event should be added or modified, epoll_ctl(2): 526 * 527 * EPOLLONESHOT (since Linux 2.6.2) 528 * Sets the one-shot behavior for the associated file descriptor. 529 * This means that after an event is pulled out with epoll_wait(2) 530 * the associated file descriptor is internally disabled and no 531 * other events will be reported by the epoll interface. The user 532 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 533 * descriptor with a new event mask. 534 */ 535 536static void 537nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 538{ 539 int op; 540 541 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 542 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 543 544 ev->read = NXT_EVENT_ONESHOT; 545 ev->write = NXT_EVENT_INACTIVE; 546 547 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 548} 549 550 551static void 552nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 553{ 554 int op; 555 556 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 557 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 558 559 ev->read = NXT_EVENT_INACTIVE; 560 ev->write = NXT_EVENT_ONESHOT; 561 562 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 563} 564 565 566static void 567nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 568{ 569 uint32_t events; 570 571 ev->read = NXT_EVENT_ACTIVE; 572 573 events = EPOLLIN; 574 575#ifdef EPOLLEXCLUSIVE 576 events |= EPOLLEXCLUSIVE; 577#endif 578 579 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); 580} 581 582 583/* 584 * epoll changes are batched to improve instruction and data cache 585 * locality of several epoll_ctl() calls followed by epoll_wait() call. 586 */ 587 588static void 589nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 590 uint32_t events) 591{ 592 nxt_epoll_change_t *change; 593 594 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 595 engine->u.epoll.fd, ev->fd, op, events); 596 597 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 598 (void) nxt_epoll_commit_changes(engine); 599 } 600 601 ev->changing = 1; 602 603 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 604 change->op = op; 605 change->event.events = events; 606 change->event.data.ptr = ev; 607} 608 609 610static nxt_int_t 611nxt_epoll_commit_changes(nxt_event_engine_t *engine) 612{ 613 int ret; 614 nxt_int_t retval; 615 nxt_fd_event_t *ev; 616 nxt_epoll_change_t *change, *end; 617 618 nxt_debug(&engine->task, "epoll %d changes:%ui", 619 engine->u.epoll.fd, engine->u.epoll.nchanges); 620 621 retval = NXT_OK; 622 change = engine->u.epoll.changes; 623 end = change + engine->u.epoll.nchanges; 624 625 do { 626 ev = change->event.data.ptr; 627 ev->changing = 0; 628 629 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 630 engine->u.epoll.fd, ev->fd, change->op, 631 change->event.events); 632 633 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 634 635 if (nxt_slow_path(ret != 0)) { 636 nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E", 637 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 638 639 nxt_work_queue_add(&engine->fast_work_queue, 640 nxt_epoll_error_handler, ev->task, ev, ev->data); 641 642 retval = NXT_ERROR; 643 } 644 645 change++; 646 647 } while (change < end); 648 649 engine->u.epoll.nchanges = 0; 650 651 return retval; 652} 653 654 655static void 656nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 657{ 658 nxt_fd_event_t *ev; 659 660 ev = obj; 661 662 ev->read = NXT_EVENT_INACTIVE; 663 ev->write = NXT_EVENT_INACTIVE; 664 665 ev->error_handler(ev->task, ev, data); 666} 667 668 669#if (NXT_HAVE_SIGNALFD) 670 671static nxt_int_t 672nxt_epoll_add_signal(nxt_event_engine_t *engine) 673{ 674 int fd; 675 struct epoll_event ee; 676 677 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 678 nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 679 return NXT_ERROR; 680 } 681 682 /* 683 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 684 * and 2.8 signalfd() wrappers call the original signalfd() syscall 685 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 686 * tries to call signalfd4() syscall and if it fails then calls the 687 * original signalfd() syscall. For this reason the non-blocking mode 688 * is set separately. 689 */ 690 691 fd = signalfd(-1, &engine->signals->sigmask, 0); 692 693 if (fd == -1) { 694 nxt_alert(&engine->task, "signalfd(%d) failed %E", 695 engine->u.epoll.signalfd.fd, nxt_errno); 696 return NXT_ERROR; 697 } 698 699 engine->u.epoll.signalfd.fd = fd; 700 701 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 702 return NXT_ERROR; 703 } 704 705 nxt_debug(&engine->task, "signalfd(): %d", fd); 706 707 engine->u.epoll.signalfd.data = engine->signals->handler; 708 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 709 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 710 engine->u.epoll.signalfd.log = engine->task.log; 711 engine->u.epoll.signalfd.task = &engine->task; 712 713 ee.events = EPOLLIN; 714 ee.data.ptr = &engine->u.epoll.signalfd; 715 716 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 717 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", 718 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 719 720 return NXT_ERROR; 721 } 722 723 return NXT_OK; 724} 725 726 727static void 728nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 729{ 730 int n; 731 nxt_fd_event_t *ev; 732 nxt_work_handler_t handler; 733 struct signalfd_siginfo sfd; 734 735 ev = obj; 736 handler = data; 737 738 nxt_debug(task, "signalfd handler"); 739 740 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 741 742 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 743 744 if (n != sizeof(struct signalfd_siginfo)) { 745 nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno); 746 return; 747 } 748 749 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 750 751 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 752} 753 754#endif 755 756 757#if (NXT_HAVE_EVENTFD) 758 759static nxt_int_t 760nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 761{ 762 int ret; 763 struct epoll_event ee; 764 765 engine->u.epoll.post_handler = handler; 766 767 /* 768 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 769 * and 2.8 eventfd() wrappers call the original eventfd() syscall 770 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 771 * tries to call eventfd2() syscall and if it fails then calls the 772 * original eventfd() syscall. For this reason the non-blocking mode 773 * is set separately. 774 */ 775 776 engine->u.epoll.eventfd.fd = eventfd(0, 0); 777 778 if (engine->u.epoll.eventfd.fd == -1) { 779 nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno); 780 return NXT_ERROR; 781 } 782 783 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 784 if (nxt_slow_path(ret != NXT_OK)) { 785 return NXT_ERROR; 786 } 787 788 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 789 790 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 791 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 792 engine->u.epoll.eventfd.data = engine; 793 engine->u.epoll.eventfd.log = engine->task.log; 794 engine->u.epoll.eventfd.task = &engine->task; 795 796 ee.events = EPOLLIN | EPOLLET; 797 ee.data.ptr = &engine->u.epoll.eventfd; 798 799 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 800 engine->u.epoll.eventfd.fd, &ee); 801 802 if (nxt_fast_path(ret == 0)) { 803 return NXT_OK; 804 } 805 806 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", 807 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 808 nxt_errno); 809 810 return NXT_ERROR; 811} 812 813 814static void 815nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 816{ 817 int n; 818 uint64_t events; 819 nxt_event_engine_t *engine; 820 821 engine = data; 822 823 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 824 825 /* 826 * The maximum value after write() to a eventfd() descriptor will
|
835 engine->u.epoll.neventfd = 0; 836 837 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 838 839 nxt_debug(task, "read(%d): %d events:%uL", 840 engine->u.epoll.eventfd.fd, n, events); 841 842 if (n != sizeof(uint64_t)) { 843 nxt_alert(task, "read eventfd(%d) failed %E", 844 engine->u.epoll.eventfd.fd, nxt_errno); 845 } 846 } 847 848 engine->u.epoll.post_handler(task, NULL, NULL); 849} 850 851 852static void 853nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 854{ 855 size_t ret; 856 uint64_t event; 857 858 /* 859 * eventfd() presents along with signalfd(), so the function 860 * is used only to post events and the signo argument is ignored. 861 */ 862 863 event = 1; 864 865 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 866 867 if (nxt_slow_path(ret != sizeof(uint64_t))) { 868 nxt_alert(&engine->task, "write(%d) to eventfd failed %E", 869 engine->u.epoll.eventfd.fd, nxt_errno); 870 } 871} 872 873#endif 874 875 876static void 877nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 878{ 879 int nevents; 880 uint32_t events; 881 nxt_int_t i; 882 nxt_err_t err; 883 nxt_bool_t error; 884 nxt_uint_t level; 885 nxt_fd_event_t *ev; 886 struct epoll_event *event; 887 888 if (engine->u.epoll.nchanges != 0) { 889 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 890 /* Error handlers have been enqueued on failure. */ 891 timeout = 0; 892 } 893 } 894 895 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 896 engine->u.epoll.fd, timeout); 897 898 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 899 engine->u.epoll.mevents, timeout); 900 901 err = (nevents == -1) ? nxt_errno : 0; 902 903 nxt_thread_time_update(engine->task.thread); 904 905 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 906 907 if (nevents == -1) { 908 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; 909 910 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 911 engine->u.epoll.fd, err); 912 913 return; 914 } 915 916 for (i = 0; i < nevents; i++) { 917 918 event = &engine->u.epoll.events[i]; 919 events = event->events; 920 ev = event->data.ptr; 921 922 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 923 ev->fd, events, ev, ev->read, ev->write); 924 925 /* 926 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 927 * EPOLLOUT, so the "error" variable enqueues only one active handler. 928 */ 929 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 930 ev->epoll_error = error; 931 932#if (NXT_HAVE_EPOLL_EDGE) 933 934 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 935 936#endif 937 938 if ((events & EPOLLIN) || error) { 939 ev->read_ready = 1; 940 941 if (ev->read != NXT_EVENT_BLOCKED) { 942 943 if (ev->read == NXT_EVENT_ONESHOT) { 944 ev->read = NXT_EVENT_DISABLED; 945 } 946 947 error = 0; 948 949 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 950 ev->task, ev, ev->data); 951 952 } else if (engine->u.epoll.mode == 0) { 953 /* Level-triggered mode. */ 954 nxt_epoll_disable_read(engine, ev); 955 } 956 } 957 958 if ((events & EPOLLOUT) || error) { 959 ev->write_ready = 1; 960 961 if (ev->write != NXT_EVENT_BLOCKED) { 962 963 if (ev->write == NXT_EVENT_ONESHOT) { 964 ev->write = NXT_EVENT_DISABLED; 965 } 966 967 error = 0; 968 969 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 970 ev->task, ev, ev->data); 971 972 } else if (engine->u.epoll.mode == 0) { 973 /* Level-triggered mode. */ 974 nxt_epoll_disable_write(engine, ev); 975 } 976 } 977 978 if (error) { 979 ev->read_ready = 1; 980 ev->write_ready = 1; 981 } 982 } 983} 984 985 986#if (NXT_HAVE_ACCEPT4) 987 988static void 989nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 990{ 991 socklen_t socklen; 992 nxt_conn_t *c; 993 nxt_socket_t s; 994 struct sockaddr *sa; 995 nxt_listen_event_t *lev; 996 997 lev = obj; 998 c = lev->next; 999 1000 lev->ready--; 1001 lev->socket.read_ready = (lev->ready != 0); 1002 1003 sa = &c->remote->u.sockaddr; 1004 socklen = c->remote->socklen; 1005 /* 1006 * The returned socklen is ignored here, 1007 * see comment in nxt_conn_io_accept(). 1008 */ 1009 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); 1010 1011 if (s != -1) { 1012 c->socket.fd = s; 1013 1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); 1015 1016 nxt_conn_accept(task, lev, c); 1017 return; 1018 } 1019 1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); 1021} 1022 1023#endif 1024 1025 1026#if (NXT_HAVE_EPOLL_EDGE) 1027 1028/* 1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1030 * syscall to test pending connect() error. Although this special 1031 * interface can work in both edge-triggered and level-triggered 1032 * modes it is enabled only for the former mode because this mode is 1033 * available in all modern Linux distributions. For the latter mode 1034 * it is required to create additional nxt_epoll_level_event_conn_io 1035 * with single non-generic connect() interface. 1036 */ 1037 1038static void 1039nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1040{ 1041 nxt_conn_t *c; 1042 nxt_event_engine_t *engine; 1043 nxt_work_handler_t handler; 1044 const nxt_event_conn_state_t *state; 1045 1046 c = obj; 1047 1048 state = c->write_state; 1049 1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1051 1052 case NXT_OK: 1053 c->socket.write_ready = 1; 1054 handler = state->ready_handler; 1055 break; 1056 1057 case NXT_AGAIN: 1058 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1059 c->socket.error_handler = nxt_conn_connect_error; 1060 1061 engine = task->thread->engine; 1062 nxt_conn_timer(engine, c, state, &c->write_timer); 1063 1064 nxt_epoll_enable(engine, &c->socket); 1065 c->socket.read = NXT_EVENT_BLOCKED; 1066 return; 1067 1068#if 0 1069 case NXT_AGAIN: 1070 nxt_conn_timer(engine, c, state, &c->write_timer); 1071 1072 /* Fall through. */ 1073 1074 case NXT_OK: 1075 /* 1076 * Mark both read and write directions as ready and try to perform 1077 * I/O operations before receiving readiness notifications. 1078 * On unconnected socket Linux send() and recv() return EAGAIN 1079 * instead of ENOTCONN. 1080 */ 1081 c->socket.read_ready = 1; 1082 c->socket.write_ready = 1; 1083 /* 1084 * Enabling both read and write notifications on a getting 1085 * connected socket eliminates one epoll_ctl() syscall. 1086 */ 1087 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1088 c->socket.error_handler = state->error_handler; 1089 1090 nxt_epoll_enable(engine, &c->socket); 1091 c->socket.read = NXT_EVENT_BLOCKED; 1092 1093 handler = state->ready_handler; 1094 break; 1095#endif 1096 1097 case NXT_ERROR: 1098 handler = state->error_handler; 1099 break; 1100 1101 default: /* NXT_DECLINED: connection refused. */ 1102 handler = state->close_handler; 1103 break; 1104 } 1105 1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1107} 1108 1109 1110static void 1111nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) 1112{ 1113 nxt_conn_t *c; 1114 1115 c = obj; 1116 1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1118 1119 if (!c->socket.epoll_error) { 1120 c->socket.write = NXT_EVENT_BLOCKED; 1121 1122 if (c->write_state->timer_autoreset) { 1123 nxt_timer_disable(task->thread->engine, &c->write_timer); 1124 } 1125 1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1127 task, c, data); 1128 return; 1129 } 1130 1131 nxt_conn_connect_test(task, c, data); 1132} 1133 1134 1135/* 1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF 1138 * in edge-triggered mode. 1139 */ 1140 1141static ssize_t 1142nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1143{ 1144 ssize_t n; 1145 1146 n = nxt_conn_io_recvbuf(c, b); 1147 1148 if (n > 0 && c->socket.epoll_eof) { 1149 c->socket.read_ready = 1; 1150 } 1151 1152 return n; 1153} 1154 1155#endif
| 835 engine->u.epoll.neventfd = 0; 836 837 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 838 839 nxt_debug(task, "read(%d): %d events:%uL", 840 engine->u.epoll.eventfd.fd, n, events); 841 842 if (n != sizeof(uint64_t)) { 843 nxt_alert(task, "read eventfd(%d) failed %E", 844 engine->u.epoll.eventfd.fd, nxt_errno); 845 } 846 } 847 848 engine->u.epoll.post_handler(task, NULL, NULL); 849} 850 851 852static void 853nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 854{ 855 size_t ret; 856 uint64_t event; 857 858 /* 859 * eventfd() presents along with signalfd(), so the function 860 * is used only to post events and the signo argument is ignored. 861 */ 862 863 event = 1; 864 865 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 866 867 if (nxt_slow_path(ret != sizeof(uint64_t))) { 868 nxt_alert(&engine->task, "write(%d) to eventfd failed %E", 869 engine->u.epoll.eventfd.fd, nxt_errno); 870 } 871} 872 873#endif 874 875 876static void 877nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 878{ 879 int nevents; 880 uint32_t events; 881 nxt_int_t i; 882 nxt_err_t err; 883 nxt_bool_t error; 884 nxt_uint_t level; 885 nxt_fd_event_t *ev; 886 struct epoll_event *event; 887 888 if (engine->u.epoll.nchanges != 0) { 889 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 890 /* Error handlers have been enqueued on failure. */ 891 timeout = 0; 892 } 893 } 894 895 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 896 engine->u.epoll.fd, timeout); 897 898 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 899 engine->u.epoll.mevents, timeout); 900 901 err = (nevents == -1) ? nxt_errno : 0; 902 903 nxt_thread_time_update(engine->task.thread); 904 905 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 906 907 if (nevents == -1) { 908 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; 909 910 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 911 engine->u.epoll.fd, err); 912 913 return; 914 } 915 916 for (i = 0; i < nevents; i++) { 917 918 event = &engine->u.epoll.events[i]; 919 events = event->events; 920 ev = event->data.ptr; 921 922 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 923 ev->fd, events, ev, ev->read, ev->write); 924 925 /* 926 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 927 * EPOLLOUT, so the "error" variable enqueues only one active handler. 928 */ 929 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 930 ev->epoll_error = error; 931 932#if (NXT_HAVE_EPOLL_EDGE) 933 934 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 935 936#endif 937 938 if ((events & EPOLLIN) || error) { 939 ev->read_ready = 1; 940 941 if (ev->read != NXT_EVENT_BLOCKED) { 942 943 if (ev->read == NXT_EVENT_ONESHOT) { 944 ev->read = NXT_EVENT_DISABLED; 945 } 946 947 error = 0; 948 949 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 950 ev->task, ev, ev->data); 951 952 } else if (engine->u.epoll.mode == 0) { 953 /* Level-triggered mode. */ 954 nxt_epoll_disable_read(engine, ev); 955 } 956 } 957 958 if ((events & EPOLLOUT) || error) { 959 ev->write_ready = 1; 960 961 if (ev->write != NXT_EVENT_BLOCKED) { 962 963 if (ev->write == NXT_EVENT_ONESHOT) { 964 ev->write = NXT_EVENT_DISABLED; 965 } 966 967 error = 0; 968 969 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 970 ev->task, ev, ev->data); 971 972 } else if (engine->u.epoll.mode == 0) { 973 /* Level-triggered mode. */ 974 nxt_epoll_disable_write(engine, ev); 975 } 976 } 977 978 if (error) { 979 ev->read_ready = 1; 980 ev->write_ready = 1; 981 } 982 } 983} 984 985 986#if (NXT_HAVE_ACCEPT4) 987 988static void 989nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 990{ 991 socklen_t socklen; 992 nxt_conn_t *c; 993 nxt_socket_t s; 994 struct sockaddr *sa; 995 nxt_listen_event_t *lev; 996 997 lev = obj; 998 c = lev->next; 999 1000 lev->ready--; 1001 lev->socket.read_ready = (lev->ready != 0); 1002 1003 sa = &c->remote->u.sockaddr; 1004 socklen = c->remote->socklen; 1005 /* 1006 * The returned socklen is ignored here, 1007 * see comment in nxt_conn_io_accept(). 1008 */ 1009 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); 1010 1011 if (s != -1) { 1012 c->socket.fd = s; 1013 1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); 1015 1016 nxt_conn_accept(task, lev, c); 1017 return; 1018 } 1019 1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); 1021} 1022 1023#endif 1024 1025 1026#if (NXT_HAVE_EPOLL_EDGE) 1027 1028/* 1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1030 * syscall to test pending connect() error. Although this special 1031 * interface can work in both edge-triggered and level-triggered 1032 * modes it is enabled only for the former mode because this mode is 1033 * available in all modern Linux distributions. For the latter mode 1034 * it is required to create additional nxt_epoll_level_event_conn_io 1035 * with single non-generic connect() interface. 1036 */ 1037 1038static void 1039nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1040{ 1041 nxt_conn_t *c; 1042 nxt_event_engine_t *engine; 1043 nxt_work_handler_t handler; 1044 const nxt_event_conn_state_t *state; 1045 1046 c = obj; 1047 1048 state = c->write_state; 1049 1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1051 1052 case NXT_OK: 1053 c->socket.write_ready = 1; 1054 handler = state->ready_handler; 1055 break; 1056 1057 case NXT_AGAIN: 1058 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1059 c->socket.error_handler = nxt_conn_connect_error; 1060 1061 engine = task->thread->engine; 1062 nxt_conn_timer(engine, c, state, &c->write_timer); 1063 1064 nxt_epoll_enable(engine, &c->socket); 1065 c->socket.read = NXT_EVENT_BLOCKED; 1066 return; 1067 1068#if 0 1069 case NXT_AGAIN: 1070 nxt_conn_timer(engine, c, state, &c->write_timer); 1071 1072 /* Fall through. */ 1073 1074 case NXT_OK: 1075 /* 1076 * Mark both read and write directions as ready and try to perform 1077 * I/O operations before receiving readiness notifications. 1078 * On unconnected socket Linux send() and recv() return EAGAIN 1079 * instead of ENOTCONN. 1080 */ 1081 c->socket.read_ready = 1; 1082 c->socket.write_ready = 1; 1083 /* 1084 * Enabling both read and write notifications on a getting 1085 * connected socket eliminates one epoll_ctl() syscall. 1086 */ 1087 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1088 c->socket.error_handler = state->error_handler; 1089 1090 nxt_epoll_enable(engine, &c->socket); 1091 c->socket.read = NXT_EVENT_BLOCKED; 1092 1093 handler = state->ready_handler; 1094 break; 1095#endif 1096 1097 case NXT_ERROR: 1098 handler = state->error_handler; 1099 break; 1100 1101 default: /* NXT_DECLINED: connection refused. */ 1102 handler = state->close_handler; 1103 break; 1104 } 1105 1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1107} 1108 1109 1110static void 1111nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) 1112{ 1113 nxt_conn_t *c; 1114 1115 c = obj; 1116 1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1118 1119 if (!c->socket.epoll_error) { 1120 c->socket.write = NXT_EVENT_BLOCKED; 1121 1122 if (c->write_state->timer_autoreset) { 1123 nxt_timer_disable(task->thread->engine, &c->write_timer); 1124 } 1125 1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1127 task, c, data); 1128 return; 1129 } 1130 1131 nxt_conn_connect_test(task, c, data); 1132} 1133 1134 1135/* 1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF 1138 * in edge-triggered mode. 1139 */ 1140 1141static ssize_t 1142nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1143{ 1144 ssize_t n; 1145 1146 n = nxt_conn_io_recvbuf(c, b); 1147 1148 if (n > 0 && c->socket.epoll_eof) { 1149 c->socket.read_ready = 1; 1150 } 1151 1152 return n; 1153} 1154 1155#endif
|