1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * The first epoll version has been introduced in Linux 2.5.44. The 12 * interface was changed several times since then and the final version 13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has 14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. 15 * 16 * EPOLLET mode did not work reliable in early implementaions and in 17 * Linux 2.4 backport. 18 * 19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3. 20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8. 21 * epoll_pwait() Linux 2.6.19, glibc 2.6. 22 * signalfd() Linux 2.6.22, glibc 2.7. 23 * eventfd() Linux 2.6.22, glibc 2.7. 24 * timerfd_create() Linux 2.6.25, glibc 2.8. 25 * epoll_create1() Linux 2.6.27, glibc 2.9. 26 * signalfd4() Linux 2.6.27, glibc 2.9. 27 * eventfd2() Linux 2.6.27, glibc 2.9. 28 * accept4() Linux 2.6.28, glibc 2.10. 29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. 30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24. 31 */ 32 33 34 #if (NXT_HAVE_EPOLL_EDGE) 35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37 #endif 38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); 42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, 43 nxt_conn_io_t *io); 44 static void nxt_epoll_free(nxt_event_engine_t *engine); 45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine, 53 nxt_fd_event_t *ev); 54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine, 55 nxt_fd_event_t *ev); 56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine, 57 nxt_fd_event_t *ev); 58 static void nxt_epoll_block_read(nxt_event_engine_t *engine, 59 nxt_fd_event_t *ev); 60 static void nxt_epoll_block_write(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 69 int op, uint32_t events); 70 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); 71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); 72 #if (NXT_HAVE_SIGNALFD) 73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); 74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); 75 #endif 76 #if (NXT_HAVE_EVENTFD) 77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81 #endif 82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84 #if (NXT_HAVE_ACCEPT4) 85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, 86 void *data); 87 #endif 88 89 90 #if (NXT_HAVE_EPOLL_EDGE) 91 92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, 93 void *data); 94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, 95 void *data); 96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 97 98 99 static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, 102 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110 #if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112 #else 113 nxt_event_conn_io_sendbuf, 114 #endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 119 nxt_conn_io_shutdown, 120 }; 121 122 123 const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, 128 nxt_epoll_disable, 129 nxt_epoll_delete, 130 nxt_epoll_close, 131 nxt_epoll_enable_read, 132 nxt_epoll_enable_write, 133 nxt_epoll_disable_read, 134 nxt_epoll_disable_write, 135 nxt_epoll_block_read, 136 nxt_epoll_block_write, 137 nxt_epoll_oneshot_read, 138 nxt_epoll_oneshot_write, 139 nxt_epoll_enable_accept, 140 NULL, 141 NULL, 142 #if (NXT_HAVE_EVENTFD) 143 nxt_epoll_enable_post, 144 nxt_epoll_signal, 145 #else 146 NULL, 147 NULL, 148 #endif 149 nxt_epoll_poll, 150 151 &nxt_epoll_edge_conn_io, 152 153 #if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155 #else 156 NXT_NO_FILE_EVENTS, 157 #endif 158 159 #if (NXT_HAVE_SIGNALFD) 160 NXT_SIGNAL_EVENTS, 161 #else 162 NXT_NO_SIGNAL_EVENTS, 163 #endif 164 }; 165 166 #endif 167 168 169 const nxt_event_interface_t nxt_epoll_level_engine = { 170 "epoll_level", 171 nxt_epoll_level_create, 172 nxt_epoll_free, 173 nxt_epoll_enable, 174 nxt_epoll_disable, 175 nxt_epoll_delete, 176 nxt_epoll_close, 177 nxt_epoll_enable_read, 178 nxt_epoll_enable_write, 179 nxt_epoll_disable_read, 180 nxt_epoll_disable_write, 181 nxt_epoll_block_read, 182 nxt_epoll_block_write, 183 nxt_epoll_oneshot_read, 184 nxt_epoll_oneshot_write, 185 nxt_epoll_enable_accept, 186 NULL, 187 NULL, 188 #if (NXT_HAVE_EVENTFD) 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191 #else 192 NULL, 193 NULL, 194 #endif 195 nxt_epoll_poll, 196 197 &nxt_unix_conn_io, 198 199 #if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201 #else 202 NXT_NO_FILE_EVENTS, 203 #endif 204 205 #if (NXT_HAVE_SIGNALFD) 206 NXT_SIGNAL_EVENTS, 207 #else 208 NXT_NO_SIGNAL_EVENTS, 209 #endif 210 }; 211 212 213 #if (NXT_HAVE_EPOLL_EDGE) 214 215 static nxt_int_t 216 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218 { 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, 220 EPOLLET | EPOLLRDHUP); 221 } 222 223 #endif 224 225 226 static nxt_int_t 227 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229 { 230 return nxt_epoll_create(engine, mchanges, mevents, 231 &nxt_unix_conn_io, 0); 232 } 233 234 235 static nxt_int_t 236 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) 238 { 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = mchanges; 242 engine->u.epoll.mevents = mevents; 243 #if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245 #endif 246 247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); 248 if (engine->u.epoll.changes == NULL) { 249 goto fail; 250 } 251 252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); 253 if (engine->u.epoll.events == NULL) { 254 goto fail; 255 } 256 257 engine->u.epoll.fd = epoll_create(1); 258 if (engine->u.epoll.fd == -1) { 259 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E", 260 nxt_errno); 261 goto fail; 262 } 263 264 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); 265 266 if (engine->signals != NULL) { 267 268 #if (NXT_HAVE_SIGNALFD) 269 270 if (nxt_epoll_add_signal(engine) != NXT_OK) { 271 goto fail; 272 } 273 274 #endif 275 276 nxt_epoll_test_accept4(engine, io); 277 } 278 279 return NXT_OK; 280 281 fail: 282 283 nxt_epoll_free(engine); 284 285 return NXT_ERROR; 286 } 287 288 289 static void 290 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) 291 { 292 static nxt_work_handler_t handler; 293 294 if (handler == NULL) { 295 296 handler = io->accept; 297 298 #if (NXT_HAVE_ACCEPT4) 299 300 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 301 302 if (nxt_errno != NXT_ENOSYS) { 303 handler = nxt_epoll_conn_io_accept4; 304 305 } else { 306 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 307 NXT_ENOSYS); 308 } 309 310 #endif 311 } 312 313 io->accept = handler; 314 } 315 316 317 static void 318 nxt_epoll_free(nxt_event_engine_t *engine) 319 { 320 int fd; 321 322 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); 323 324 #if (NXT_HAVE_SIGNALFD) 325 326 fd = engine->u.epoll.signalfd.fd; 327 328 if (fd != -1 && close(fd) != 0) { 329 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E", 330 fd, nxt_errno); 331 } 332 333 #endif 334 335 #if (NXT_HAVE_EVENTFD) 336 337 fd = engine->u.epoll.eventfd.fd; 338 339 if (fd != -1 && close(fd) != 0) { 340 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E", 341 fd, nxt_errno); 342 } 343 344 #endif 345 346 fd = engine->u.epoll.fd; 347 348 if (fd != -1 && close(fd) != 0) { 349 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E", 350 fd, nxt_errno); 351 } 352 353 nxt_free(engine->u.epoll.events); 354 nxt_free(engine->u.epoll.changes); 355 356 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); 357 } 358 359 360 static void 361 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 362 { 363 ev->read = NXT_EVENT_ACTIVE; 364 ev->write = NXT_EVENT_ACTIVE; 365 366 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, 367 EPOLLIN | EPOLLOUT | engine->u.epoll.mode); 368 } 369 370 371 static void 372 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 373 { 374 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { 375 376 ev->read = NXT_EVENT_INACTIVE; 377 ev->write = NXT_EVENT_INACTIVE; 378 379 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 380 } 381 } 382 383 384 static void 385 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 386 { 387 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 388 389 ev->read = NXT_EVENT_INACTIVE; 390 ev->write = NXT_EVENT_INACTIVE; 391 392 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); 393 } 394 } 395 396 397 /* 398 * Although calling close() on a file descriptor will remove any epoll 399 * events that reference the descriptor, in this case the close() acquires 400 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not 401 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents 402 * only in one epoll set. Thus removing events explicitly before closing 403 * eliminates possible lock contention. 404 */ 405 406 static nxt_bool_t 407 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 408 { 409 nxt_epoll_delete(engine, ev); 410 411 return ev->changing; 412 } 413 414 415 static void 416 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 417 { 418 int op; 419 uint32_t events; 420 421 if (ev->read != NXT_EVENT_BLOCKED) { 422 423 op = EPOLL_CTL_MOD; 424 events = EPOLLIN | engine->u.epoll.mode; 425 426 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 427 op = EPOLL_CTL_ADD; 428 429 } else if (ev->write >= NXT_EVENT_BLOCKED) { 430 events |= EPOLLOUT; 431 } 432 433 nxt_epoll_change(engine, ev, op, events); 434 } 435 436 ev->read = NXT_EVENT_ACTIVE; 437 } 438 439 440 static void 441 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 442 { 443 int op; 444 uint32_t events; 445 446 if (ev->write != NXT_EVENT_BLOCKED) { 447 448 op = EPOLL_CTL_MOD; 449 events = EPOLLOUT | engine->u.epoll.mode; 450 451 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { 452 op = EPOLL_CTL_ADD; 453 454 } else if (ev->read >= NXT_EVENT_BLOCKED) { 455 events |= EPOLLIN; 456 } 457 458 nxt_epoll_change(engine, ev, op, events); 459 } 460 461 ev->write = NXT_EVENT_ACTIVE; 462 } 463 464 465 static void 466 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 467 { 468 int op; 469 uint32_t events; 470 471 ev->read = NXT_EVENT_INACTIVE; 472 473 if (ev->write <= NXT_EVENT_DISABLED) { 474 ev->write = NXT_EVENT_INACTIVE; 475 op = EPOLL_CTL_DEL; 476 events = 0; 477 478 } else { 479 op = EPOLL_CTL_MOD; 480 events = EPOLLOUT | engine->u.epoll.mode; 481 } 482 483 nxt_epoll_change(engine, ev, op, events); 484 } 485 486 487 static void 488 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 489 { 490 int op; 491 uint32_t events; 492 493 ev->write = NXT_EVENT_INACTIVE; 494 495 if (ev->read <= NXT_EVENT_DISABLED) { 496 ev->write = NXT_EVENT_INACTIVE; 497 op = EPOLL_CTL_DEL; 498 events = 0; 499 500 } else { 501 op = EPOLL_CTL_MOD; 502 events = EPOLLIN | engine->u.epoll.mode; 503 } 504 505 nxt_epoll_change(engine, ev, op, events); 506 } 507 508 509 static void 510 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 511 { 512 if (ev->read != NXT_EVENT_INACTIVE) { 513 ev->read = NXT_EVENT_BLOCKED; 514 } 515 } 516 517 518 static void 519 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 520 { 521 if (ev->write != NXT_EVENT_INACTIVE) { 522 ev->write = NXT_EVENT_BLOCKED; 523 } 524 } 525 526 527 /* 528 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT 529 * event should be added or modified, epoll_ctl(2): 530 * 531 * EPOLLONESHOT (since Linux 2.6.2) 532 * Sets the one-shot behavior for the associated file descriptor. 533 * This means that after an event is pulled out with epoll_wait(2) 534 * the associated file descriptor is internally disabled and no 535 * other events will be reported by the epoll interface. The user 536 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file 537 * descriptor with a new event mask. 538 */ 539 540 static void 541 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 542 { 543 int op; 544 545 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 546 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 547 548 ev->read = NXT_EVENT_ONESHOT; 549 ev->write = NXT_EVENT_INACTIVE; 550 551 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); 552 } 553 554 555 static void 556 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 557 { 558 int op; 559 560 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 561 EPOLL_CTL_ADD : EPOLL_CTL_MOD; 562 563 ev->read = NXT_EVENT_INACTIVE; 564 ev->write = NXT_EVENT_ONESHOT; 565 566 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); 567 } 568 569 570 static void 571 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 572 { 573 uint32_t events; 574 575 ev->read = NXT_EVENT_ACTIVE; 576 577 events = EPOLLIN; 578 579 #ifdef EPOLLEXCLUSIVE 580 events |= EPOLLEXCLUSIVE; 581 #endif 582 583 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); 584 } 585 586 587 /* 588 * epoll changes are batched to improve instruction and data cache 589 * locality of several epoll_ctl() calls followed by epoll_wait() call. 590 */ 591 592 static void 593 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, 594 uint32_t events) 595 { 596 nxt_epoll_change_t *change; 597 598 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", 599 engine->u.epoll.fd, ev->fd, op, events); 600 601 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { 602 (void) nxt_epoll_commit_changes(engine); 603 } 604 605 ev->changing = 1; 606 607 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; 608 change->op = op; 609 change->event.events = events; 610 change->event.data.ptr = ev; 611 } 612 613 614 static nxt_int_t 615 nxt_epoll_commit_changes(nxt_event_engine_t *engine) 616 { 617 int ret; 618 nxt_int_t retval; 619 nxt_fd_event_t *ev; 620 nxt_epoll_change_t *change, *end; 621 622 nxt_debug(&engine->task, "epoll %d changes:%ui", 623 engine->u.epoll.fd, engine->u.epoll.nchanges); 624 625 retval = NXT_OK; 626 change = engine->u.epoll.changes; 627 end = change + engine->u.epoll.nchanges; 628 629 do { 630 ev = change->event.data.ptr; 631 ev->changing = 0; 632 633 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", 634 engine->u.epoll.fd, ev->fd, change->op, 635 change->event.events); 636 637 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); 638 639 if (nxt_slow_path(ret != 0)) { 640 nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 641 engine->u.epoll.fd, change->op, ev->fd, nxt_errno); 642 643 nxt_work_queue_add(&engine->fast_work_queue, 644 nxt_epoll_error_handler, ev->task, ev, ev->data); 645 646 retval = NXT_ERROR; 647 } 648 649 change++; 650 651 } while (change < end); 652 653 engine->u.epoll.nchanges = 0; 654 655 return retval; 656 } 657 658 659 static void 660 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) 661 { 662 nxt_fd_event_t *ev; 663 664 ev = obj; 665 666 ev->read = NXT_EVENT_INACTIVE; 667 ev->write = NXT_EVENT_INACTIVE; 668 669 ev->error_handler(ev->task, ev, data); 670 } 671 672 673 #if (NXT_HAVE_SIGNALFD) 674 675 static nxt_int_t 676 nxt_epoll_add_signal(nxt_event_engine_t *engine) 677 { 678 int fd; 679 struct epoll_event ee; 680 681 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { 682 nxt_log(&engine->task, NXT_LOG_CRIT, 683 "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); 684 return NXT_ERROR; 685 } 686 687 /* 688 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 689 * and 2.8 signalfd() wrappers call the original signalfd() syscall 690 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first 691 * tries to call signalfd4() syscall and if it fails then calls the 692 * original signalfd() syscall. For this reason the non-blocking mode 693 * is set separately. 694 */ 695 696 fd = signalfd(-1, &engine->signals->sigmask, 0); 697 698 if (fd == -1) { 699 nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E", 700 engine->u.epoll.signalfd.fd, nxt_errno); 701 return NXT_ERROR; 702 } 703 704 engine->u.epoll.signalfd.fd = fd; 705 706 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { 707 return NXT_ERROR; 708 } 709 710 nxt_debug(&engine->task, "signalfd(): %d", fd); 711 712 engine->u.epoll.signalfd.data = engine->signals->handler; 713 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; 714 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; 715 engine->u.epoll.signalfd.log = engine->task.log; 716 engine->u.epoll.signalfd.task = &engine->task; 717 718 ee.events = EPOLLIN; 719 ee.data.ptr = &engine->u.epoll.signalfd; 720 721 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { 722 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 723 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); 724 725 return NXT_ERROR; 726 } 727 728 return NXT_OK; 729 } 730 731 732 static void 733 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) 734 { 735 int n; 736 nxt_fd_event_t *ev; 737 nxt_work_handler_t handler; 738 struct signalfd_siginfo sfd; 739 740 ev = obj; 741 handler = data; 742 743 nxt_debug(task, "signalfd handler"); 744 745 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); 746 747 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); 748 749 if (n != sizeof(struct signalfd_siginfo)) { 750 nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E", 751 ev->fd, nxt_errno); 752 } 753 754 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); 755 756 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); 757 } 758 759 #endif 760 761 762 #if (NXT_HAVE_EVENTFD) 763 764 static nxt_int_t 765 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 766 { 767 int ret; 768 struct epoll_event ee; 769 770 engine->u.epoll.post_handler = handler; 771 772 /* 773 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 774 * and 2.8 eventfd() wrappers call the original eventfd() syscall 775 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first 776 * tries to call eventfd2() syscall and if it fails then calls the 777 * original eventfd() syscall. For this reason the non-blocking mode 778 * is set separately. 779 */ 780 781 engine->u.epoll.eventfd.fd = eventfd(0, 0); 782 783 if (engine->u.epoll.eventfd.fd == -1) { 784 nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno); 785 return NXT_ERROR; 786 } 787 788 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); 789 if (nxt_slow_path(ret != NXT_OK)) { 790 return NXT_ERROR; 791 } 792 793 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); 794 795 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; 796 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; 797 engine->u.epoll.eventfd.data = engine; 798 engine->u.epoll.eventfd.log = engine->task.log; 799 engine->u.epoll.eventfd.task = &engine->task; 800 801 ee.events = EPOLLIN | EPOLLET; 802 ee.data.ptr = &engine->u.epoll.eventfd; 803 804 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, 805 engine->u.epoll.eventfd.fd, &ee); 806 807 if (nxt_fast_path(ret == 0)) { 808 return NXT_OK; 809 } 810 811 nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", 812 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, 813 nxt_errno); 814 815 return NXT_ERROR; 816 } 817 818 819 static void 820 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) 821 { 822 int n; 823 uint64_t events; 824 nxt_event_engine_t *engine; 825 826 engine = data; 827 828 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); 829 830 /* 831 * The maximum value after write() to a eventfd() descriptor will 832 * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor 833 * can be read once per many notifications, for example, once per 834 * 2^32-2 noticifcations. Since the eventfd() file descriptor is 835 * always registered in EPOLLET mode, epoll returns event about 836 * only the latest write() to the descriptor. 837 */ 838 839 if (engine->u.epoll.neventfd++ >= 0xfffffffe) { 840 engine->u.epoll.neventfd = 0; 841 842 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); 843 844 nxt_debug(task, "read(%d): %d events:%uL", 845 engine->u.epoll.eventfd.fd, n, events); 846 847 if (n != sizeof(uint64_t)) { 848 nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", 849 engine->u.epoll.eventfd.fd, nxt_errno); 850 } 851 } 852 853 engine->u.epoll.post_handler(task, NULL, NULL); 854 } 855 856 857 static void 858 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 859 { 860 size_t ret; 861 uint64_t event; 862 863 /* 864 * eventfd() presents along with signalfd(), so the function 865 * is used only to post events and the signo argument is ignored. 866 */ 867 868 event = 1; 869 870 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); 871 872 if (nxt_slow_path(ret != sizeof(uint64_t))) { 873 nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E", 874 engine->u.epoll.eventfd.fd, nxt_errno); 875 } 876 } 877 878 #endif 879 880 881 static void 882 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 883 { 884 int nevents; 885 uint32_t events; 886 nxt_int_t i; 887 nxt_err_t err; 888 nxt_bool_t error; 889 nxt_uint_t level; 890 nxt_fd_event_t *ev; 891 struct epoll_event *event; 892 893 if (engine->u.epoll.nchanges != 0) { 894 if (nxt_epoll_commit_changes(engine) != NXT_OK) { 895 /* Error handlers have been enqueued on failure. */ 896 timeout = 0; 897 } 898 } 899 900 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", 901 engine->u.epoll.fd, timeout); 902 903 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, 904 engine->u.epoll.mevents, timeout); 905 906 err = (nevents == -1) ? nxt_errno : 0; 907 908 nxt_thread_time_update(engine->task.thread); 909 910 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); 911 912 if (nevents == -1) { 913 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 914 915 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", 916 engine->u.epoll.fd, err); 917 918 return; 919 } 920 921 for (i = 0; i < nevents; i++) { 922 923 event = &engine->u.epoll.events[i]; 924 events = event->events; 925 ev = event->data.ptr; 926 927 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", 928 ev->fd, events, ev, ev->read, ev->write); 929 930 /* 931 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or 932 * EPOLLOUT, so the "error" variable enqueues only one active handler. 933 */ 934 error = ((events & (EPOLLERR | EPOLLHUP)) != 0); 935 ev->epoll_error = error; 936 937 #if (NXT_HAVE_EPOLL_EDGE) 938 939 ev->epoll_eof = ((events & EPOLLRDHUP) != 0); 940 941 #endif 942 943 if ((events & EPOLLIN) || error) { 944 ev->read_ready = 1; 945 946 if (ev->read != NXT_EVENT_BLOCKED) { 947 948 if (ev->read == NXT_EVENT_ONESHOT) { 949 ev->read = NXT_EVENT_DISABLED; 950 } 951 952 error = 0; 953 954 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 955 ev->task, ev, ev->data); 956 957 } else if (engine->u.epoll.mode == 0) { 958 /* Level-triggered mode. */ 959 nxt_epoll_disable_read(engine, ev); 960 } 961 } 962 963 if ((events & EPOLLOUT) || error) { 964 ev->write_ready = 1; 965 966 if (ev->write != NXT_EVENT_BLOCKED) { 967 968 if (ev->write == NXT_EVENT_ONESHOT) { 969 ev->write = NXT_EVENT_DISABLED; 970 } 971 972 error = 0; 973 974 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 975 ev->task, ev, ev->data); 976 977 } else if (engine->u.epoll.mode == 0) { 978 /* Level-triggered mode. */ 979 nxt_epoll_disable_write(engine, ev); 980 } 981 } 982 983 if (error) { 984 ev->read_ready = 1; 985 ev->write_ready = 1; 986 } 987 } 988 } 989 990 991 #if (NXT_HAVE_ACCEPT4) 992 993 static void 994 nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) 995 { 996 socklen_t socklen; 997 nxt_conn_t *c; 998 nxt_socket_t s; 999 struct sockaddr *sa; 1000 nxt_listen_event_t *lev; 1001 1002 lev = obj; 1003 c = lev->next; 1004 1005 lev->ready--; 1006 lev->socket.read_ready = (lev->ready != 0); 1007 1008 sa = &c->remote->u.sockaddr; 1009 socklen = c->remote->socklen; 1010 /* 1011 * The returned socklen is ignored here, 1012 * see comment in nxt_conn_io_accept(). 1013 */ 1014 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); 1015 1016 if (s != -1) { 1017 c->socket.fd = s; 1018 1019 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); 1020 1021 nxt_conn_accept(task, lev, c); 1022 return; 1023 } 1024 1025 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); 1026 } 1027 1028 #endif 1029 1030 1031 #if (NXT_HAVE_EPOLL_EDGE) 1032 1033 /* 1034 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1035 * syscall to test pending connect() error. Although this special 1036 * interface can work in both edge-triggered and level-triggered 1037 * modes it is enabled only for the former mode because this mode is 1038 * available in all modern Linux distributions. For the latter mode 1039 * it is required to create additional nxt_epoll_level_event_conn_io 1040 * with single non-generic connect() interface. 1041 */ 1042 1043 static void 1044 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) 1045 { 1046 nxt_conn_t *c; 1047 nxt_event_engine_t *engine; 1048 nxt_work_handler_t handler; 1049 const nxt_event_conn_state_t *state; 1050 1051 c = obj; 1052 1053 state = c->write_state; 1054 1055 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1056 1057 case NXT_OK: 1058 c->socket.write_ready = 1; 1059 handler = state->ready_handler; 1060 break; 1061 1062 case NXT_AGAIN: 1063 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1064 c->socket.error_handler = nxt_conn_connect_error; 1065 1066 engine = task->thread->engine; 1067 nxt_conn_timer(engine, c, state, &c->write_timer); 1068 1069 nxt_epoll_enable(engine, &c->socket); 1070 c->socket.read = NXT_EVENT_BLOCKED; 1071 return; 1072 1073 #if 0 1074 case NXT_AGAIN: 1075 nxt_conn_timer(engine, c, state, &c->write_timer); 1076 1077 /* Fall through. */ 1078 1079 case NXT_OK: 1080 /* 1081 * Mark both read and write directions as ready and try to perform 1082 * I/O operations before receiving readiness notifications. 1083 * On unconnected socket Linux send() and recv() return EAGAIN 1084 * instead of ENOTCONN. 1085 */ 1086 c->socket.read_ready = 1; 1087 c->socket.write_ready = 1; 1088 /* 1089 * Enabling both read and write notifications on a getting 1090 * connected socket eliminates one epoll_ctl() syscall. 1091 */ 1092 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1093 c->socket.error_handler = state->error_handler; 1094 1095 nxt_epoll_enable(engine, &c->socket); 1096 c->socket.read = NXT_EVENT_BLOCKED; 1097 1098 handler = state->ready_handler; 1099 break; 1100 #endif 1101 1102 case NXT_ERROR: 1103 handler = state->error_handler; 1104 break; 1105 1106 default: /* NXT_DECLINED: connection refused. */ 1107 handler = state->close_handler; 1108 break; 1109 } 1110 1111 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1112 } 1113 1114 1115 static void 1116 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) 1117 { 1118 nxt_conn_t *c; 1119 1120 c = obj; 1121 1122 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1123 1124 if (!c->socket.epoll_error) { 1125 c->socket.write = NXT_EVENT_BLOCKED; 1126 1127 if (c->write_state->timer_autoreset) { 1128 nxt_timer_disable(task->thread->engine, &c->write_timer); 1129 } 1130 1131 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1132 task, c, data); 1133 return; 1134 } 1135 1136 nxt_conn_connect_test(task, c, data); 1137 } 1138 1139 1140 /* 1141 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1142 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF 1143 * in edge-triggered mode. 1144 */ 1145 1146 static ssize_t 1147 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1148 { 1149 ssize_t n; 1150 1151 n = nxt_conn_io_recvbuf(c, b); 1152 1153 if (n > 0 && c->socket.epoll_eof) { 1154 c->socket.read_ready = 1; 1155 } 1156 1157 return n; 1158 } 1159 1160 #endif 1161