
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * The first epoll version has been introduced in Linux 2.5.44.  The
 * interface was changed several times since then and the final version
 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
 *
 * EPOLLET mode did not work reliably in early implementations and in
 * Linux 2.4 backport.
 *
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
 * signalfd()               Linux 2.6.22, glibc 2.7.
 * eventfd()                Linux 2.6.22, glibc 2.7.
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
 * signalfd4()              Linux 2.6.27, glibc 2.9.
 * eventfd2()               Linux 2.6.27, glibc 2.9.
 * accept4()                Linux 2.6.28, glibc 2.10.
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
 * EPOLLEXCLUSIVE           Linux 4.5,    glibc 2.24.
 */


#if (NXT_HAVE_EPOLL_EDGE)
static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
#endif
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
    nxt_conn_io_t *io);
static void nxt_epoll_free(nxt_event_engine_t *engine);
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    int op, uint32_t events);
static void nxt_epoll_commit_changes(nxt_event_engine_t *engine);
static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
#if (NXT_HAVE_SIGNALFD)
static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
#endif
#if (NXT_HAVE_EVENTFD)
static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

#if (NXT_HAVE_ACCEPT4)
static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
    void *data);
#endif


#if (NXT_HAVE_EPOLL_EDGE)

static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


/*
 * Connection I/O operations used in edge-triggered mode: connect and
 * recvbuf are overridden with edge-specific variants, the rest are the
 * generic implementations.
 */
static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};


/* Event interface for the edge-triggered (EPOLLET) epoll engine. */
const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};

#endif


/* Event interface for the level-triggered epoll engine. */
const nxt_event_interface_t  nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};


#if (NXT_HAVE_EPOLL_EDGE)

/*
 * Create an edge-triggered epoll engine: all regular descriptors are
 * registered with EPOLLET | EPOLLRDHUP added to the requested events.
 */
static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
                            EPOLLET | EPOLLRDHUP);
}

#endif


/* Create a level-triggered epoll engine (no extra mode flags). */
static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_unix_conn_io, 0);
}


/*
 * Common engine setup: allocates the batched-change and returned-event
 * arrays, creates the epoll descriptor, and, if the engine handles
 * signals, sets up signalfd and probes for accept4().  "mode" holds the
 * extra epoll flags (EPOLLET | EPOLLRDHUP for the edge engine, 0 for the
 * level engine) that are OR-ed into event masks later.  On any failure
 * everything allocated so far is released via nxt_epoll_free().
 */
static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
    engine->u.epoll.fd = -1;
    engine->u.epoll.mode = mode;
    engine->u.epoll.mchanges = mchanges;
    engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
    engine->u.epoll.signalfd.fd = -1;
#endif

    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
    if (engine->u.epoll.changes == NULL) {
        goto fail;
    }

    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
    if (engine->u.epoll.events == NULL) {
        goto fail;
    }

    /* The size argument is ignored since Linux 2.6.8; 1 is conventional. */
    engine->u.epoll.fd = epoll_create(1);
    if (engine->u.epoll.fd == -1) {
        nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);

    if (engine->signals != NULL) {

#if (NXT_HAVE_SIGNALFD)

        if (nxt_epoll_add_signal(engine) != NXT_OK) {
            goto fail;
        }

#endif

        nxt_epoll_test_accept4(engine, io);
    }

    return NXT_OK;

fail:

    nxt_epoll_free(engine);

    return NXT_ERROR;
}


/*
 * Probe once at runtime whether the accept4() syscall is available
 * (the binary may have been built on a newer kernel than it runs on)
 * and select the accept handler accordingly.  The result is cached in
 * a function-static variable, so the probe syscall is done only once
 * per process.
 */
static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t  handler;

    if (handler == NULL) {

        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        /* Invalid fd: only the ENOSYS outcome matters, not the accept. */
        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}


/*
 * Release all engine resources: the signalfd and eventfd descriptors
 * (when configured), the epoll descriptor itself, and the change/event
 * arrays.  Safe to call on a partially initialized engine because
 * unopened descriptors are -1 and nxt_free(NULL) is harmless; the whole
 * state is zeroed afterwards.
 */
static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int  fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}


/*
 * Register a descriptor for both read and write events, OR-ing in the
 * engine mode flags (EPOLLET | EPOLLRDHUP in edge-triggered mode).
 */
static void
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->write = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}


/*
 * Remove a descriptor from the epoll set if either direction is still
 * registered (state above NXT_EVENT_DISABLED).
 */
static void
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Remove a descriptor from the epoll set unless both directions are
 * already inactive.
 */
static void
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Although calling close() on a file descriptor will remove any epoll
 * events that reference the descriptor, in this case the close() acquires
 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
 * only in one epoll set.  Thus removing events explicitly before closing
 * eliminates possible lock contention.
 */

static nxt_bool_t
nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_epoll_delete(engine, ev);

    /* Non-zero if a batched change for this event is still pending. */
    return ev->changing;
}


/*
 * Enable read events.  Chooses EPOLL_CTL_ADD if the descriptor is not
 * yet in the epoll set, EPOLL_CTL_MOD otherwise; preserves EPOLLOUT if
 * the write side is still registered.  A merely blocked read side needs
 * no syscall — it is simply reactivated.
 */
static void
nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->read != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
            op = EPOLL_CTL_ADD;

        } else if (ev->write >= NXT_EVENT_BLOCKED) {
            events |= EPOLLOUT;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


/*
 * Enable write events; mirror image of nxt_epoll_enable_read() with the
 * read and write roles swapped.
 */
static void
nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->write != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
            op = EPOLL_CTL_ADD;

        } else if (ev->read >= NXT_EVENT_BLOCKED) {
            events |= EPOLLIN;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->write = NXT_EVENT_ACTIVE;
}


/*
 * Disable read events: delete the descriptor entirely if the write side
 * is disabled or inactive, otherwise modify it to write-only.
 */
static void
nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->read = NXT_EVENT_INACTIVE;

    if (ev->write <= NXT_EVENT_DISABLED) {
        ev->write = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


/*
 * Disable write events; mirror image of nxt_epoll_disable_read().
 */
static void
nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->write = NXT_EVENT_INACTIVE;

    if (ev->read <= NXT_EVENT_DISABLED) {
        ev->read = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


/*
 * Mark the read side blocked without touching the kernel: events keep
 * arriving but nxt_epoll_poll() will not queue the read handler.
 */
static void
nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


/* Write-side counterpart of nxt_epoll_block_read(). */
static void
nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


/*
 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
 * event should be added or modified, epoll_ctl(2):
 *
 * EPOLLONESHOT (since Linux 2.6.2)
 *     Sets the one-shot behavior for the associated file descriptor.
 *     This means that after an event is pulled out with epoll_wait(2)
 *     the associated file descriptor is internally disabled and no
 *     other events will be reported by the epoll interface.  The user
 *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
 *     descriptor with a new event mask.
 */

static void
nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_ONESHOT;
    ev->write = NXT_EVENT_INACTIVE;

    nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
}


/* Write-side counterpart of nxt_epoll_oneshot_read(). */
static void
nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_ONESHOT;

    nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
}


/*
 * Register a listening descriptor for read events.  Deliberately
 * level-triggered (the engine mode flags are not OR-ed in);
 * EPOLLEXCLUSIVE, when available, avoids waking every worker that
 * shares the listening socket.
 */
static void
nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    uint32_t  events;

    ev->read = NXT_EVENT_ACTIVE;

    events = EPOLLIN;

#ifdef EPOLLEXCLUSIVE
    events |= EPOLLEXCLUSIVE;
#endif

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
}


/*
 * epoll changes are batched to improve instruction and data cache
 * locality of several epoll_ctl() calls followed by epoll_wait() call.
 */

static void
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
    uint32_t events)
{
    nxt_epoll_change_t  *change;

    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
              engine->u.epoll.fd, ev->fd, op, events);

    /* Flush the batch when the change array is full. */
    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
        nxt_epoll_commit_changes(engine);
    }

    ev->changing = 1;

    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
    change->op = op;
    change->event.events = events;
    change->event.data.ptr = ev;
}


/*
 * Apply all batched changes with individual epoll_ctl() calls.  On a
 * failed call the event's error handler is queued and the engine error
 * flag is raised so that the following poll does not block.
 */
static void
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
    int                 ret;
    nxt_fd_event_t      *ev;
    nxt_epoll_change_t  *change, *end;

    nxt_debug(&engine->task, "epoll %d changes:%ui",
              engine->u.epoll.fd, engine->u.epoll.nchanges);

    change = engine->u.epoll.changes;
    end = change + engine->u.epoll.nchanges;

    do {
        ev = change->event.data.ptr;
        ev->changing = 0;

        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
                  engine->u.epoll.fd, ev->fd, change->op,
                  change->event.events);

        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
                      engine->u.epoll.fd, change->op, ev->fd, nxt_errno);

            nxt_work_queue_add(&engine->fast_work_queue,
                               nxt_epoll_error_handler, ev->task, ev, ev->data);

            engine->u.epoll.error = 1;
        }

        change++;

    } while (change < end);

    engine->u.epoll.nchanges = 0;
}


/*
 * Deferred handler for a failed epoll_ctl(): resets both directions to
 * inactive and delegates to the event's own error handler.
 */
static void
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    ev->error_handler(ev->task, ev, data);
}


#if (NXT_HAVE_SIGNALFD)

/*
 * Block the engine's signal set for normal delivery, create a signalfd
 * for it, and register the descriptor in the epoll set so that signals
 * are received synchronously by nxt_epoll_signalfd_handler().
 */
static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
    int                 fd;
    struct epoll_event  ee;

    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
        nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
        return NXT_ERROR;
    }

    /*
     * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
     * and 2.8 signalfd() wrappers call the original signalfd() syscall
     * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
     * tries to call signalfd4() syscall and if it fails then calls the
     * original signalfd() syscall.  For this reason the non-blocking mode
     * is set separately.
     */

    fd = signalfd(-1, &engine->signals->sigmask, 0);

    if (fd == -1) {
        nxt_alert(&engine->task, "signalfd(%d) failed %E",
                  engine->u.epoll.signalfd.fd, nxt_errno);
        return NXT_ERROR;
    }

    engine->u.epoll.signalfd.fd = fd;

    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "signalfd(): %d", fd);

    engine->u.epoll.signalfd.data = engine->signals->handler;
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
    engine->u.epoll.signalfd.log = engine->task.log;
    engine->u.epoll.signalfd.task = &engine->task;

    ee.events = EPOLLIN;
    ee.data.ptr = &engine->u.epoll.signalfd;

    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
        nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
                  engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);

        return NXT_ERROR;
    }

    return NXT_OK;
}


/*
 * Read one signalfd_siginfo record from the signalfd and pass the
 * signal number to the signal handler stored in the event data.
 */
static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                      n;
    nxt_fd_event_t           *ev;
    nxt_work_handler_t       handler;
    struct signalfd_siginfo  sfd;

    ev = obj;
    handler = data;

    nxt_debug(task, "signalfd handler");

    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));

    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);

    if (n != sizeof(struct signalfd_siginfo)) {
        nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
        return;
    }

    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);

    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
}

#endif


#if (NXT_HAVE_EVENTFD)

/*
 * Set up an eventfd-based cross-thread post mechanism: the descriptor
 * is registered in edge-triggered mode and nxt_epoll_eventfd_handler()
 * invokes the given post handler when another thread writes to it via
 * nxt_epoll_signal().
 */
static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    int                 ret;
    struct epoll_event  ee;

    engine->u.epoll.post_handler = handler;

    /*
     * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
     * and 2.8 eventfd() wrappers call the original eventfd() syscall
     * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
     * tries to call eventfd2() syscall and if it fails then calls the
     * original eventfd() syscall.  For this reason the non-blocking mode
     * is set separately.
     */

    engine->u.epoll.eventfd.fd = eventfd(0, 0);

    if (engine->u.epoll.eventfd.fd == -1) {
        nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
        return NXT_ERROR;
    }

    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);

    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
    engine->u.epoll.eventfd.data = engine;
    engine->u.epoll.eventfd.log = engine->task.log;
    engine->u.epoll.eventfd.task = &engine->task;

    ee.events = EPOLLIN | EPOLLET;
    ee.data.ptr = &engine->u.epoll.eventfd;

    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
                    engine->u.epoll.eventfd.fd, &ee);

    if (nxt_fast_path(ret == 0)) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
              engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
              nxt_errno);

    return NXT_ERROR;
}


/*
 * Handle an eventfd notification by running the engine's post handler.
 * The counter is drained with read() only rarely (see comment below),
 * saving a syscall on the common path.
 */
static void
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                 n;
    uint64_t            events;
    nxt_event_engine_t  *engine;

    engine = data;

    nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);

    /*
     * The maximum value after write() to a eventfd() descriptor will
     * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
     * can be read once per many notifications, for example, once per
     * 2^32-2 notifications.  Since the eventfd() file descriptor is
     * always registered in EPOLLET mode, epoll returns event about
     * only the latest write() to the descriptor.
     */

    if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
        engine->u.epoll.neventfd = 0;

        n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));

        nxt_debug(task, "read(%d): %d events:%uL",
                  engine->u.epoll.eventfd.fd, n, events);

        if (n != sizeof(uint64_t)) {
            nxt_alert(task, "read eventfd(%d) failed %E",
                      engine->u.epoll.eventfd.fd, nxt_errno);
        }
    }

    engine->u.epoll.post_handler(task, NULL, NULL);
}


/*
 * Wake the engine from another thread by writing to its eventfd.
 */
static void
nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    size_t    ret;
    uint64_t  event;

    /*
     * eventfd() presents along with signalfd(), so the function
     * is used only to post events and the signo argument is ignored.
     */

    event = 1;

    ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));

    if (nxt_slow_path(ret != sizeof(uint64_t))) {
        nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
                  engine->u.epoll.eventfd.fd, nxt_errno);
    }
}

#endif


/*
 * Main poll loop step: flush pending epoll_ctl() changes, wait for
 * events, and queue the read/write handlers of ready descriptors.
 * In level-triggered mode a blocked direction is disabled in the kernel
 * to stop repeated notifications; in edge-triggered mode that is
 * unnecessary.
 */
static void
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    uint32_t            events;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_bool_t          error;
    nxt_uint_t          level;
    nxt_fd_event_t      *ev;
    struct epoll_event  *event;

    if (engine->u.epoll.nchanges != 0) {
        nxt_epoll_commit_changes(engine);
    }

    if (engine->u.epoll.error) {
        engine->u.epoll.error = 0;
        /* Error handlers have been enqueued on failure. */
        timeout = 0;
    }

    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
              engine->u.epoll.fd, timeout);

    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
                         engine->u.epoll.mevents, timeout);

    /* Save errno before nxt_thread_time_update() can clobber it. */
    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
                engine->u.epoll.fd, err);

        return;
    }

    for (i = 0; i < nevents; i++) {

        event = &engine->u.epoll.events[i];
        events = event->events;
        ev = event->data.ptr;

        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
                  ev->fd, events, ev, ev->read, ev->write);

        /*
         * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or
         * EPOLLOUT, so the "error" variable enqueues only one active handler.
         */
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
        ev->epoll_error = error;

#if (NXT_HAVE_EPOLL_EDGE)

        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

#endif

        if ((events & EPOLLIN) || error) {
            ev->read_ready = 1;

            if (ev->read != NXT_EVENT_BLOCKED) {

                if (ev->read == NXT_EVENT_ONESHOT) {
                    ev->read = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                                   ev->task, ev, ev->data);

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_read(engine, ev);
            }
        }

        if ((events & EPOLLOUT) || error) {
            ev->write_ready = 1;

            if (ev->write != NXT_EVENT_BLOCKED) {

                if (ev->write == NXT_EVENT_ONESHOT) {
                    ev->write = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                                   ev->task, ev, ev->data);

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_write(engine, ev);
            }
        }
    }
}


#if (NXT_HAVE_ACCEPT4)

/*
 * Accept handler variant selected by nxt_epoll_test_accept4(): uses
 * accept4(SOCK_NONBLOCK) to get a non-blocking socket in one syscall.
 */
static void
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           socklen;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
}

#endif


#if (NXT_HAVE_EPOLL_EDGE)

/*
 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
 * syscall to test pending connect() error.  Although this special
 * interface can work in both edge-triggered and level-triggered
 * modes it is enabled only for the former mode because this mode is
 * available in all modern Linux distributions.  For the latter mode
 * it is required to create additional nxt_epoll_level_event_conn_io
 * with single non-generic connect() interface.
 */

static void
nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        /* Connect in progress: wait for write readiness under a timer. */
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;
        return;

#if 0
    case NXT_AGAIN:
        nxt_conn_timer(engine, c, state, &c->write_timer);

        /* Fall through. */

    case NXT_OK:
        /*
         * Mark both read and write directions as ready and try to perform
         * I/O operations before receiving readiness notifications.
         * On unconnected socket Linux send() and recv() return EAGAIN
         * instead of ENOTCONN.
         */
        c->socket.read_ready = 1;
        c->socket.write_ready = 1;
        /*
         * Enabling both read and write notifications on a getting
         * connected socket eliminates one epoll_ctl() syscall.
         */
        c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
        c->socket.error_handler = state->error_handler;

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;

        handler = state->ready_handler;
        break;
#endif

    case NXT_ERROR:
        handler = state->error_handler;
        break;

    default:  /* NXT_DECLINED: connection refused. */
        handler = state->close_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}


/*
 * Write-readiness handler for a pending connect().  If no EPOLLERR or
 * EPOLLHUP was reported the connect succeeded and the ready handler is
 * queued directly, skipping the getsockopt(SO_ERROR) test; otherwise
 * the generic connect test is performed.
 */
static void
nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);

    if (!c->socket.epoll_error) {
        c->socket.write = NXT_EVENT_BLOCKED;

        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(task->thread->engine, &c->write_timer);
        }

        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    nxt_conn_connect_test(task, c, data);
}


/*
 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
 * in edge-triggered mode.
 */

static ssize_t
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0 && c->socket.epoll_eof) {
        c->socket.read_ready = 1;
    }

    return n;
}

#endif