1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * The first epoll version has been introduced in Linux 2.5.44. The 12 * interface was changed several times since then and the final version 13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has 14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. 15 * 16 * EPOLLET mode did not work reliable in early implementaions and in 17 * Linux 2.4 backport. 18 * 19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3. 20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8. 21 * epoll_pwait() Linux 2.6.19, glibc 2.6. 22 * signalfd() Linux 2.6.22, glibc 2.7. 23 * eventfd() Linux 2.6.22, glibc 2.7. 24 * timerfd_create() Linux 2.6.25, glibc 2.8. 25 * epoll_create1() Linux 2.6.27, glibc 2.9. 26 * signalfd4() Linux 2.6.27, glibc 2.9. 27 * eventfd2() Linux 2.6.27, glibc 2.9. 28 * accept4() Linux 2.6.28, glibc 2.10. 29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. 30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24. 31 */ 32 33 34 #if (NXT_HAVE_EPOLL_EDGE) 35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37 #endif 38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); 42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 43 nxt_conn_io_t *io); 44 static void nxt_epoll_free(nxt_event_engine_t *engine); 45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 53 nxt_fd_event_t *ev); 54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 55 nxt_fd_event_t *ev); 56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 57 nxt_fd_event_t *ev); 58 static void nxt_epoll_block_read(nxt_event_engine_t *engine, 59 nxt_fd_event_t *ev); 60 static void nxt_epoll_block_write(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 69 int op, uint32_t events); 70 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 72 #if (NXT_HAVE_SIGNALFD) 73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 75 #endif 76 #if (NXT_HAVE_EVENTFD) 77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81 #endif 82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84 #if (NXT_HAVE_ACCEPT4) 85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, 86 void *data); 87 #endif 88 89 90 #if (NXT_HAVE_EPOLL_EDGE) 91 92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, 93 void *data); 94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, 95 void *data); 96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 97 98 99 static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, 102 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110 #if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112 #else 113 nxt_event_conn_io_sendbuf, 114 #endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 119 nxt_conn_io_shutdown, 120 }; 121 122 123 const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, 128 nxt_epoll_disable, 129 nxt_epoll_delete, 130 nxt_epoll_close, 131 nxt_epoll_enable_read, 132 nxt_epoll_enable_write, 133 nxt_epoll_disable_read, 134 nxt_epoll_disable_write, 135 nxt_epoll_block_read, 136 nxt_epoll_block_write, 137 nxt_epoll_oneshot_read, 138 nxt_epoll_oneshot_write, 139 nxt_epoll_enable_accept, 140 NULL, 141 NULL, 142 #if (NXT_HAVE_EVENTFD) 143 nxt_epoll_enable_post, 144 nxt_epoll_signal, 145 #else 146 NULL, 147 NULL, 148 #endif 149 nxt_epoll_poll, 150 151 &nxt_epoll_edge_conn_io, 152 153 #if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155 #else 156 NXT_NO_FILE_EVENTS, 157 #endif 158 159 #if (NXT_HAVE_SIGNALFD) 160 NXT_SIGNAL_EVENTS, 161 #else 162 NXT_NO_SIGNAL_EVENTS, 163 #endif 164 }; 165 166 #endif 167 168 169 const nxt_event_interface_t nxt_epoll_level_engine = { 170 "epoll_level", 171 nxt_epoll_level_create, 172 nxt_epoll_free, 173 nxt_epoll_enable, 174 nxt_epoll_disable, 175 nxt_epoll_delete, 176 nxt_epoll_close, 177 nxt_epoll_enable_read, 178 nxt_epoll_enable_write, 179 nxt_epoll_disable_read, 180 nxt_epoll_disable_write, 181 nxt_epoll_block_read, 182 nxt_epoll_block_write, 183 nxt_epoll_oneshot_read, 184 nxt_epoll_oneshot_write, 185 nxt_epoll_enable_accept, 186 NULL, 187 NULL, 188 #if (NXT_HAVE_EVENTFD) 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191 #else 192 NULL, 193 NULL, 194 #endif 195 nxt_epoll_poll, 196 197 &nxt_unix_conn_io, 198 199 #if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201 #else 202 NXT_NO_FILE_EVENTS, 203 #endif 204 205 #if (NXT_HAVE_SIGNALFD) 206 NXT_SIGNAL_EVENTS, 207 #else 208 NXT_NO_SIGNAL_EVENTS, 209 #endif 210 }; 211 212 213 #if (NXT_HAVE_EPOLL_EDGE) 214 215 static nxt_int_t 216 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218 { 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, 220 EPOLLET | EPOLLRDHUP); 221 } 222 223 #endif 224 225 226 static nxt_int_t 227 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229 { 230 return nxt_epoll_create(engine, mchanges, mevents, 231 &nxt_unix_conn_io, 0); 232 } 233 234 235 static nxt_int_t 236 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) 238 { 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = mchanges; 242 engine->u.epoll.mevents = mevents; 243 #if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245 #endif 246 247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 248 if (engine->u.epoll.changes == NULL) { 249 goto fail; 250 } 251 252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 253 if (engine->u.epoll.events == NULL) { 254 goto fail; 255 } 256 257 engine->u.epoll.fd = epoll_create(1); 258 if (engine->u.epoll.fd == -1) { 259 nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno); 260 goto fail; 261 } 262 263 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 264 265 if (engine->signals != NULL) { 266 267 #if (NXT_HAVE_SIGNALFD) 268 269 if (nxt_epoll_add_signal(engine) != NXT_OK) { 270 goto fail; 271 } 272 273 #endif 274 275 nxt_epoll_test_accept4(engine, io); 276 } 277 278 return NXT_OK; 279 280 fail: 281 282 nxt_epoll_free(engine); 283 284 return NXT_ERROR; 285 } 286 287 288 static void 289 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) 290 { 291 static nxt_work_handler_t handler; 292 293 if (handler == NULL) { 294 295 handler = io->accept; 296 297 #if (NXT_HAVE_ACCEPT4) 298 299 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 300 301 if (nxt_errno != NXT_ENOSYS) { 302 handler = nxt_epoll_conn_io_accept4; 303 304 } else { 305 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 306 NXT_ENOSYS); 307 } 308 309 #endif 310 } 311 312 io->accept = handler; 313 } 314 315 316 static void 317 nxt_epoll_free(nxt_event_engine_t *engine) 318 { 319 int fd; 320 321 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 322 323 #if (NXT_HAVE_SIGNALFD) 324 325 fd = engine->u.epoll.signalfd.fd; 326 327 if (fd != -1 && close(fd) != 0) { 328 nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno); 329 } 330 331 #endif 332 333 #if (NXT_HAVE_EVENTFD) 334 335 fd = engine->u.epoll.eventfd.fd; 336 337 if (fd != -1 && close(fd) != 0) { 338 nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno); 339 } 340 341 #endif 342 343 fd = engine->u.epoll.fd; 344 345 if (fd != -1 && close(fd) != 0) { 346 nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno); 347 } 348 349 nxt_free(engine->u.epoll.events); 350 nxt_free(engine->u.epoll.changes); 351 352 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 353 } 354 355 356 static void 357 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 358 { 359 ev->read = NXT_EVENT_ACTIVE; 360 ev->write = NXT_EVENT_ACTIVE; 361 362 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 363 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 364 } 365 366 367 static void 368 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 369 { 370 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 371 372 ev->read = NXT_EVENT_INACTIVE; 373 ev->write = NXT_EVENT_INACTIVE; 374 375 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 376 } 377 } 378 379 380 static void 381 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 382 { 383 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 384 385 ev->read = NXT_EVENT_INACTIVE; 386 ev->write = NXT_EVENT_INACTIVE; 387 388 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 389 } 390 } 391 392 393 /* 394 * Although calling close() on a file descriptor will remove any epoll 395 * events that reference the descriptor, in this case the close() acquires 396 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 397 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 398 * only in one epoll set. Thus removing events explicitly before closing 399 * eliminates possible lock contention. 400 */ 401 402 static nxt_bool_t 403 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 404 { 405 nxt_epoll_delete(engine, ev); 406 407 return ev->changing; 408 } 409 410 411 static void 412 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 413 { 414 int op; 415 uint32_t events; 416 417 if (ev->read != NXT_EVENT_BLOCKED) { 418 419 op = EPOLL_CTL_MOD; 420 events = EPOLLIN | engine->u.epoll.mode; 421 422 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 423 op = EPOLL_CTL_ADD; 424 425 } else if (ev->write >= NXT_EVENT_BLOCKED) { 426 events |= EPOLLOUT; 427 } 428 429 nxt_epoll_change(engine, ev, op, events); 430 } 431 432 ev->read = NXT_EVENT_ACTIVE; 433 } 434 435 436 static void 437 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 438 { 439 int op; 440 uint32_t events; 441 442 if (ev->write != NXT_EVENT_BLOCKED) { 443 444 op = EPOLL_CTL_MOD; 445 events = EPOLLOUT | engine->u.epoll.mode; 446 447 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 448 op = EPOLL_CTL_ADD; 449 450 } else if (ev->read >= NXT_EVENT_BLOCKED) { 451 events |= EPOLLIN; 452 } 453 454 nxt_epoll_change(engine, ev, op, events); 455 } 456 457 ev->write = NXT_EVENT_ACTIVE; 458 } 459 460 461 static void 462 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 463 { 464 int op; 465 uint32_t events; 466 467 ev->read = NXT_EVENT_INACTIVE; 468 469 if (ev->write <= NXT_EVENT_DISABLED) { 470 ev->write = NXT_EVENT_INACTIVE; 471 op = EPOLL_CTL_DEL; 472 events = 0; 473 474 } else { 475 op = EPOLL_CTL_MOD; 476 events = EPOLLOUT | engine->u.epoll.mode; 477 } 478 479 nxt_epoll_change(engine, ev, op, events); 480 } 481 482 483 static void 484 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 485 { 486 int op; 487 uint32_t events; 488 489 ev->write = NXT_EVENT_INACTIVE; 490 491 if (ev->read <= NXT_EVENT_DISABLED) { 492 ev->write = NXT_EVENT_INACTIVE; 493 op = EPOLL_CTL_DEL; 494 events = 0; 495 496 } else { 497 op = EPOLL_CTL_MOD; 498 events = EPOLLIN | engine->u.epoll.mode; 499 } 500 501 nxt_epoll_change(engine, ev, op, events); 502 } 503 504 505 static void 506 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 507 { 508 if (ev->read != NXT_EVENT_INACTIVE) { 509 ev->read = NXT_EVENT_BLOCKED; 510 } 511 } 512 513 514 static void 515 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 516 { 517 if (ev->write != NXT_EVENT_INACTIVE) { 518 ev->write = NXT_EVENT_BLOCKED; 519 } 520 } 521 522 523 /* 524 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 525 * event should be added or modified, epoll_ctl(2): 526 * 527 * EPOLLONESHOT (since Linux 2.6.2) 528 * Sets the one-shot behavior for the associated file descriptor. 529 * This means that after an event is pulled out with epoll_wait(2) 530 * the associated file descriptor is internally disabled and no 531 * other events will be reported by the epoll interface. The user 532 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 533 * descriptor with a new event mask. 534 */ 535 536 static void 537 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 538 { 539 int op; 540 541 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 542 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 543 544 ev->read = NXT_EVENT_ONESHOT; 545 ev->write = NXT_EVENT_INACTIVE; 546 547 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 548 } 549 550 551 static void 552 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 553 { 554 int op; 555 556 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 557 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 558 559 ev->read = NXT_EVENT_INACTIVE; 560 ev->write = NXT_EVENT_ONESHOT; 561 562 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 563 } 564 565 566 static void 567 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 568 { 569 uint32_t events; 570 571 ev->read = NXT_EVENT_ACTIVE; 572 573 events = EPOLLIN; 574 575 #ifdef EPOLLEXCLUSIVE 576 events |= EPOLLEXCLUSIVE; 577 #endif 578 579 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); 580 } 581 582 583 /* 584 * epoll changes are batched to improve instruction and data cache 585 * locality of several epoll_ctl() calls followed by epoll_wait() call. 586 */ 587 588 static void 589 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 590 uint32_t events) 591 { 592 nxt_epoll_change_t *change; 593 594 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 595 engine->u.epoll.fd, ev->fd, op, events); 596 597 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 598 (void) nxt_epoll_commit_changes(engine); 599 } 600 601 ev->changing = 1; 602 603 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 604 change->op = op; 605 change->event.events = events; 606 change->event.data.ptr = ev; 607 } 608 609 610 static nxt_int_t 611 nxt_epoll_commit_changes(nxt_event_engine_t *engine) 612 { 613 int ret; 614 nxt_int_t retval; 615 nxt_fd_event_t *ev; 616 nxt_epoll_change_t *change, *end; 617 618 nxt_debug(&engine->task, "epoll %d changes:%ui", 619 engine->u.epoll.fd, engine->u.epoll.nchanges); 620 621 retval = NXT_OK; 622 change = engine->u.epoll.changes; 623 end = change + engine->u.epoll.nchanges; 624 625 do { 626 ev = change->event.data.ptr; 627 ev->changing = 0; 628 629 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 630 engine->u.epoll.fd, ev->fd, change->op, 631 change->event.events); 632 633 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 634 635 if (nxt_slow_path(ret != 0)) { 636 nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E", 637 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 638 639 nxt_work_queue_add(&engine->fast_work_queue, 640 nxt_epoll_error_handler, ev->task, ev, ev->data); 641 642 retval = NXT_ERROR; 643 } 644 645 change++; 646 647 } while (change < end); 648 649 engine->u.epoll.nchanges = 0; 650 651 return retval; 652 } 653 654 655 static void 656 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 657 { 658 nxt_fd_event_t *ev; 659 660 ev = obj; 661 662 ev->read = NXT_EVENT_INACTIVE; 663 ev->write = NXT_EVENT_INACTIVE; 664 665 ev->error_handler(ev->task, ev, data); 666 } 667 668 669 #if (NXT_HAVE_SIGNALFD) 670 671 static nxt_int_t 672 nxt_epoll_add_signal(nxt_event_engine_t *engine) 673 { 674 int fd; 675 struct epoll_event ee; 676 677 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 678 nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 679 return NXT_ERROR; 680 } 681 682 /* 683 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 684 * and 2.8 signalfd() wrappers call the original signalfd() syscall 685 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 686 * tries to call signalfd4() syscall and if it fails then calls the 687 * original signalfd() syscall. For this reason the non-blocking mode 688 * is set separately. 689 */ 690 691 fd = signalfd(-1, &engine->signals->sigmask, 0); 692 693 if (fd == -1) { 694 nxt_alert(&engine->task, "signalfd(%d) failed %E", 695 engine->u.epoll.signalfd.fd, nxt_errno); 696 return NXT_ERROR; 697 } 698 699 engine->u.epoll.signalfd.fd = fd; 700 701 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 702 return NXT_ERROR; 703 } 704 705 nxt_debug(&engine->task, "signalfd(): %d", fd); 706 707 engine->u.epoll.signalfd.data = engine->signals->handler; 708 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 709 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 710 engine->u.epoll.signalfd.log = engine->task.log; 711 engine->u.epoll.signalfd.task = &engine->task; 712 713 ee.events = EPOLLIN; 714 ee.data.ptr = &engine->u.epoll.signalfd; 715 716 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 717 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", 718 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 719 720 return NXT_ERROR; 721 } 722 723 return NXT_OK; 724 } 725 726 727 static void 728 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 729 { 730 int n; 731 nxt_fd_event_t *ev; 732 nxt_work_handler_t handler; 733 struct signalfd_siginfo sfd; 734 735 ev = obj; 736 handler = data; 737 738 nxt_debug(task, "signalfd handler"); 739 740 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 741 742 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 743 744 if (n != sizeof(struct signalfd_siginfo)) { 745 nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno); 746 return; 747 } 748 749 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 750 751 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 752 } 753 754 #endif 755 756 757 #if (NXT_HAVE_EVENTFD) 758 759 static nxt_int_t 760 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 761 { 762 int ret; 763 struct epoll_event ee; 764 765 engine->u.epoll.post_handler = handler; 766 767 /* 768 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 769 * and 2.8 eventfd() wrappers call the original eventfd() syscall 770 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 771 * tries to call eventfd2() syscall and if it fails then calls the 772 * original eventfd() syscall. For this reason the non-blocking mode 773 * is set separately. 774 */ 775 776 engine->u.epoll.eventfd.fd = eventfd(0, 0); 777 778 if (engine->u.epoll.eventfd.fd == -1) { 779 nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno); 780 return NXT_ERROR; 781 } 782 783 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 784 if (nxt_slow_path(ret != NXT_OK)) { 785 return NXT_ERROR; 786 } 787 788 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 789 790 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 791 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 792 engine->u.epoll.eventfd.data = engine; 793 engine->u.epoll.eventfd.log = engine->task.log; 794 engine->u.epoll.eventfd.task = &engine->task; 795 796 ee.events = EPOLLIN | EPOLLET; 797 ee.data.ptr = &engine->u.epoll.eventfd; 798 799 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 800 engine->u.epoll.eventfd.fd, &ee); 801 802 if (nxt_fast_path(ret == 0)) { 803 return NXT_OK; 804 } 805 806 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", 807 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 808 nxt_errno); 809 810 return NXT_ERROR; 811 } 812 813 814 static void 815 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 816 { 817 int n; 818 uint64_t events; 819 nxt_event_engine_t *engine; 820 821 engine = data; 822 823 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 824 825 /* 826 * The maximum value after write() to a eventfd() descriptor will 827 * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor 828 * can be read once per many notifications, for example, once per 829 * 2^32-2 noticifcations. Since the eventfd() file descriptor is 830 * always registered in EPOLLET mode, epoll returns event about 831 * only the latest write() to the descriptor. 832 */ 833 834 if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) { 835 engine->u.epoll.neventfd = 0; 836 837 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 838 839 nxt_debug(task, "read(%d): %d events:%uL", 840 engine->u.epoll.eventfd.fd, n, events); 841 842 if (n != sizeof(uint64_t)) { 843 nxt_alert(task, "read eventfd(%d) failed %E", 844 engine->u.epoll.eventfd.fd, nxt_errno); 845 } 846 } 847 848 engine->u.epoll.post_handler(task, NULL, NULL); 849 } 850 851 852 static void 853 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 854 { 855 size_t ret; 856 uint64_t event; 857 858 /* 859 * eventfd() presents along with signalfd(), so the function 860 * is used only to post events and the signo argument is ignored. 861 */ 862 863 event = 1; 864 865 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 866 867 if (nxt_slow_path(ret != sizeof(uint64_t))) { 868 nxt_alert(&engine->task, "write(%d) to eventfd failed %E", 869 engine->u.epoll.eventfd.fd, nxt_errno); 870 } 871 } 872 873 #endif 874 875 876 static void 877 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 878 { 879 int nevents; 880 uint32_t events; 881 nxt_int_t i; 882 nxt_err_t err; 883 nxt_bool_t error; 884 nxt_uint_t level; 885 nxt_fd_event_t *ev; 886 struct epoll_event *event; 887 888 if (engine->u.epoll.nchanges != 0) { 889 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 890 /* Error handlers have been enqueued on failure. */ 891 timeout = 0; 892 } 893 } 894 895 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 896 engine->u.epoll.fd, timeout); 897 898 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 899 engine->u.epoll.mevents, timeout); 900 901 err = (nevents == -1) ? nxt_errno : 0; 902 903 nxt_thread_time_update(engine->task.thread); 904 905 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 906 907 if (nevents == -1) { 908 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; 909 910 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 911 engine->u.epoll.fd, err); 912 913 return; 914 } 915 916 for (i = 0; i < nevents; i++) { 917 918 event = &engine->u.epoll.events[i]; 919 events = event->events; 920 ev = event->data.ptr; 921 922 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 923 ev->fd, events, ev, ev->read, ev->write); 924 925 /* 926 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 927 * EPOLLOUT, so the "error" variable enqueues only one active handler. 928 */ 929 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 930 ev->epoll_error = error; 931 932 #if (NXT_HAVE_EPOLL_EDGE) 933 934 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 935 936 #endif 937 938 if ((events & EPOLLIN) || error) { 939 ev->read_ready = 1; 940 941 if (ev->read != NXT_EVENT_BLOCKED) { 942 943 if (ev->read == NXT_EVENT_ONESHOT) { 944 ev->read = NXT_EVENT_DISABLED; 945 } 946 947 error = 0; 948 949 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 950 ev->task, ev, ev->data); 951 952 } else if (engine->u.epoll.mode == 0) { 953 /* Level-triggered mode. */ 954 nxt_epoll_disable_read(engine, ev); 955 } 956 } 957 958 if ((events & EPOLLOUT) || error) { 959 ev->write_ready = 1; 960 961 if (ev->write != NXT_EVENT_BLOCKED) { 962 963 if (ev->write == NXT_EVENT_ONESHOT) { 964 ev->write = NXT_EVENT_DISABLED; 965 } 966 967 error = 0; 968 969 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 970 ev->task, ev, ev->data); 971 972 } else if (engine->u.epoll.mode == 0) { 973 /* Level-triggered mode. */ 974 nxt_epoll_disable_write(engine, ev); 975 } 976 } 977 978 if (error) { 979 ev->read_ready = 1; 980 ev->write_ready = 1; 981 } 982 } 983 } 984 985 986 #if (NXT_HAVE_ACCEPT4) 987 988 static void 989 nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 990 { 991 socklen_t socklen; 992 nxt_conn_t *c; 993 nxt_socket_t s; 994 struct sockaddr *sa; 995 nxt_listen_event_t *lev; 996 997 lev = obj; 998 c = lev->next; 999 1000 lev->ready--; 1001 lev->socket.read_ready = (lev->ready != 0); 1002 1003 sa = &c->remote->u.sockaddr; 1004 socklen = c->remote->socklen; 1005 /* 1006 * The returned socklen is ignored here, 1007 * see comment in nxt_conn_io_accept(). 1008 */ 1009 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); 1010 1011 if (s != -1) { 1012 c->socket.fd = s; 1013 1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); 1015 1016 nxt_conn_accept(task, lev, c); 1017 return; 1018 } 1019 1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); 1021 } 1022 1023 #endif 1024 1025 1026 #if (NXT_HAVE_EPOLL_EDGE) 1027 1028 /* 1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1030 * syscall to test pending connect() error. Although this special 1031 * interface can work in both edge-triggered and level-triggered 1032 * modes it is enabled only for the former mode because this mode is 1033 * available in all modern Linux distributions. For the latter mode 1034 * it is required to create additional nxt_epoll_level_event_conn_io 1035 * with single non-generic connect() interface. 1036 */ 1037 1038 static void 1039 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1040 { 1041 nxt_conn_t *c; 1042 nxt_event_engine_t *engine; 1043 nxt_work_handler_t handler; 1044 const nxt_event_conn_state_t *state; 1045 1046 c = obj; 1047 1048 state = c->write_state; 1049 1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1051 1052 case NXT_OK: 1053 c->socket.write_ready = 1; 1054 handler = state->ready_handler; 1055 break; 1056 1057 case NXT_AGAIN: 1058 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1059 c->socket.error_handler = nxt_conn_connect_error; 1060 1061 engine = task->thread->engine; 1062 nxt_conn_timer(engine, c, state, &c->write_timer); 1063 1064 nxt_epoll_enable(engine, &c->socket); 1065 c->socket.read = NXT_EVENT_BLOCKED; 1066 return; 1067 1068 #if 0 1069 case NXT_AGAIN: 1070 nxt_conn_timer(engine, c, state, &c->write_timer); 1071 1072 /* Fall through. */ 1073 1074 case NXT_OK: 1075 /* 1076 * Mark both read and write directions as ready and try to perform 1077 * I/O operations before receiving readiness notifications. 1078 * On unconnected socket Linux send() and recv() return EAGAIN 1079 * instead of ENOTCONN. 1080 */ 1081 c->socket.read_ready = 1; 1082 c->socket.write_ready = 1; 1083 /* 1084 * Enabling both read and write notifications on a getting 1085 * connected socket eliminates one epoll_ctl() syscall. 1086 */ 1087 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1088 c->socket.error_handler = state->error_handler; 1089 1090 nxt_epoll_enable(engine, &c->socket); 1091 c->socket.read = NXT_EVENT_BLOCKED; 1092 1093 handler = state->ready_handler; 1094 break; 1095 #endif 1096 1097 case NXT_ERROR: 1098 handler = state->error_handler; 1099 break; 1100 1101 default: /* NXT_DECLINED: connection refused. */ 1102 handler = state->close_handler; 1103 break; 1104 } 1105 1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1107 } 1108 1109 1110 static void 1111 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) 1112 { 1113 nxt_conn_t *c; 1114 1115 c = obj; 1116 1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1118 1119 if (!c->socket.epoll_error) { 1120 c->socket.write = NXT_EVENT_BLOCKED; 1121 1122 if (c->write_state->timer_autoreset) { 1123 nxt_timer_disable(task->thread->engine, &c->write_timer); 1124 } 1125 1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1127 task, c, data); 1128 return; 1129 } 1130 1131 nxt_conn_connect_test(task, c, data); 1132 } 1133 1134 1135 /* 1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF 1138 * in edge-triggered mode. 1139 */ 1140 1141 static ssize_t 1142 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1143 { 1144 ssize_t n; 1145 1146 n = nxt_conn_io_recvbuf(c, b); 1147 1148 if (n > 0 && c->socket.epoll_eof) { 1149 c->socket.read_ready = 1; 1150 } 1151 1152 return n; 1153 } 1154 1155 #endif 1156