
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * The first epoll version was introduced in Linux 2.5.44.  The
 * interface was changed several times since then, and the final version
 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode was
 * introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
 *
 * EPOLLET mode did not work reliably in early implementations and in
 * the Linux 2.4 backport.
 *
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
 * signalfd()               Linux 2.6.22, glibc 2.7.
 * eventfd()                Linux 2.6.22, glibc 2.7.
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
 * signalfd4()              Linux 2.6.27, glibc 2.9.
 * eventfd2()               Linux 2.6.27, glibc 2.9.
 * accept4()                Linux 2.6.28, glibc 2.10.
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
 * EPOLLEXCLUSIVE           Linux 4.5.
 */


#if (NXT_HAVE_EPOLL_EDGE)
static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
#endif
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
    nxt_conn_io_t *io);
static void nxt_epoll_free(nxt_event_engine_t *engine);
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    int op, uint32_t events);
static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
#if (NXT_HAVE_SIGNALFD)
static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj,
    void *data);
#endif
#if (NXT_HAVE_EVENTFD)
static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

#if (NXT_HAVE_ACCEPT4)
static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
    void *data);
#endif


#if (NXT_HAVE_EPOLL_EDGE)

static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    nxt_epoll_edge_conn_io_connect,
    nxt_conn_io_accept,

    nxt_conn_io_read,
    nxt_epoll_edge_conn_io_recvbuf,
    nxt_conn_io_recv,

    nxt_conn_io_write,
    nxt_event_conn_io_write_chunk,

#if (NXT_HAVE_LINUX_SENDFILE)
    nxt_linux_event_conn_io_sendfile,
#else
    nxt_event_conn_io_sendbuf,
#endif

    nxt_event_conn_io_writev,
    nxt_event_conn_io_send,

    nxt_conn_io_shutdown,
};


const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};

#endif


const nxt_event_interface_t  nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};


#if (NXT_HAVE_EPOLL_EDGE)

static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_epoll_edge_conn_io, EPOLLET | EPOLLRDHUP);
}

#endif


static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_unix_conn_io, 0);
}


static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
    engine->u.epoll.fd = -1;
    engine->u.epoll.mode = mode;
    engine->u.epoll.mchanges = mchanges;
    engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
    engine->u.epoll.signalfd.fd = -1;
#endif

    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t)
                                         * mchanges);
    if (engine->u.epoll.changes == NULL) {
        goto fail;
    }

    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
    if (engine->u.epoll.events == NULL) {
        goto fail;
    }

    engine->u.epoll.fd = epoll_create(1);
    if (engine->u.epoll.fd == -1) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E",
                nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);

    if (engine->signals != NULL) {

#if (NXT_HAVE_SIGNALFD)

        if (nxt_epoll_add_signal(engine) != NXT_OK) {
            goto fail;
        }

#endif

        nxt_epoll_test_accept4(engine, io);
    }

    return NXT_OK;

fail:

    nxt_epoll_free(engine);

    return NXT_ERROR;
}
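

/*
 * Illustrative sketch, not engine code: example_epoll_open() below is a
 * hypothetical helper and the block is excluded from compilation.  The
 * version table at the top of this file lists epoll_create1()
 * (Linux 2.6.27, glibc 2.9).  epoll_create()'s size argument has been
 * ignored since Linux 2.6.8, while epoll_create1() additionally accepts
 * EPOLL_CLOEXEC, avoiding a separate fcntl(FD_CLOEXEC) call.
 */

#if 0

static int
example_epoll_open(void)
{
    int  fd;

    /* Atomically mark the descriptor close-on-exec. */
    fd = epoll_create1(EPOLL_CLOEXEC);

    if (fd == -1 && nxt_errno == NXT_ENOSYS) {
        /* Pre-2.6.27 kernel: the legacy size argument must be positive. */
        fd = epoll_create(1);
    }

    return fd;
}

#endif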


/*
 * Probe once, at engine creation, whether the accept4() syscall is
 * implemented: the call must fail because -1 is not a valid socket,
 * but only a missing syscall fails with ENOSYS.
 */

static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t  handler;

    if (handler == NULL) {

        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}
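

/*
 * For illustration only (hypothetical example_accept_fallback(), kept
 * out of compilation): what the probe above buys.  Without accept4(),
 * every accepted socket needs two extra fcntl() syscalls to enter
 * non-blocking mode; accept4(..., SOCK_NONBLOCK) does it atomically.
 */

#if 0

static int
example_accept_fallback(int lfd, struct sockaddr *sa, socklen_t *len)
{
    int  s, flags;

    s = accept(lfd, sa, len);

    if (s == -1) {
        return -1;
    }

    /* The two round trips that accept4(..., SOCK_NONBLOCK) avoids. */
    flags = fcntl(s, F_GETFL, 0);

    if (flags == -1 || fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
        (void) close(s);
        return -1;
    }

    return s;
}

#endif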


static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int  fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E",
                fd, nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E",
                fd, nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E",
                fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}


static void
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->write = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}


static void
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


static void
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Although closing a file descriptor automatically removes any epoll
 * events that reference it, close() acquires the kernel-global "epmutex",
 * whereas since Linux 3.13 epoll_ctl(EPOLL_CTL_DEL) does not acquire
 * "epmutex" if the file descriptor is present in only one epoll set.
 * Removing events explicitly before closing therefore eliminates
 * possible lock contention.
 */

static nxt_bool_t
nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_epoll_delete(engine, ev);

    return ev->changing;
}
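

/*
 * The ordering described above, reduced to plain epoll calls in a
 * hypothetical example_delete_then_close() (excluded from compilation);
 * the engine batches its EPOLL_CTL_DEL, but the effect is the same.
 */

#if 0

static void
example_delete_then_close(int epfd, int fd)
{
    /*
     * Explicit removal: since Linux 3.13 this takes no global lock if
     * the descriptor is present in only one epoll set.  Kernels before
     * 2.6.9 required a non-NULL event pointer for EPOLL_CTL_DEL.
     */
    (void) epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);

    /* Relying on close() for the removal would acquire "epmutex". */
    (void) close(fd);
}

#endif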


static void
nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->read != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE
            && ev->write == NXT_EVENT_INACTIVE)
        {
            op = EPOLL_CTL_ADD;

        } else if (ev->write >= NXT_EVENT_BLOCKED) {
            events |= EPOLLOUT;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


static void
nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->write != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE
            && ev->write == NXT_EVENT_INACTIVE)
        {
            op = EPOLL_CTL_ADD;

        } else if (ev->read >= NXT_EVENT_BLOCKED) {
            events |= EPOLLIN;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->write = NXT_EVENT_ACTIVE;
}


static void
nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->read = NXT_EVENT_INACTIVE;

    if (ev->write <= NXT_EVENT_DISABLED) {
        ev->write = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


static void
nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->write = NXT_EVENT_INACTIVE;

    if (ev->read <= NXT_EVENT_DISABLED) {
        ev->read = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


static void
nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


/*
 * The NXT_EVENT_DISABLED state is used to track whether an EPOLLONESHOT
 * event should be added or modified, epoll_ctl(2):
 *
 * EPOLLONESHOT (since Linux 2.6.2)
 *     Sets the one-shot behavior for the associated file descriptor.
 *     This means that after an event is pulled out with epoll_wait(2)
 *     the associated file descriptor is internally disabled and no
 *     other events will be reported by the epoll interface.  The user
 *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
 *     descriptor with a new event mask.
 */

static void
nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE)
         ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_ONESHOT;
    ev->write = NXT_EVENT_INACTIVE;

    nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
}


static void
nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE)
         ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_ONESHOT;

    nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
}
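

/*
 * The epoll_ctl(2) excerpt above as a minimal call sequence, using a
 * hypothetical example_oneshot() kept out of compilation.  After a
 * one-shot event fires, the descriptor stays in the set but is
 * internally disabled, so rearming needs EPOLL_CTL_MOD rather than
 * EPOLL_CTL_ADD; that distinction is what NXT_EVENT_DISABLED tracks.
 */

#if 0

static void
example_oneshot(int epfd, int fd)
{
    struct epoll_event  ee;

    ee.events = EPOLLIN | EPOLLONESHOT;
    ee.data.fd = fd;

    /* First registration: the descriptor is new to this epoll set. */
    (void) epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ee);

    /*
     * ... epoll_wait() reports EPOLLIN once and disarms the entry,
     * although the descriptor remains registered ...
     */

    /* Rearm: EPOLL_CTL_ADD would now fail with EEXIST. */
    (void) epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee);
}

#endif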


static void
nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN);
}


/*
 * Epoll changes are batched to improve the instruction and data cache
 * locality of several epoll_ctl() calls followed by an epoll_wait() call.
 */

static void
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
    uint32_t events)
{
    nxt_epoll_change_t  *change;

    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
              engine->u.epoll.fd, ev->fd, op, events);

    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
        (void) nxt_epoll_commit_changes(engine);
    }

    ev->changing = 1;

    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
    change->op = op;
    change->event.events = events;
    change->event.data.ptr = ev;
}


static nxt_int_t
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
    int                 ret;
    nxt_int_t           retval;
    nxt_fd_event_t      *ev;
    nxt_epoll_change_t  *change, *end;

    nxt_debug(&engine->task, "epoll %d changes:%ui",
              engine->u.epoll.fd, engine->u.epoll.nchanges);

    retval = NXT_OK;
    change = engine->u.epoll.changes;
    end = change + engine->u.epoll.nchanges;

    do {
        ev = change->event.data.ptr;
        ev->changing = 0;

        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
                  engine->u.epoll.fd, ev->fd, change->op,
                  change->event.events);

        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd,
                        &change->event);

        if (nxt_slow_path(ret != 0)) {
            nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
                    engine->u.epoll.fd, change->op, ev->fd, nxt_errno);

            nxt_work_queue_add(&engine->fast_work_queue,
                               nxt_epoll_error_handler, ev->task, ev,
                               ev->data);

            retval = NXT_ERROR;
        }

        change++;

    } while (change < end);

    engine->u.epoll.nchanges = 0;

    return retval;
}


static void
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    ev->error_handler(ev->task, ev, data);
}


#if (NXT_HAVE_SIGNALFD)

static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
    int                 fd;
    struct epoll_event  ee;

    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
        nxt_log(&engine->task, NXT_LOG_CRIT,
                "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
        return NXT_ERROR;
    }

    /*
     * The glibc signalfd() wrapper always takes the flags argument.  The
     * glibc 2.7 and 2.8 wrappers call the original signalfd() syscall,
     * which has no flags argument.  The glibc 2.9+ wrapper first tries
     * the signalfd4() syscall and, if it fails, falls back to the
     * original signalfd() syscall.  For this reason the non-blocking
     * mode is set separately.
     */

    fd = signalfd(-1, &engine->signals->sigmask, 0);

    if (fd == -1) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E",
                engine->u.epoll.signalfd.fd, nxt_errno);
        return NXT_ERROR;
    }

    engine->u.epoll.signalfd.fd = fd;

    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "signalfd(): %d", fd);

    engine->u.epoll.signalfd.data = engine->signals->handler;
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
    engine->u.epoll.signalfd.log = engine->task.log;
    engine->u.epoll.signalfd.task = &engine->task;

    ee.events = EPOLLIN;
    ee.data.ptr = &engine->u.epoll.signalfd;

    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
                engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);

        return NXT_ERROR;
    }

    return NXT_OK;
}


static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                      n;
    nxt_fd_event_t           *ev;
    nxt_work_handler_t       handler;
    struct signalfd_siginfo  sfd;

    ev = obj;
    handler = data;

    nxt_debug(task, "signalfd handler");

    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));

    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);

    if (n != sizeof(struct signalfd_siginfo)) {
        nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E",
                ev->fd, nxt_errno);
    }

    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);

    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
}

#endif
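

/*
 * A sketch of the shortcut that the wrapper history above rules out
 * for portability (hypothetical example_signalfd_nonblock(), excluded
 * from compilation): where Linux 2.6.27+ and glibc 2.9+ may be
 * assumed, SFD_NONBLOCK can be passed directly; glibc wrappers that
 * cannot pass flags to the kernel fail with EINVAL.
 */

#if 0

static int
example_signalfd_nonblock(const sigset_t *mask)
{
    int  fd;

    /* One syscall where signalfd4() backs the wrapper. */
    fd = signalfd(-1, mask, SFD_NONBLOCK);

    if (fd == -1 && nxt_errno == NXT_EINVAL) {
        /* Older wrapper or kernel: create first, then set O_NONBLOCK. */
        fd = signalfd(-1, mask, 0);

        if (fd != -1 && fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
            (void) close(fd);
            return -1;
        }
    }

    return fd;
}

#endif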


#if (NXT_HAVE_EVENTFD)

static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    int                 ret;
    struct epoll_event  ee;

    engine->u.epoll.post_handler = handler;

    /*
     * The glibc eventfd() wrapper always takes the flags argument.  The
     * glibc 2.7 and 2.8 wrappers call the original eventfd() syscall,
     * which has no flags argument.  The glibc 2.9+ wrapper first tries
     * the eventfd2() syscall and, if it fails, falls back to the
     * original eventfd() syscall.  For this reason the non-blocking
     * mode is set separately.
     */

    engine->u.epoll.eventfd.fd = eventfd(0, 0);

    if (engine->u.epoll.eventfd.fd == -1) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E",
                nxt_errno);
        return NXT_ERROR;
    }

    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);

    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
    engine->u.epoll.eventfd.data = engine;
    engine->u.epoll.eventfd.log = engine->task.log;
    engine->u.epoll.eventfd.task = &engine->task;

    ee.events = EPOLLIN | EPOLLET;
    ee.data.ptr = &engine->u.epoll.eventfd;

    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
                    engine->u.epoll.eventfd.fd, &ee);

    if (nxt_fast_path(ret == 0)) {
        return NXT_OK;
    }

    nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
            engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
            nxt_errno);

    return NXT_ERROR;
}


static void
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                 n;
    uint64_t            events;
    nxt_event_engine_t  *engine;

    engine = data;

    nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);

    /*
     * The maximum counter value at which a write() to an eventfd()
     * descriptor blocks or returns EAGAIN is 0xFFFFFFFFFFFFFFFE, so the
     * descriptor needs to be read only once per many notifications, for
     * example, once per 2^32-2 notifications.  Since the eventfd() file
     * descriptor is always registered in EPOLLET mode, epoll reports
     * only the latest write() to the descriptor.
     */

    if (engine->u.epoll.neventfd++ >= 0xfffffffe) {
        engine->u.epoll.neventfd = 0;

        n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));

        nxt_debug(task, "read(%d): %d events:%uL",
                  engine->u.epoll.eventfd.fd, n, events);

        if (n != sizeof(uint64_t)) {
            nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E",
                    engine->u.epoll.eventfd.fd, nxt_errno);
        }
    }

    engine->u.epoll.post_handler(task, NULL, NULL);
}


static void
nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    ssize_t   ret;
    uint64_t  event;

    /*
     * eventfd() is available whenever signalfd() is, so this function
     * is used only to post events, and the signo argument is ignored.
     */

    event = 1;

    ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));

    if (nxt_slow_path(ret != sizeof(uint64_t))) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E",
                engine->u.epoll.eventfd.fd, nxt_errno);
    }
}

#endif
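

/*
 * The counter arithmetic above, demonstrated standalone with a
 * hypothetical example_eventfd_counter() (excluded from compilation):
 * without EFD_SEMAPHORE, every write() adds its value to the 64-bit
 * counter, and a single read() returns the accumulated sum and resets
 * it to zero, so many posts collapse into one wakeup.
 */

#if 0

static void
example_eventfd_counter(void)
{
    int       fd, i;
    uint64_t  n;

    fd = eventfd(0, 0);

    /* Three posts: the kernel counter accumulates 1 + 1 + 1. */
    for (i = 0; i < 3; i++) {
        n = 1;
        (void) write(fd, &n, sizeof(uint64_t));
    }

    /* A single read returns 3 and resets the counter to zero. */
    (void) read(fd, &n, sizeof(uint64_t));

    (void) close(fd);
}

#endif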


static void
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    uint32_t            events;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_bool_t          error;
    nxt_uint_t          level;
    nxt_fd_event_t      *ev;
    struct epoll_event  *event;

    if (engine->u.epoll.nchanges != 0) {
        if (nxt_epoll_commit_changes(engine) != NXT_OK) {
            /* Error handlers have been enqueued on failure. */
            timeout = 0;
        }
    }

    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
              engine->u.epoll.fd, timeout);

    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
                         engine->u.epoll.mevents, timeout);

    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd,
              nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;

        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
                engine->u.epoll.fd, err);

        return;
    }

    for (i = 0; i < nevents; i++) {

        event = &engine->u.epoll.events[i];
        events = event->events;
        ev = event->data.ptr;

        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
                  ev->fd, events, ev, ev->read, ev->write);

        /*
         * On error epoll may set only EPOLLERR and EPOLLHUP without
         * EPOLLIN or EPOLLOUT, so the "error" flag is cleared once a
         * handler has been enqueued, ensuring that only one active
         * handler processes the error.
         */
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
        ev->epoll_error = error;

#if (NXT_HAVE_EPOLL_EDGE)

        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

#endif

        if ((events & EPOLLIN) || error) {
            ev->read_ready = 1;

            if (ev->read != NXT_EVENT_BLOCKED) {

                if (ev->read == NXT_EVENT_ONESHOT) {
                    ev->read = NXT_EVENT_DISABLED;
                }

                error = 0;

                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                                   ev->task, ev, ev->data);

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_read(engine, ev);
            }
        }

        if ((events & EPOLLOUT) || error) {
            ev->write_ready = 1;

            if (ev->write != NXT_EVENT_BLOCKED) {

                if (ev->write == NXT_EVENT_ONESHOT) {
                    ev->write = NXT_EVENT_DISABLED;
                }

                error = 0;

                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                                   ev->task, ev, ev->data);

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_write(engine, ev);
            }
        }

        if (error) {
            ev->read_ready = 1;
            ev->write_ready = 1;
        }
    }
}


#if (NXT_HAVE_ACCEPT4)

static void
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           len;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    len = c->remote->socklen;

    if (len >= sizeof(struct sockaddr)) {
        sa = &c->remote->u.sockaddr;

    } else {
        sa = NULL;
        len = 0;
    }

    s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
}

#endif


#if (NXT_HAVE_EPOLL_EDGE)

/*
 * nxt_epoll_edge_conn_io_connect() eliminates the getsockopt() syscall
 * otherwise needed to test a pending connect() error.  Although this
 * special interface can work in both edge-triggered and level-triggered
 * modes, it is enabled only for the former, because edge-triggered mode
 * is available in all modern Linux distributions.  Supporting the
 * latter would require an additional nxt_epoll_level_event_conn_io
 * with a single non-generic connect() interface.
 */

static void
nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;
        return;

#if 0
    case NXT_AGAIN:
        nxt_conn_timer(engine, c, state, &c->write_timer);

        /* Fall through. */

    case NXT_OK:
        /*
         * Mark both read and write directions as ready and try to
         * perform I/O operations before receiving readiness
         * notifications.  On an unconnected socket Linux send() and
         * recv() return EAGAIN instead of ENOTCONN.
         */
        c->socket.read_ready = 1;
        c->socket.write_ready = 1;

        /*
         * Enabling both read and write notifications on a socket
         * getting connected eliminates one epoll_ctl() syscall.
         */
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
        c->socket.error_handler = state->error_handler;

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;

        handler = state->ready_handler;
        break;
#endif

    case NXT_ERROR:
        handler = state->error_handler;
        break;

    default:  /* NXT_DECLINED: connection refused. */
        handler = state->close_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
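

/*
 * The conventional completion test that the interface above eliminates,
 * as a hypothetical example_connect_result() excluded from compilation:
 * after a non-blocking connect() returns EINPROGRESS and the socket
 * becomes writable, the pending error is usually fetched with
 * getsockopt(SO_ERROR), one extra syscall per established connection.
 */

#if 0

static int
example_connect_result(int fd)
{
    int        err;
    socklen_t  len;

    err = 0;
    len = sizeof(int);

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == -1) {
        return -1;
    }

    /* 0 on success; otherwise e.g. ECONNREFUSED or ETIMEDOUT. */
    return err;
}

#endif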


static void
nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);

    if (!c->socket.epoll_error) {
        c->socket.write = NXT_EVENT_BLOCKED;

        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(task->thread->engine, &c->write_timer);
        }

        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    nxt_conn_connect_test(task, c, data);
}


/*
 * nxt_epoll_edge_conn_io_recvbuf() is just a wrapper around the
 * standard nxt_conn_io_recvbuf() that forces a pending EOF to be
 * read in edge-triggered mode.
 */

static ssize_t
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0 && c->socket.epoll_eof) {
        c->socket.read_ready = 1;
    }

    return n;
}

#endif