/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * The first epoll version was introduced in Linux 2.5.44.  The
 * interface was changed several times since then, and the final
 * version of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET
 * mode was introduced in Linux 2.6.0 and has been supported since
 * glibc 2.3.2.
 *
 * EPOLLET mode did not work reliably in early implementations or in
 * the Linux 2.4 backport.
 *
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
 * signalfd()               Linux 2.6.22, glibc 2.7.
 * eventfd()                Linux 2.6.22, glibc 2.7.
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
 * signalfd4()              Linux 2.6.27, glibc 2.9.
 * eventfd2()               Linux 2.6.27, glibc 2.9.
 * accept4()                Linux 2.6.28, glibc 2.10.
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
 * EPOLLEXCLUSIVE           Linux 4.5,    glibc 2.24.
 */


#if (NXT_HAVE_EPOLL_EDGE)
static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
#endif
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
    nxt_conn_io_t *io);
static void nxt_epoll_free(nxt_event_engine_t *engine);
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    int op, uint32_t events);
static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
#if (NXT_HAVE_SIGNALFD)
static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj,
    void *data);
#endif
#if (NXT_HAVE_EVENTFD)
static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

#if (NXT_HAVE_ACCEPT4)
static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
    void *data);
#endif


#if (NXT_HAVE_EPOLL_EDGE)

static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};


const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};

#endif


const nxt_event_interface_t  nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};


#if (NXT_HAVE_EPOLL_EDGE)

static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
                            EPOLLET | EPOLLRDHUP);
}

#endif


static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_unix_conn_io, 0);
}


static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
    engine->u.epoll.fd = -1;
    engine->u.epoll.mode = mode;
    engine->u.epoll.mchanges = mchanges;
    engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
    engine->u.epoll.signalfd.fd = -1;
#endif

    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t)
                                         * mchanges);
    if (engine->u.epoll.changes == NULL) {
        goto fail;
    }

    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
    if (engine->u.epoll.events == NULL) {
        goto fail;
    }

    engine->u.epoll.fd = epoll_create(1);
    if (engine->u.epoll.fd == -1) {
        nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);

    if (engine->signals != NULL) {

#if (NXT_HAVE_SIGNALFD)

        if (nxt_epoll_add_signal(engine) != NXT_OK) {
            goto fail;
        }

#endif

        nxt_epoll_test_accept4(engine, io);
    }

    return NXT_OK;

fail:

    nxt_epoll_free(engine);

    return NXT_ERROR;
}
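

/*
 * A minimal illustrative sketch, not engine code: epoll_create1() from
 * the version list at the top of this file could create the descriptor
 * and set the close-on-exec flag atomically, falling back to
 * epoll_create() on kernels older than 2.6.27 where the syscall is
 * missing.  The function name below is hypothetical.
 */

#if 0

static int
example_epoll_create1(void)
{
    int  fd;

    fd = epoll_create1(EPOLL_CLOEXEC);

    if (fd == -1 && errno == ENOSYS) {
        /* The size argument is ignored since Linux 2.6.8 but must be > 0. */
        fd = epoll_create(1);
    }

    return fd;
}

#endif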


static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t  handler;

    if (handler == NULL) {

        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}


static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int  fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd,
                  nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd,
                  nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}


static void
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->write = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}


static void
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


static void
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Although closing a file descriptor removes any epoll events that
 * reference it, in this case close() acquires the global kernel
 * "epmutex", whereas since Linux 3.13 epoll_ctl(EPOLL_CTL_DEL) does
 * not acquire the "epmutex" if the file descriptor is present in only
 * one epoll set.  Thus removing events explicitly before closing
 * eliminates possible lock contention.
 */

static nxt_bool_t
nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_epoll_delete(engine, ev);

    return ev->changing;
}
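

/*
 * A minimal illustrative sketch, not engine code: the explicit
 * EPOLL_CTL_DEL before close() recommended in the comment above.
 * Since Linux 2.6.9 the event argument of EPOLL_CTL_DEL may be NULL.
 * The names "epfd" and "fd" are hypothetical.
 */

#if 0

static void
example_delete_before_close(int epfd, int fd)
{
    /* Remove the events first to avoid contention on the "epmutex". */
    (void) epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    (void) close(fd);
}

#endif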


static void
nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->read != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE
            && ev->write == NXT_EVENT_INACTIVE)
        {
            op = EPOLL_CTL_ADD;

        } else if (ev->write >= NXT_EVENT_BLOCKED) {
            events |= EPOLLOUT;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


static void
nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->write != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE
            && ev->write == NXT_EVENT_INACTIVE)
        {
            op = EPOLL_CTL_ADD;

        } else if (ev->read >= NXT_EVENT_BLOCKED) {
            events |= EPOLLIN;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->write = NXT_EVENT_ACTIVE;
}


static void
nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->read = NXT_EVENT_INACTIVE;

    if (ev->write <= NXT_EVENT_DISABLED) {
        ev->write = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


static void
nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->write = NXT_EVENT_INACTIVE;

    if (ev->read <= NXT_EVENT_DISABLED) {
        ev->read = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


static void
nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


/*
 * The NXT_EVENT_DISABLED state is used to track whether an EPOLLONESHOT
 * event should be added or modified; from epoll_ctl(2):
 *
 *     EPOLLONESHOT (since Linux 2.6.2)
 *         Sets the one-shot behavior for the associated file descriptor.
 *         This means that after an event is pulled out with epoll_wait(2)
 *         the associated file descriptor is internally disabled and no
 *         other events will be reported by the epoll interface.  The user
 *         must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
 *         descriptor with a new event mask.
 */

static void
nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE)
         ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_ONESHOT;
    ev->write = NXT_EVENT_INACTIVE;

    nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
}


static void
nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE)
         ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_ONESHOT;

    nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
}
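

/*
 * A minimal illustrative sketch, not engine code: the rearm cycle
 * described in the EPOLLONESHOT comment above, using a raw epoll_ctl()
 * call instead of the batched nxt_epoll_change().  After epoll_wait()
 * reports one event, the descriptor stays internally disabled until
 * EPOLL_CTL_MOD is applied.  The names "epfd", "fd", and "data" are
 * hypothetical.
 */

#if 0

static int
example_oneshot_rearm(int epfd, int fd, void *data)
{
    struct epoll_event  ee;

    ee.events = EPOLLIN | EPOLLONESHOT;
    ee.data.ptr = data;

    /* Rearm the one-shot descriptor with a new event mask. */
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee);
}

#endif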


static void
nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    uint32_t  events;

    ev->read = NXT_EVENT_ACTIVE;

    events = EPOLLIN;

#ifdef EPOLLEXCLUSIVE
    events |= EPOLLEXCLUSIVE;
#endif

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
}


/*
 * Epoll changes are batched to improve the instruction and data cache
 * locality of several epoll_ctl() calls followed by an epoll_wait() call.
 */

static void
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
    uint32_t events)
{
    nxt_epoll_change_t  *change;

    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
              engine->u.epoll.fd, ev->fd, op, events);

    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
        (void) nxt_epoll_commit_changes(engine);
    }

    ev->changing = 1;

    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
    change->op = op;
    change->event.events = events;
    change->event.data.ptr = ev;
}


static nxt_int_t
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
    int                 ret;
    nxt_int_t           retval;
    nxt_fd_event_t      *ev;
    nxt_epoll_change_t  *change, *end;

    nxt_debug(&engine->task, "epoll %d changes:%ui",
              engine->u.epoll.fd, engine->u.epoll.nchanges);

    retval = NXT_OK;
    change = engine->u.epoll.changes;
    end = change + engine->u.epoll.nchanges;

    do {
        ev = change->event.data.ptr;
        ev->changing = 0;

        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
                  engine->u.epoll.fd, ev->fd, change->op,
                  change->event.events);

        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd,
                        &change->event);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
                      engine->u.epoll.fd, change->op, ev->fd, nxt_errno);

            nxt_work_queue_add(&engine->fast_work_queue,
                               nxt_epoll_error_handler, ev->task, ev,
                               ev->data);

            retval = NXT_ERROR;
        }

        change++;

    } while (change < end);

    engine->u.epoll.nchanges = 0;

    return retval;
}


static void
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    ev->error_handler(ev->task, ev, data);
}


#if (NXT_HAVE_SIGNALFD)

static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
    int                 fd;
    struct epoll_event  ee;

    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
        nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E",
                  nxt_errno);
        return NXT_ERROR;
    }

    /*
     * The glibc signalfd() wrapper always has the flags argument.
     * The glibc 2.7 and 2.8 wrappers call the original signalfd()
     * syscall without the flags argument.
     * The glibc 2.9+ wrapper first tries the signalfd4() syscall and,
     * if it fails, falls back to the original signalfd() syscall.
     * For this reason the non-blocking mode is set separately.
     */

    fd = signalfd(-1, &engine->signals->sigmask, 0);

    if (fd == -1) {
        nxt_alert(&engine->task, "signalfd(%d) failed %E",
                  engine->u.epoll.signalfd.fd, nxt_errno);
        return NXT_ERROR;
    }

    engine->u.epoll.signalfd.fd = fd;

    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "signalfd(): %d", fd);

    engine->u.epoll.signalfd.data = engine->signals->handler;
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
    engine->u.epoll.signalfd.log = engine->task.log;
    engine->u.epoll.signalfd.task = &engine->task;

    ee.events = EPOLLIN;
    ee.data.ptr = &engine->u.epoll.signalfd;

    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
        nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
                  engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);

        return NXT_ERROR;
    }

    return NXT_OK;
}


static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                      n;
    nxt_fd_event_t           *ev;
    nxt_work_handler_t       handler;
    struct signalfd_siginfo  sfd;

    ev = obj;
    handler = data;

    nxt_debug(task, "signalfd handler");

    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));

    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);

    if (n != sizeof(struct signalfd_siginfo)) {
        nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
        return;
    }

    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);

    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
}

#endif


#if (NXT_HAVE_EVENTFD)

static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    int                 ret;
    struct epoll_event  ee;

    engine->u.epoll.post_handler = handler;

    /*
     * The glibc eventfd() wrapper always has the flags argument.
     * The glibc 2.7 and 2.8 wrappers call the original eventfd()
     * syscall without the flags argument.  The glibc 2.9+ wrapper
     * first tries the eventfd2() syscall and, if it fails, falls
     * back to the original eventfd() syscall.  For this reason
     * the non-blocking mode is set separately.
     */

    engine->u.epoll.eventfd.fd = eventfd(0, 0);

    if (engine->u.epoll.eventfd.fd == -1) {
        nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
        return NXT_ERROR;
    }

    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);

    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
    engine->u.epoll.eventfd.data = engine;
    engine->u.epoll.eventfd.log = engine->task.log;
    engine->u.epoll.eventfd.task = &engine->task;

    ee.events = EPOLLIN | EPOLLET;
    ee.data.ptr = &engine->u.epoll.eventfd;

    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
                    engine->u.epoll.eventfd.fd, &ee);

    if (nxt_fast_path(ret == 0)) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
              engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
              nxt_errno);

    return NXT_ERROR;
}


static void
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                 n;
    uint64_t            events;
    nxt_event_engine_t  *engine;

    engine = data;

    nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);

    /*
     * The maximum counter value after which write() to an eventfd()
     * descriptor blocks or returns EAGAIN is 0xFFFFFFFFFFFFFFFE, so the
     * descriptor needs to be read only once per many notifications, for
     * example, once per 2^32-2 notifications.  Since the eventfd() file
     * descriptor is always registered in EPOLLET mode, epoll returns an
     * event only for the latest write() to the descriptor.
     */

    if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
        engine->u.epoll.neventfd = 0;

        n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));

        nxt_debug(task, "read(%d): %d events:%uL",
                  engine->u.epoll.eventfd.fd, n, events);

        if (n != sizeof(uint64_t)) {
            nxt_alert(task, "read eventfd(%d) failed %E",
                      engine->u.epoll.eventfd.fd, nxt_errno);
        }
    }

    engine->u.epoll.post_handler(task, NULL, NULL);
}


static void
nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    size_t    ret;
    uint64_t  event;

    /*
     * eventfd() is present along with signalfd(), so this function
     * is used only to post events, and the signo argument is ignored.
     */

    event = 1;

    ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));

    if (nxt_slow_path(ret != sizeof(uint64_t))) {
        nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
                  engine->u.epoll.eventfd.fd, nxt_errno);
    }
}

#endif
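

/*
 * A minimal illustrative sketch, not engine code: the inter-thread
 * wakeup implemented above.  Another thread posts a wakeup by writing
 * an 8-byte increment to the eventfd descriptor; since the descriptor
 * is registered in EPOLLET mode, the engine blocked in epoll_wait()
 * wakes once and runs the post handler.  The name "efd" is
 * hypothetical.
 */

#if 0

static void
example_eventfd_wakeup(int efd)
{
    uint64_t  inc;

    inc = 1;

    /* write() adds "inc" to the eventfd counter and wakes up epoll. */
    if (write(efd, &inc, sizeof(uint64_t)) != sizeof(uint64_t)) {
        /*
         * With a non-blocking descriptor, EAGAIN means the counter
         * would overflow, which is harmless: a wakeup is already pending.
         */
    }
}

#endif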


static void
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    uint32_t            events;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_bool_t          error;
    nxt_uint_t          level;
    nxt_fd_event_t      *ev;
    struct epoll_event  *event;

    if (engine->u.epoll.nchanges != 0) {
        if (nxt_epoll_commit_changes(engine) != NXT_OK) {
            /* Error handlers have been enqueued on failure. */
            timeout = 0;
        }
    }

    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
              engine->u.epoll.fd, timeout);

    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
                         engine->u.epoll.mevents, timeout);

    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd,
              nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
                engine->u.epoll.fd, err);

        return;
    }

    for (i = 0; i < nevents; i++) {

        event = &engine->u.epoll.events[i];
        events = event->events;
        ev = event->data.ptr;

        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
                  ev->fd, events, ev, ev->read, ev->write);

        /*
         * On error, epoll may set only EPOLLERR and EPOLLHUP without
         * EPOLLIN or EPOLLOUT, so the "error" flag ensures that at
         * least one active handler is enqueued.
         */
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
        ev->epoll_error = error;

#if (NXT_HAVE_EPOLL_EDGE)

        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

#endif

        if ((events & EPOLLIN) || error) {
            ev->read_ready = 1;

            if (ev->read != NXT_EVENT_BLOCKED) {

                if (ev->read == NXT_EVENT_ONESHOT) {
                    ev->read = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                                   ev->task, ev, ev->data);

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_read(engine, ev);
            }
        }

        if ((events & EPOLLOUT) || error) {
            ev->write_ready = 1;

            if (ev->write != NXT_EVENT_BLOCKED) {

                if (ev->write == NXT_EVENT_ONESHOT) {
                    ev->write = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                                   ev->task, ev, ev->data);

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_write(engine, ev);
            }
        }
    }
}


#if (NXT_HAVE_ACCEPT4)

static void
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           socklen;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
}

#endif


#if (NXT_HAVE_EPOLL_EDGE)

/*
 * nxt_epoll_edge_conn_io_connect() eliminates the getsockopt() syscall
 * used to test a pending connect() error.  Although this special
 * interface can work in both edge-triggered and level-triggered modes,
 * it is enabled only for the former because that mode is available in
 * all modern Linux distributions.  For the latter mode an additional
 * nxt_epoll_level_event_conn_io with a single non-generic connect()
 * interface would be required.
 */

static void
nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;
        return;

#if 0
    case NXT_AGAIN:
        nxt_conn_timer(engine, c, state, &c->write_timer);

        /* Fall through. */

    case NXT_OK:
        /*
         * Mark both read and write directions as ready and try to
         * perform I/O operations before receiving readiness
         * notifications.  On an unconnected socket Linux send() and
         * recv() return EAGAIN instead of ENOTCONN.
         */
        c->socket.read_ready = 1;
        c->socket.write_ready = 1;
        /*
         * Enabling both read and write notifications on a connecting
         * socket eliminates one epoll_ctl() syscall.
         */
        c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
        c->socket.error_handler = state->error_handler;

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;

        handler = state->ready_handler;
        break;
#endif

    case NXT_ERROR:
        handler = state->error_handler;
        break;

    default:  /* NXT_DECLINED: connection refused. */
        handler = state->close_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}


static void
nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);

    if (!c->socket.epoll_error) {
        c->socket.write = NXT_EVENT_BLOCKED;

        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(task->thread->engine, &c->write_timer);
        }

        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    nxt_conn_connect_test(task, c, data);
}


/*
 * nxt_epoll_edge_conn_io_recvbuf() is just a wrapper around the
 * standard nxt_conn_io_recvbuf() that forces reading a pending EOF
 * in edge-triggered mode.
 */

static ssize_t
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0 && c->socket.epoll_eof) {
        c->socket.read_ready = 1;
    }

    return n;
}

#endif