1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * The first epoll version has been introduced in Linux 2.5.44. The 12 * interface was changed several times since then and the final version 13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has 14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. 15 * 16 * EPOLLET mode did not work reliable in early implementaions and in 17 * Linux 2.4 backport. 18 * 19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3. 20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8. 21 * epoll_pwait() Linux 2.6.19, glibc 2.6. 22 * signalfd() Linux 2.6.22, glibc 2.7. 23 * eventfd() Linux 2.6.22, glibc 2.7. 24 * timerfd_create() Linux 2.6.25, glibc 2.8. 25 * epoll_create1() Linux 2.6.27, glibc 2.9. 26 * signalfd4() Linux 2.6.27, glibc 2.9. 27 * eventfd2() Linux 2.6.27, glibc 2.9. 28 * accept4() Linux 2.6.28, glibc 2.10. 29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. 30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24. 31 */ 32 33 34 #if (NXT_HAVE_EPOLL_EDGE) 35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37 #endif 38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); 42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 43 nxt_conn_io_t *io); 44 static void nxt_epoll_free(nxt_event_engine_t *engine); 45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 53 nxt_fd_event_t *ev); 54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 55 nxt_fd_event_t *ev); 56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 57 nxt_fd_event_t *ev); 58 static void nxt_epoll_block_read(nxt_event_engine_t *engine, 59 nxt_fd_event_t *ev); 60 static void nxt_epoll_block_write(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 69 int op, uint32_t events); 70 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 72 #if (NXT_HAVE_SIGNALFD) 73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 75 #endif 76 #if (NXT_HAVE_EVENTFD) 77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81 #endif 82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84 #if (NXT_HAVE_ACCEPT4) 85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, 86 void *data); 87 #endif 88 89 90 #if (NXT_HAVE_EPOLL_EDGE) 91 92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, 93 void *data); 94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, 95 void *data); 96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 97 98 99 static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, 102 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110 #if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112 #else 113 nxt_event_conn_io_sendbuf, 114 #endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 119 nxt_conn_io_shutdown, 120 }; 121 122 123 const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, 128 nxt_epoll_disable, 129 nxt_epoll_delete, 130 nxt_epoll_close, 131 nxt_epoll_enable_read, 132 nxt_epoll_enable_write, 133 nxt_epoll_disable_read, 134 nxt_epoll_disable_write, 135 nxt_epoll_block_read, 136 nxt_epoll_block_write, 137 nxt_epoll_oneshot_read, 138 nxt_epoll_oneshot_write, 139 nxt_epoll_enable_accept, 140 NULL, 141 NULL, 142 #if (NXT_HAVE_EVENTFD) 143 nxt_epoll_enable_post, 144 nxt_epoll_signal, 145 #else 146 NULL, 147 NULL, 148 #endif 149 nxt_epoll_poll, 150 151 &nxt_epoll_edge_conn_io, 152 153 #if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155 #else 156 NXT_NO_FILE_EVENTS, 157 #endif 158 159 #if (NXT_HAVE_SIGNALFD) 160 NXT_SIGNAL_EVENTS, 161 #else 162 NXT_NO_SIGNAL_EVENTS, 163 #endif 164 }; 165 166 #endif 167 168 169 const nxt_event_interface_t nxt_epoll_level_engine = { 170 "epoll_level", 171 nxt_epoll_level_create, 172 nxt_epoll_free, 173 nxt_epoll_enable, 174 nxt_epoll_disable, 175 nxt_epoll_delete, 176 nxt_epoll_close, 177 nxt_epoll_enable_read, 178 nxt_epoll_enable_write, 179 nxt_epoll_disable_read, 180 nxt_epoll_disable_write, 181 nxt_epoll_block_read, 182 nxt_epoll_block_write, 183 nxt_epoll_oneshot_read, 184 nxt_epoll_oneshot_write, 185 nxt_epoll_enable_accept, 186 NULL, 187 NULL, 188 #if (NXT_HAVE_EVENTFD) 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191 #else 192 NULL, 193 NULL, 194 #endif 195 nxt_epoll_poll, 196 197 &nxt_unix_conn_io, 198 199 #if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201 #else 202 NXT_NO_FILE_EVENTS, 203 #endif 204 205 #if (NXT_HAVE_SIGNALFD) 206 NXT_SIGNAL_EVENTS, 207 #else 208 NXT_NO_SIGNAL_EVENTS, 209 #endif 210 }; 211 212 213 #if (NXT_HAVE_EPOLL_EDGE) 214 215 static nxt_int_t 216 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218 { 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, 220 EPOLLET | EPOLLRDHUP); 221 } 222 223 #endif 224 225 226 static nxt_int_t 227 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229 { 230 return nxt_epoll_create(engine, mchanges, mevents, 231 &nxt_unix_conn_io, 0); 232 } 233 234 235 static nxt_int_t 236 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) 238 { 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = mchanges; 242 engine->u.epoll.mevents = mevents; 243 #if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245 #endif 246 247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 248 if (engine->u.epoll.changes == NULL) { 249 goto fail; 250 } 251 252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 253 if (engine->u.epoll.events == NULL) { 254 goto fail; 255 } 256 257 engine->u.epoll.fd = epoll_create(1); 258 if (engine->u.epoll.fd == -1) { 259 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E", 260 nxt_errno); 261 goto fail; 262 } 263 264 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 265 266 if (engine->signals != NULL) { 267 268 #if (NXT_HAVE_SIGNALFD) 269 270 if (nxt_epoll_add_signal(engine) != NXT_OK) { 271 goto fail; 272 } 273 274 #endif 275 276 nxt_epoll_test_accept4(engine, io); 277 } 278 279 return NXT_OK; 280 281 fail: 282 283 nxt_epoll_free(engine); 284 285 return NXT_ERROR; 286 } 287 288 289 static void 290 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) 291 { 292 static nxt_work_handler_t handler; 293 294 if (handler == NULL) { 295 296 handler = io->accept; 297 298 #if (NXT_HAVE_ACCEPT4) 299 300 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 301 302 if (nxt_errno != NXT_ENOSYS) { 303 handler = nxt_epoll_conn_io_accept4; 304 305 } else { 306 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 307 NXT_ENOSYS); 308 } 309 310 #endif 311 } 312 313 io->accept = handler; 314 } 315 316 317 static void 318 nxt_epoll_free(nxt_event_engine_t *engine) 319 { 320 int fd; 321 322 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 323 324 #if (NXT_HAVE_SIGNALFD) 325 326 fd = engine->u.epoll.signalfd.fd; 327 328 if (fd != -1 && close(fd) != 0) { 329 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E", 330 fd, nxt_errno); 331 } 332 333 #endif 334 335 #if (NXT_HAVE_EVENTFD) 336 337 fd = engine->u.epoll.eventfd.fd; 338 339 if (fd != -1 && close(fd) != 0) { 340 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E", 341 fd, nxt_errno); 342 } 343 344 #endif 345 346 fd = engine->u.epoll.fd; 347 348 if (fd != -1 && close(fd) != 0) { 349 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E", 350 fd, nxt_errno); 351 } 352 353 nxt_free(engine->u.epoll.events); 354 nxt_free(engine->u.epoll.changes); 355 356 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 357 } 358 359 360 static void 361 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 362 { 363 ev->read = NXT_EVENT_ACTIVE; 364 ev->write = NXT_EVENT_ACTIVE; 365 366 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 367 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 368 } 369 370 371 static void 372 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 373 { 374 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 375 376 ev->read = NXT_EVENT_INACTIVE; 377 ev->write = NXT_EVENT_INACTIVE; 378 379 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 380 } 381 } 382 383 384 static void 385 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 386 { 387 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 388 389 ev->read = NXT_EVENT_INACTIVE; 390 ev->write = NXT_EVENT_INACTIVE; 391 392 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 393 } 394 } 395 396 397 /* 398 * Although calling close() on a file descriptor will remove any epoll 399 * events that reference the descriptor, in this case the close() acquires 400 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 401 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 402 * only in one epoll set. Thus removing events explicitly before closing 403 * eliminates possible lock contention. 404 */ 405 406 static nxt_bool_t 407 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 408 { 409 nxt_epoll_delete(engine, ev); 410 411 return ev->changing; 412 } 413 414 415 static void 416 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 417 { 418 int op; 419 uint32_t events; 420 421 if (ev->read != NXT_EVENT_BLOCKED) { 422 423 op = EPOLL_CTL_MOD; 424 events = EPOLLIN | engine->u.epoll.mode; 425 426 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 427 op = EPOLL_CTL_ADD; 428 429 } else if (ev->write >= NXT_EVENT_BLOCKED) { 430 events |= EPOLLOUT; 431 } 432 433 nxt_epoll_change(engine, ev, op, events); 434 } 435 436 ev->read = NXT_EVENT_ACTIVE; 437 } 438 439 440 static void 441 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 442 { 443 int op; 444 uint32_t events; 445 446 if (ev->write != NXT_EVENT_BLOCKED) { 447 448 op = EPOLL_CTL_MOD; 449 events = EPOLLOUT | engine->u.epoll.mode; 450 451 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 452 op = EPOLL_CTL_ADD; 453 454 } else if (ev->read >= NXT_EVENT_BLOCKED) { 455 events |= EPOLLIN; 456 } 457 458 nxt_epoll_change(engine, ev, op, events); 459 } 460 461 ev->write = NXT_EVENT_ACTIVE; 462 } 463 464 465 static void 466 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 467 { 468 int op; 469 uint32_t events; 470 471 ev->read = NXT_EVENT_INACTIVE; 472 473 if (ev->write <= NXT_EVENT_DISABLED) { 474 ev->write = NXT_EVENT_INACTIVE; 475 op = EPOLL_CTL_DEL; 476 events = 0; 477 478 } else { 479 op = EPOLL_CTL_MOD; 480 events = EPOLLOUT | engine->u.epoll.mode; 481 } 482 483 nxt_epoll_change(engine, ev, op, events); 484 } 485 486 487 static void 488 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 489 { 490 int op; 491 uint32_t events; 492 493 ev->write = NXT_EVENT_INACTIVE; 494 495 if (ev->read <= NXT_EVENT_DISABLED) { 496 ev->write = NXT_EVENT_INACTIVE; 497 op = EPOLL_CTL_DEL; 498 events = 0; 499 500 } else { 501 op = EPOLL_CTL_MOD; 502 events = EPOLLIN | engine->u.epoll.mode; 503 } 504 505 nxt_epoll_change(engine, ev, op, events); 506 } 507 508 509 static void 510 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 511 { 512 if (ev->read != NXT_EVENT_INACTIVE) { 513 ev->read = NXT_EVENT_BLOCKED; 514 } 515 } 516 517 518 static void 519 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 520 { 521 if (ev->write != NXT_EVENT_INACTIVE) { 522 ev->write = NXT_EVENT_BLOCKED; 523 } 524 } 525 526 527 /* 528 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 529 * event should be added or modified, epoll_ctl(2): 530 * 531 * EPOLLONESHOT (since Linux 2.6.2) 532 * Sets the one-shot behavior for the associated file descriptor. 533 * This means that after an event is pulled out with epoll_wait(2) 534 * the associated file descriptor is internally disabled and no 535 * other events will be reported by the epoll interface. The user 536 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 537 * descriptor with a new event mask. 538 */ 539 540 static void 541 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 542 { 543 int op; 544 545 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 546 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 547 548 ev->read = NXT_EVENT_ONESHOT; 549 ev->write = NXT_EVENT_INACTIVE; 550 551 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 552 } 553 554 555 static void 556 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 557 { 558 int op; 559 560 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 561 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 562 563 ev->read = NXT_EVENT_INACTIVE; 564 ev->write = NXT_EVENT_ONESHOT; 565 566 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 567 } 568 569 570 static void 571 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 572 { 573 uint32_t events; 574 575 ev->read = NXT_EVENT_ACTIVE; 576 577 events = EPOLLIN; 578 579 #ifdef EPOLLEXCLUSIVE 580 events |= EPOLLEXCLUSIVE; 581 #endif 582 583 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); 584 } 585 586 587 /* 588 * epoll changes are batched to improve instruction and data cache 589 * locality of several epoll_ctl() calls followed by epoll_wait() call. 590 */ 591 592 static void 593 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 594 uint32_t events) 595 { 596 nxt_epoll_change_t *change; 597 598 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 599 engine->u.epoll.fd, ev->fd, op, events); 600 601 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 602 (void) nxt_epoll_commit_changes(engine); 603 } 604 605 ev->changing = 1; 606 607 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 608 change->op = op; 609 change->event.events = events; 610 change->event.data.ptr = ev; 611 } 612 613 614 static nxt_int_t 615 nxt_epoll_commit_changes(nxt_event_engine_t *engine) 616 { 617 int ret; 618 nxt_int_t retval; 619 nxt_fd_event_t *ev; 620 nxt_epoll_change_t *change, *end; 621 622 nxt_debug(&engine->task, "epoll %d changes:%ui", 623 engine->u.epoll.fd, engine->u.epoll.nchanges); 624 625 retval = NXT_OK; 626 change = engine->u.epoll.changes; 627 end = change + engine->u.epoll.nchanges; 628 629 do { 630 ev = change->event.data.ptr; 631 ev->changing = 0; 632 633 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 634 engine->u.epoll.fd, ev->fd, change->op, 635 change->event.events); 636 637 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 638 639 if (nxt_slow_path(ret != 0)) { 640 nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 641 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 642 643 nxt_work_queue_add(&engine->fast_work_queue, 644 nxt_epoll_error_handler, ev->task, ev, ev->data); 645 646 retval = NXT_ERROR; 647 } 648 649 change++; 650 651 } while (change < end); 652 653 engine->u.epoll.nchanges = 0; 654 655 return retval; 656 } 657 658 659 static void 660 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 661 { 662 nxt_fd_event_t *ev; 663 664 ev = obj; 665 666 ev->read = NXT_EVENT_INACTIVE; 667 ev->write = NXT_EVENT_INACTIVE; 668 669 ev->error_handler(ev->task, ev, data); 670 } 671 672 673 #if (NXT_HAVE_SIGNALFD) 674 675 static nxt_int_t 676 nxt_epoll_add_signal(nxt_event_engine_t *engine) 677 { 678 int fd; 679 struct epoll_event ee; 680 681 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 682 nxt_log(&engine->task, NXT_LOG_CRIT, 683 "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 684 return NXT_ERROR; 685 } 686 687 /* 688 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 689 * and 2.8 signalfd() wrappers call the original signalfd() syscall 690 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 691 * tries to call signalfd4() syscall and if it fails then calls the 692 * original signalfd() syscall. For this reason the non-blocking mode 693 * is set separately. 694 */ 695 696 fd = signalfd(-1, &engine->signals->sigmask, 0); 697 698 if (fd == -1) { 699 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E", 700 engine->u.epoll.signalfd.fd, nxt_errno); 701 return NXT_ERROR; 702 } 703 704 engine->u.epoll.signalfd.fd = fd; 705 706 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 707 return NXT_ERROR; 708 } 709 710 nxt_debug(&engine->task, "signalfd(): %d", fd); 711 712 engine->u.epoll.signalfd.data = engine->signals->handler; 713 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 714 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 715 engine->u.epoll.signalfd.log = engine->task.log; 716 engine->u.epoll.signalfd.task = &engine->task; 717 718 ee.events = EPOLLIN; 719 ee.data.ptr = &engine->u.epoll.signalfd; 720 721 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 722 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 723 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 724 725 return NXT_ERROR; 726 } 727 728 return NXT_OK; 729 } 730 731 732 static void 733 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 734 { 735 int n; 736 nxt_fd_event_t *ev; 737 nxt_work_handler_t handler; 738 struct signalfd_siginfo sfd; 739 740 ev = obj; 741 handler = data; 742 743 nxt_debug(task, "signalfd handler"); 744 745 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 746 747 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 748 749 if (n != sizeof(struct signalfd_siginfo)) { 750 nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E", 751 ev->fd, nxt_errno); 752 return; 753 } 754 755 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 756 757 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 758 } 759 760 #endif 761 762 763 #if (NXT_HAVE_EVENTFD) 764 765 static nxt_int_t 766 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 767 { 768 int ret; 769 struct epoll_event ee; 770 771 engine->u.epoll.post_handler = handler; 772 773 /* 774 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 775 * and 2.8 eventfd() wrappers call the original eventfd() syscall 776 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 777 * tries to call eventfd2() syscall and if it fails then calls the 778 * original eventfd() syscall. For this reason the non-blocking mode 779 * is set separately. 780 */ 781 782 engine->u.epoll.eventfd.fd = eventfd(0, 0); 783 784 if (engine->u.epoll.eventfd.fd == -1) { 785 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno); 786 return NXT_ERROR; 787 } 788 789 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 790 if (nxt_slow_path(ret != NXT_OK)) { 791 return NXT_ERROR; 792 } 793 794 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 795 796 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 797 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 798 engine->u.epoll.eventfd.data = engine; 799 engine->u.epoll.eventfd.log = engine->task.log; 800 engine->u.epoll.eventfd.task = &engine->task; 801 802 ee.events = EPOLLIN | EPOLLET; 803 ee.data.ptr = &engine->u.epoll.eventfd; 804 805 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 806 engine->u.epoll.eventfd.fd, &ee); 807 808 if (nxt_fast_path(ret == 0)) { 809 return NXT_OK; 810 } 811 812 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 813 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 814 nxt_errno); 815 816 return NXT_ERROR; 817 } 818 819 820 static void 821 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 822 { 823 int n; 824 uint64_t events; 825 nxt_event_engine_t *engine; 826 827 engine = data; 828 829 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 830 831 /* 832 * The maximum value after write() to a eventfd() descriptor will 833 * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor 834 * can be read once per many notifications, for example, once per 835 * 2^32-2 noticifcations. Since the eventfd() file descriptor is 836 * always registered in EPOLLET mode, epoll returns event about 837 * only the latest write() to the descriptor. 838 */ 839 840 if (engine->u.epoll.neventfd++ >= 0xfffffffe) { 841 engine->u.epoll.neventfd = 0; 842 843 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 844 845 nxt_debug(task, "read(%d): %d events:%uL", 846 engine->u.epoll.eventfd.fd, n, events); 847 848 if (n != sizeof(uint64_t)) { 849 nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", 850 engine->u.epoll.eventfd.fd, nxt_errno); 851 } 852 } 853 854 engine->u.epoll.post_handler(task, NULL, NULL); 855 } 856 857 858 static void 859 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 860 { 861 size_t ret; 862 uint64_t event; 863 864 /* 865 * eventfd() presents along with signalfd(), so the function 866 * is used only to post events and the signo argument is ignored. 867 */ 868 869 event = 1; 870 871 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 872 873 if (nxt_slow_path(ret != sizeof(uint64_t))) { 874 nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E", 875 engine->u.epoll.eventfd.fd, nxt_errno); 876 } 877 } 878 879 #endif 880 881 882 static void 883 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 884 { 885 int nevents; 886 uint32_t events; 887 nxt_int_t i; 888 nxt_err_t err; 889 nxt_bool_t error; 890 nxt_uint_t level; 891 nxt_fd_event_t *ev; 892 struct epoll_event *event; 893 894 if (engine->u.epoll.nchanges != 0) { 895 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 896 /* Error handlers have been enqueued on failure. */ 897 timeout = 0; 898 } 899 } 900 901 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 902 engine->u.epoll.fd, timeout); 903 904 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 905 engine->u.epoll.mevents, timeout); 906 907 err = (nevents == -1) ? nxt_errno : 0; 908 909 nxt_thread_time_update(engine->task.thread); 910 911 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 912 913 if (nevents == -1) { 914 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 915 916 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 917 engine->u.epoll.fd, err); 918 919 return; 920 } 921 922 for (i = 0; i < nevents; i++) { 923 924 event = &engine->u.epoll.events[i]; 925 events = event->events; 926 ev = event->data.ptr; 927 928 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 929 ev->fd, events, ev, ev->read, ev->write); 930 931 /* 932 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 933 * EPOLLOUT, so the "error" variable enqueues only one active handler. 934 */ 935 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 936 ev->epoll_error = error; 937 938 #if (NXT_HAVE_EPOLL_EDGE) 939 940 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 941 942 #endif 943 944 if ((events & EPOLLIN) || error) { 945 ev->read_ready = 1; 946 947 if (ev->read != NXT_EVENT_BLOCKED) { 948 949 if (ev->read == NXT_EVENT_ONESHOT) { 950 ev->read = NXT_EVENT_DISABLED; 951 } 952 953 error = 0; 954 955 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 956 ev->task, ev, ev->data); 957 958 } else if (engine->u.epoll.mode == 0) { 959 /* Level-triggered mode. */ 960 nxt_epoll_disable_read(engine, ev); 961 } 962 } 963 964 if ((events & EPOLLOUT) || error) { 965 ev->write_ready = 1; 966 967 if (ev->write != NXT_EVENT_BLOCKED) { 968 969 if (ev->write == NXT_EVENT_ONESHOT) { 970 ev->write = NXT_EVENT_DISABLED; 971 } 972 973 error = 0; 974 975 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 976 ev->task, ev, ev->data); 977 978 } else if (engine->u.epoll.mode == 0) { 979 /* Level-triggered mode. */ 980 nxt_epoll_disable_write(engine, ev); 981 } 982 } 983 984 if (error) { 985 ev->read_ready = 1; 986 ev->write_ready = 1; 987 } 988 } 989 } 990 991 992 #if (NXT_HAVE_ACCEPT4) 993 994 static void 995 nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 996 { 997 socklen_t socklen; 998 nxt_conn_t *c; 999 nxt_socket_t s; 1000 struct sockaddr *sa; 1001 nxt_listen_event_t *lev; 1002 1003 lev = obj; 1004 c = lev->next; 1005 1006 lev->ready--; 1007 lev->socket.read_ready = (lev->ready != 0); 1008 1009 sa = &c->remote->u.sockaddr; 1010 socklen = c->remote->socklen; 1011 /* 1012 * The returned socklen is ignored here, 1013 * see comment in nxt_conn_io_accept(). 1014 */ 1015 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); 1016 1017 if (s != -1) { 1018 c->socket.fd = s; 1019 1020 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); 1021 1022 nxt_conn_accept(task, lev, c); 1023 return; 1024 } 1025 1026 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); 1027 } 1028 1029 #endif 1030 1031 1032 #if (NXT_HAVE_EPOLL_EDGE) 1033 1034 /* 1035 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1036 * syscall to test pending connect() error. Although this special 1037 * interface can work in both edge-triggered and level-triggered 1038 * modes it is enabled only for the former mode because this mode is 1039 * available in all modern Linux distributions. For the latter mode 1040 * it is required to create additional nxt_epoll_level_event_conn_io 1041 * with single non-generic connect() interface. 1042 */ 1043 1044 static void 1045 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1046 { 1047 nxt_conn_t *c; 1048 nxt_event_engine_t *engine; 1049 nxt_work_handler_t handler; 1050 const nxt_event_conn_state_t *state; 1051 1052 c = obj; 1053 1054 state = c->write_state; 1055 1056 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1057 1058 case NXT_OK: 1059 c->socket.write_ready = 1; 1060 handler = state->ready_handler; 1061 break; 1062 1063 case NXT_AGAIN: 1064 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1065 c->socket.error_handler = nxt_conn_connect_error; 1066 1067 engine = task->thread->engine; 1068 nxt_conn_timer(engine, c, state, &c->write_timer); 1069 1070 nxt_epoll_enable(engine, &c->socket); 1071 c->socket.read = NXT_EVENT_BLOCKED; 1072 return; 1073 1074 #if 0 1075 case NXT_AGAIN: 1076 nxt_conn_timer(engine, c, state, &c->write_timer); 1077 1078 /* Fall through. */ 1079 1080 case NXT_OK: 1081 /* 1082 * Mark both read and write directions as ready and try to perform 1083 * I/O operations before receiving readiness notifications. 1084 * On unconnected socket Linux send() and recv() return EAGAIN 1085 * instead of ENOTCONN. 1086 */ 1087 c->socket.read_ready = 1; 1088 c->socket.write_ready = 1; 1089 /* 1090 * Enabling both read and write notifications on a getting 1091 * connected socket eliminates one epoll_ctl() syscall. 1092 */ 1093 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1094 c->socket.error_handler = state->error_handler; 1095 1096 nxt_epoll_enable(engine, &c->socket); 1097 c->socket.read = NXT_EVENT_BLOCKED; 1098 1099 handler = state->ready_handler; 1100 break; 1101 #endif 1102 1103 case NXT_ERROR: 1104 handler = state->error_handler; 1105 break; 1106 1107 default: /* NXT_DECLINED: connection refused. */ 1108 handler = state->close_handler; 1109 break; 1110 } 1111 1112 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1113 } 1114 1115 1116 static void 1117 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) 1118 { 1119 nxt_conn_t *c; 1120 1121 c = obj; 1122 1123 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1124 1125 if (!c->socket.epoll_error) { 1126 c->socket.write = NXT_EVENT_BLOCKED; 1127 1128 if (c->write_state->timer_autoreset) { 1129 nxt_timer_disable(task->thread->engine, &c->write_timer); 1130 } 1131 1132 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1133 task, c, data); 1134 return; 1135 } 1136 1137 nxt_conn_connect_test(task, c, data); 1138 } 1139 1140 1141 /* 1142 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1143 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF 1144 * in edge-triggered mode. 1145 */ 1146 1147 static ssize_t 1148 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1149 { 1150 ssize_t n; 1151 1152 n = nxt_conn_io_recvbuf(c, b); 1153 1154 if (n > 0 && c->socket.epoll_eof) { 1155 c->socket.read_ready = 1; 1156 } 1157 1158 return n; 1159 } 1160 1161 #endif 1162