
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * The first epoll version has been introduced in Linux 2.5.44.  The
 * interface was changed several times since then and the final version
 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
 *
 * EPOLLET mode did not work reliably in early implementations and in
 * the Linux 2.4 backport.
 *
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
 * signalfd()               Linux 2.6.22, glibc 2.7.
 * eventfd()                Linux 2.6.22, glibc 2.7.
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
 * signalfd4()              Linux 2.6.27, glibc 2.9.
 * eventfd2()               Linux 2.6.27, glibc 2.9.
 * accept4()                Linux 2.6.28, glibc 2.10.
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
 * EPOLLEXCLUSIVE           Linux 4.5.
31 */ 32 33 34 #if (NXT_HAVE_EPOLL_EDGE) 35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37 #endif 38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); 42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 43 nxt_conn_io_t *io); 44 static void nxt_epoll_free(nxt_event_engine_t *engine); 45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 53 nxt_fd_event_t *ev); 54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 55 nxt_fd_event_t *ev); 56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 57 nxt_fd_event_t *ev); 58 static void nxt_epoll_block_read(nxt_event_engine_t *engine, 59 nxt_fd_event_t *ev); 60 static void nxt_epoll_block_write(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 69 int op, uint32_t events); 70 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 72 #if 
(NXT_HAVE_SIGNALFD) 73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 75 #endif 76 #if (NXT_HAVE_EVENTFD) 77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81 #endif 82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84 #if (NXT_HAVE_ACCEPT4) 85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, 86 void *data); 87 #endif 88 89 90 #if (NXT_HAVE_EPOLL_EDGE) 91 92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, 93 void *data); 94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, 95 void *data); 96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 97 98 99 static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, 102 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110 #if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112 #else 113 nxt_event_conn_io_sendbuf, 114 #endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 119 nxt_conn_io_shutdown, 120 }; 121 122 123 const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, 128 nxt_epoll_disable, 129 nxt_epoll_delete, 130 nxt_epoll_close, 131 nxt_epoll_enable_read, 132 nxt_epoll_enable_write, 133 nxt_epoll_disable_read, 134 nxt_epoll_disable_write, 135 nxt_epoll_block_read, 136 nxt_epoll_block_write, 137 nxt_epoll_oneshot_read, 138 nxt_epoll_oneshot_write, 139 nxt_epoll_enable_accept, 140 NULL, 141 NULL, 142 #if (NXT_HAVE_EVENTFD) 143 
nxt_epoll_enable_post, 144 nxt_epoll_signal, 145 #else 146 NULL, 147 NULL, 148 #endif 149 nxt_epoll_poll, 150 151 &nxt_epoll_edge_conn_io, 152 153 #if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155 #else 156 NXT_NO_FILE_EVENTS, 157 #endif 158 159 #if (NXT_HAVE_SIGNALFD) 160 NXT_SIGNAL_EVENTS, 161 #else 162 NXT_NO_SIGNAL_EVENTS, 163 #endif 164 }; 165 166 #endif 167 168 169 const nxt_event_interface_t nxt_epoll_level_engine = { 170 "epoll_level", 171 nxt_epoll_level_create, 172 nxt_epoll_free, 173 nxt_epoll_enable, 174 nxt_epoll_disable, 175 nxt_epoll_delete, 176 nxt_epoll_close, 177 nxt_epoll_enable_read, 178 nxt_epoll_enable_write, 179 nxt_epoll_disable_read, 180 nxt_epoll_disable_write, 181 nxt_epoll_block_read, 182 nxt_epoll_block_write, 183 nxt_epoll_oneshot_read, 184 nxt_epoll_oneshot_write, 185 nxt_epoll_enable_accept, 186 NULL, 187 NULL, 188 #if (NXT_HAVE_EVENTFD) 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191 #else 192 NULL, 193 NULL, 194 #endif 195 nxt_epoll_poll, 196 197 &nxt_unix_conn_io, 198 199 #if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201 #else 202 NXT_NO_FILE_EVENTS, 203 #endif 204 205 #if (NXT_HAVE_SIGNALFD) 206 NXT_SIGNAL_EVENTS, 207 #else 208 NXT_NO_SIGNAL_EVENTS, 209 #endif 210 }; 211 212 213 #if (NXT_HAVE_EPOLL_EDGE) 214 215 static nxt_int_t 216 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218 { 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, 220 EPOLLET | EPOLLRDHUP); 221 } 222 223 #endif 224 225 226 static nxt_int_t 227 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229 { 230 return nxt_epoll_create(engine, mchanges, mevents, 231 &nxt_unix_conn_io, 0); 232 } 233 234 235 static nxt_int_t 236 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) 238 { 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = 
mchanges; 242 engine->u.epoll.mevents = mevents; 243 #if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245 #endif 246 247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 248 if (engine->u.epoll.changes == NULL) { 249 goto fail; 250 } 251 252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 253 if (engine->u.epoll.events == NULL) { 254 goto fail; 255 } 256 257 engine->u.epoll.fd = epoll_create(1); 258 if (engine->u.epoll.fd == -1) { 259 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E", 260 nxt_errno); 261 goto fail; 262 } 263 264 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 265 266 if (engine->signals != NULL) { 267 268 #if (NXT_HAVE_SIGNALFD) 269 270 if (nxt_epoll_add_signal(engine) != NXT_OK) { 271 goto fail; 272 } 273 274 #endif 275 276 nxt_epoll_test_accept4(engine, io); 277 } 278 279 return NXT_OK; 280 281 fail: 282 283 nxt_epoll_free(engine); 284 285 return NXT_ERROR; 286 } 287 288 289 static void 290 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) 291 { 292 static nxt_work_handler_t handler; 293 294 if (handler == NULL) { 295 296 handler = io->accept; 297 298 #if (NXT_HAVE_ACCEPT4) 299 300 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 301 302 if (nxt_errno != NXT_ENOSYS) { 303 handler = nxt_epoll_conn_io_accept4; 304 305 } else { 306 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 307 NXT_ENOSYS); 308 } 309 310 #endif 311 } 312 313 io->accept = handler; 314 } 315 316 317 static void 318 nxt_epoll_free(nxt_event_engine_t *engine) 319 { 320 int fd; 321 322 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 323 324 #if (NXT_HAVE_SIGNALFD) 325 326 fd = engine->u.epoll.signalfd.fd; 327 328 if (fd != -1 && close(fd) != 0) { 329 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E", 330 fd, nxt_errno); 331 } 332 333 #endif 334 335 #if (NXT_HAVE_EVENTFD) 336 337 fd = engine->u.epoll.eventfd.fd; 338 339 
if (fd != -1 && close(fd) != 0) { 340 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E", 341 fd, nxt_errno); 342 } 343 344 #endif 345 346 fd = engine->u.epoll.fd; 347 348 if (fd != -1 && close(fd) != 0) { 349 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E", 350 fd, nxt_errno); 351 } 352 353 nxt_free(engine->u.epoll.events); 354 nxt_free(engine->u.epoll.changes); 355 356 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 357 } 358 359 360 static void 361 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 362 { 363 ev->read = NXT_EVENT_ACTIVE; 364 ev->write = NXT_EVENT_ACTIVE; 365 366 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 367 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 368 } 369 370 371 static void 372 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 373 { 374 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 375 376 ev->read = NXT_EVENT_INACTIVE; 377 ev->write = NXT_EVENT_INACTIVE; 378 379 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 380 } 381 } 382 383 384 static void 385 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 386 { 387 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 388 389 ev->read = NXT_EVENT_INACTIVE; 390 ev->write = NXT_EVENT_INACTIVE; 391 392 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 393 } 394 } 395 396 397 /* 398 * Although calling close() on a file descriptor will remove any epoll 399 * events that reference the descriptor, in this case the close() acquires 400 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 401 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 402 * only in one epoll set. Thus removing events explicitly before closing 403 * eliminates possible lock contention. 
404 */ 405 406 static nxt_bool_t 407 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 408 { 409 nxt_epoll_delete(engine, ev); 410 411 return ev->changing; 412 } 413 414 415 static void 416 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 417 { 418 int op; 419 uint32_t events; 420 421 if (ev->read != NXT_EVENT_BLOCKED) { 422 423 op = EPOLL_CTL_MOD; 424 events = EPOLLIN | engine->u.epoll.mode; 425 426 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 427 op = EPOLL_CTL_ADD; 428 429 } else if (ev->write >= NXT_EVENT_BLOCKED) { 430 events |= EPOLLOUT; 431 } 432 433 nxt_epoll_change(engine, ev, op, events); 434 } 435 436 ev->read = NXT_EVENT_ACTIVE; 437 } 438 439 440 static void 441 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 442 { 443 int op; 444 uint32_t events; 445 446 if (ev->write != NXT_EVENT_BLOCKED) { 447 448 op = EPOLL_CTL_MOD; 449 events = EPOLLOUT | engine->u.epoll.mode; 450 451 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 452 op = EPOLL_CTL_ADD; 453 454 } else if (ev->read >= NXT_EVENT_BLOCKED) { 455 events |= EPOLLIN; 456 } 457 458 nxt_epoll_change(engine, ev, op, events); 459 } 460 461 ev->write = NXT_EVENT_ACTIVE; 462 } 463 464 465 static void 466 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 467 { 468 int op; 469 uint32_t events; 470 471 ev->read = NXT_EVENT_INACTIVE; 472 473 if (ev->write <= NXT_EVENT_DISABLED) { 474 ev->write = NXT_EVENT_INACTIVE; 475 op = EPOLL_CTL_DEL; 476 events = 0; 477 478 } else { 479 op = EPOLL_CTL_MOD; 480 events = EPOLLOUT | engine->u.epoll.mode; 481 } 482 483 nxt_epoll_change(engine, ev, op, events); 484 } 485 486 487 static void 488 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 489 { 490 int op; 491 uint32_t events; 492 493 ev->write = NXT_EVENT_INACTIVE; 494 495 if (ev->read <= NXT_EVENT_DISABLED) { 496 ev->write = NXT_EVENT_INACTIVE; 497 op = EPOLL_CTL_DEL; 498 
events = 0; 499 500 } else { 501 op = EPOLL_CTL_MOD; 502 events = EPOLLIN | engine->u.epoll.mode; 503 } 504 505 nxt_epoll_change(engine, ev, op, events); 506 } 507 508 509 static void 510 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 511 { 512 if (ev->read != NXT_EVENT_INACTIVE) { 513 ev->read = NXT_EVENT_BLOCKED; 514 } 515 } 516 517 518 static void 519 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 520 { 521 if (ev->write != NXT_EVENT_INACTIVE) { 522 ev->write = NXT_EVENT_BLOCKED; 523 } 524 } 525 526 527 /* 528 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 529 * event should be added or modified, epoll_ctl(2): 530 * 531 * EPOLLONESHOT (since Linux 2.6.2) 532 * Sets the one-shot behavior for the associated file descriptor. 533 * This means that after an event is pulled out with epoll_wait(2) 534 * the associated file descriptor is internally disabled and no 535 * other events will be reported by the epoll interface. The user 536 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 537 * descriptor with a new event mask. 538 */ 539 540 static void 541 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 542 { 543 int op; 544 545 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 546 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 547 548 ev->read = NXT_EVENT_ONESHOT; 549 ev->write = NXT_EVENT_INACTIVE; 550 551 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 552 } 553 554 555 static void 556 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 557 { 558 int op; 559 560 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 
561 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 562 563 ev->read = NXT_EVENT_INACTIVE; 564 ev->write = NXT_EVENT_ONESHOT; 565 566 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 567 } 568 569 570 static void 571 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 572 { 573 ev->read = NXT_EVENT_ACTIVE; 574 575 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN); 576 } 577 578 579 /* 580 * epoll changes are batched to improve instruction and data cache 581 * locality of several epoll_ctl() calls followed by epoll_wait() call. 582 */ 583 584 static void 585 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 586 uint32_t events) 587 { 588 nxt_epoll_change_t *change; 589 590 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 591 engine->u.epoll.fd, ev->fd, op, events); 592 593 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 594 (void) nxt_epoll_commit_changes(engine); 595 } 596 597 ev->changing = 1; 598 599 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 600 change->op = op; 601 change->event.events = events; 602 change->event.data.ptr = ev; 603 } 604 605 606 static nxt_int_t 607 nxt_epoll_commit_changes(nxt_event_engine_t *engine) 608 { 609 int ret; 610 nxt_int_t retval; 611 nxt_fd_event_t *ev; 612 nxt_epoll_change_t *change, *end; 613 614 nxt_debug(&engine->task, "epoll %d changes:%ui", 615 engine->u.epoll.fd, engine->u.epoll.nchanges); 616 617 retval = NXT_OK; 618 change = engine->u.epoll.changes; 619 end = change + engine->u.epoll.nchanges; 620 621 do { 622 ev = change->event.data.ptr; 623 ev->changing = 0; 624 625 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 626 engine->u.epoll.fd, ev->fd, change->op, 627 change->event.events); 628 629 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 630 631 if (nxt_slow_path(ret != 0)) { 632 nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 633 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 634 
635 nxt_work_queue_add(&engine->fast_work_queue, 636 nxt_epoll_error_handler, ev->task, ev, ev->data); 637 638 retval = NXT_ERROR; 639 } 640 641 change++; 642 643 } while (change < end); 644 645 engine->u.epoll.nchanges = 0; 646 647 return retval; 648 } 649 650 651 static void 652 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 653 { 654 nxt_fd_event_t *ev; 655 656 ev = obj; 657 658 ev->read = NXT_EVENT_INACTIVE; 659 ev->write = NXT_EVENT_INACTIVE; 660 661 ev->error_handler(ev->task, ev, data); 662 } 663 664 665 #if (NXT_HAVE_SIGNALFD) 666 667 static nxt_int_t 668 nxt_epoll_add_signal(nxt_event_engine_t *engine) 669 { 670 int fd; 671 struct epoll_event ee; 672 673 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 674 nxt_log(&engine->task, NXT_LOG_CRIT, 675 "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 676 return NXT_ERROR; 677 } 678 679 /* 680 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 681 * and 2.8 signalfd() wrappers call the original signalfd() syscall 682 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 683 * tries to call signalfd4() syscall and if it fails then calls the 684 * original signalfd() syscall. For this reason the non-blocking mode 685 * is set separately. 
686 */ 687 688 fd = signalfd(-1, &engine->signals->sigmask, 0); 689 690 if (fd == -1) { 691 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E", 692 engine->u.epoll.signalfd.fd, nxt_errno); 693 return NXT_ERROR; 694 } 695 696 engine->u.epoll.signalfd.fd = fd; 697 698 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 699 return NXT_ERROR; 700 } 701 702 nxt_debug(&engine->task, "signalfd(): %d", fd); 703 704 engine->u.epoll.signalfd.data = engine->signals->handler; 705 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 706 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 707 engine->u.epoll.signalfd.log = engine->task.log; 708 engine->u.epoll.signalfd.task = &engine->task; 709 710 ee.events = EPOLLIN; 711 ee.data.ptr = &engine->u.epoll.signalfd; 712 713 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 714 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 715 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 716 717 return NXT_ERROR; 718 } 719 720 return NXT_OK; 721 } 722 723 724 static void 725 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 726 { 727 int n; 728 nxt_fd_event_t *ev; 729 nxt_work_handler_t handler; 730 struct signalfd_siginfo sfd; 731 732 ev = obj; 733 handler = data; 734 735 nxt_debug(task, "signalfd handler"); 736 737 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 738 739 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 740 741 if (n != sizeof(struct signalfd_siginfo)) { 742 nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E", 743 ev->fd, nxt_errno); 744 } 745 746 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 747 748 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 749 } 750 751 #endif 752 753 754 #if (NXT_HAVE_EVENTFD) 755 756 static nxt_int_t 757 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 758 { 759 int ret; 760 struct epoll_event ee; 761 762 
engine->u.epoll.post_handler = handler; 763 764 /* 765 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 766 * and 2.8 eventfd() wrappers call the original eventfd() syscall 767 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 768 * tries to call eventfd2() syscall and if it fails then calls the 769 * original eventfd() syscall. For this reason the non-blocking mode 770 * is set separately. 771 */ 772 773 engine->u.epoll.eventfd.fd = eventfd(0, 0); 774 775 if (engine->u.epoll.eventfd.fd == -1) { 776 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno); 777 return NXT_ERROR; 778 } 779 780 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 781 if (nxt_slow_path(ret != NXT_OK)) { 782 return NXT_ERROR; 783 } 784 785 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 786 787 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 788 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 789 engine->u.epoll.eventfd.data = engine; 790 engine->u.epoll.eventfd.log = engine->task.log; 791 engine->u.epoll.eventfd.task = &engine->task; 792 793 ee.events = EPOLLIN | EPOLLET; 794 ee.data.ptr = &engine->u.epoll.eventfd; 795 796 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 797 engine->u.epoll.eventfd.fd, &ee); 798 799 if (nxt_fast_path(ret == 0)) { 800 return NXT_OK; 801 } 802 803 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 804 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 805 nxt_errno); 806 807 return NXT_ERROR; 808 } 809 810 811 static void 812 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 813 { 814 int n; 815 uint64_t events; 816 nxt_event_engine_t *engine; 817 818 engine = data; 819 820 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 821 822 /* 823 * The maximum value after write() to a eventfd() descriptor will 824 * block or return EAGAIN is 0xfffffffffffffffe, so 
the descriptor 825 * can be read once per many notifications, for example, once per 826 * 2^32-2 noticifcations. Since the eventfd() file descriptor is 827 * always registered in EPOLLET mode, epoll returns event about 828 * only the latest write() to the descriptor. 829 */ 830 831 if (engine->u.epoll.neventfd++ >= 0xfffffffe) { 832 engine->u.epoll.neventfd = 0; 833 834 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 835 836 nxt_debug(task, "read(%d): %d events:%uL", 837 engine->u.epoll.eventfd.fd, n, events); 838 839 if (n != sizeof(uint64_t)) { 840 nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", 841 engine->u.epoll.eventfd.fd, nxt_errno); 842 } 843 } 844 845 engine->u.epoll.post_handler(task, NULL, NULL); 846 } 847 848 849 static void 850 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 851 { 852 size_t ret; 853 uint64_t event; 854 855 /* 856 * eventfd() presents along with signalfd(), so the function 857 * is used only to post events and the signo argument is ignored. 858 */ 859 860 event = 1; 861 862 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 863 864 if (nxt_slow_path(ret != sizeof(uint64_t))) { 865 nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E", 866 engine->u.epoll.eventfd.fd, nxt_errno); 867 } 868 } 869 870 #endif 871 872 873 static void 874 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 875 { 876 int nevents; 877 uint32_t events; 878 nxt_int_t i; 879 nxt_err_t err; 880 nxt_bool_t error; 881 nxt_uint_t level; 882 nxt_fd_event_t *ev; 883 struct epoll_event *event; 884 885 if (engine->u.epoll.nchanges != 0) { 886 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 887 /* Error handlers have been enqueued on failure. 
*/ 888 timeout = 0; 889 } 890 } 891 892 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 893 engine->u.epoll.fd, timeout); 894 895 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 896 engine->u.epoll.mevents, timeout); 897 898 err = (nevents == -1) ? nxt_errno : 0; 899 900 nxt_thread_time_update(engine->task.thread); 901 902 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 903 904 if (nevents == -1) { 905 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 906 907 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 908 engine->u.epoll.fd, err); 909 910 return; 911 } 912 913 for (i = 0; i < nevents; i++) { 914 915 event = &engine->u.epoll.events[i]; 916 events = event->events; 917 ev = event->data.ptr; 918 919 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 920 ev->fd, events, ev, ev->read, ev->write); 921 922 /* 923 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 924 * EPOLLOUT, so the "error" variable enqueues only one active handler. 925 */ 926 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 927 ev->epoll_error = error; 928 929 #if (NXT_HAVE_EPOLL_EDGE) 930 931 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 932 933 #endif 934 935 if ((events & EPOLLIN) || error) { 936 ev->read_ready = 1; 937 938 if (ev->read != NXT_EVENT_BLOCKED) { 939 940 if (ev->read == NXT_EVENT_ONESHOT) { 941 ev->read = NXT_EVENT_DISABLED; 942 } 943 944 error = 0; 945 946 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 947 ev->task, ev, ev->data); 948 949 } else if (engine->u.epoll.mode == 0) { 950 /* Level-triggered mode. 
*/ 951 nxt_epoll_disable_read(engine, ev); 952 } 953 } 954 955 if ((events & EPOLLOUT) || error) { 956 ev->write_ready = 1; 957 958 if (ev->write != NXT_EVENT_BLOCKED) { 959 960 if (ev->write == NXT_EVENT_ONESHOT) { 961 ev->write = NXT_EVENT_DISABLED; 962 } 963 964 error = 0; 965 966 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 967 ev->task, ev, ev->data); 968 969 } else if (engine->u.epoll.mode == 0) { 970 /* Level-triggered mode. */ 971 nxt_epoll_disable_write(engine, ev); 972 } 973 } 974 975 if (error) { 976 ev->read_ready = 1; 977 ev->write_ready = 1; 978 } 979 } 980 } 981 982 983 #if (NXT_HAVE_ACCEPT4) 984 985 static void 986 nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 987 { 988 socklen_t len; 989 nxt_conn_t *c; 990 nxt_socket_t s; 991 struct sockaddr *sa; 992 nxt_listen_event_t *lev; 993 994 lev = obj; 995 c = lev->next; 996 997 lev->ready--; 998 lev->socket.read_ready = (lev->ready != 0); 999 1000 len = c->remote->socklen; 1001 1002 if (len >= sizeof(struct sockaddr)) { 1003 sa = &c->remote->u.sockaddr; 1004 1005 } else { 1006 sa = NULL; 1007 len = 0; 1008 } 1009 1010 s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK); 1011 1012 if (s != -1) { 1013 c->socket.fd = s; 1014 1015 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); 1016 1017 nxt_conn_accept(task, lev, c); 1018 return; 1019 } 1020 1021 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); 1022 } 1023 1024 #endif 1025 1026 1027 #if (NXT_HAVE_EPOLL_EDGE) 1028 1029 /* 1030 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1031 * syscall to test pending connect() error. Although this special 1032 * interface can work in both edge-triggered and level-triggered 1033 * modes it is enabled only for the former mode because this mode is 1034 * available in all modern Linux distributions. For the latter mode 1035 * it is required to create additional nxt_epoll_level_event_conn_io 1036 * with single non-generic connect() interface. 
1037 */ 1038 1039 static void 1040 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1041 { 1042 nxt_conn_t *c; 1043 nxt_event_engine_t *engine; 1044 nxt_work_handler_t handler; 1045 const nxt_event_conn_state_t *state; 1046 1047 c = obj; 1048 1049 state = c->write_state; 1050 1051 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1052 1053 case NXT_OK: 1054 c->socket.write_ready = 1; 1055 handler = state->ready_handler; 1056 break; 1057 1058 case NXT_AGAIN: 1059 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1060 c->socket.error_handler = nxt_conn_connect_error; 1061 1062 engine = task->thread->engine; 1063 nxt_conn_timer(engine, c, state, &c->write_timer); 1064 1065 nxt_epoll_enable(engine, &c->socket); 1066 c->socket.read = NXT_EVENT_BLOCKED; 1067 return; 1068 1069 #if 0 1070 case NXT_AGAIN: 1071 nxt_conn_timer(engine, c, state, &c->write_timer); 1072 1073 /* Fall through. */ 1074 1075 case NXT_OK: 1076 /* 1077 * Mark both read and write directions as ready and try to perform 1078 * I/O operations before receiving readiness notifications. 1079 * On unconnected socket Linux send() and recv() return EAGAIN 1080 * instead of ENOTCONN. 1081 */ 1082 c->socket.read_ready = 1; 1083 c->socket.write_ready = 1; 1084 /* 1085 * Enabling both read and write notifications on a getting 1086 * connected socket eliminates one epoll_ctl() syscall. 1087 */ 1088 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1089 c->socket.error_handler = state->error_handler; 1090 1091 nxt_epoll_enable(engine, &c->socket); 1092 c->socket.read = NXT_EVENT_BLOCKED; 1093 1094 handler = state->ready_handler; 1095 break; 1096 #endif 1097 1098 case NXT_ERROR: 1099 handler = state->error_handler; 1100 break; 1101 1102 default: /* NXT_DECLINED: connection refused. 
*/ 1103 handler = state->close_handler; 1104 break; 1105 } 1106 1107 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1108 } 1109 1110 1111 static void 1112 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) 1113 { 1114 nxt_conn_t *c; 1115 1116 c = obj; 1117 1118 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1119 1120 if (!c->socket.epoll_error) { 1121 c->socket.write = NXT_EVENT_BLOCKED; 1122 1123 if (c->write_state->timer_autoreset) { 1124 nxt_timer_disable(task->thread->engine, &c->write_timer); 1125 } 1126 1127 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1128 task, c, data); 1129 return; 1130 } 1131 1132 nxt_conn_connect_test(task, c, data); 1133 } 1134 1135 1136 /* 1137 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1138 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF 1139 * in edge-triggered mode. 1140 */ 1141 1142 static ssize_t 1143 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1144 { 1145 ssize_t n; 1146 1147 n = nxt_conn_io_recvbuf(c, b); 1148 1149 if (n > 0 && c->socket.epoll_eof) { 1150 c->socket.read_ready = 1; 1151 } 1152 1153 return n; 1154 } 1155 1156 #endif 1157