1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #define NXT_POLL_ADD 0 11 #define NXT_POLL_CHANGE 1 12 #define NXT_POLL_DELETE 2 13 14 15 typedef struct { 16 /* 17 * A file descriptor is stored in hash entry to allow 18 * nxt_poll_fd_hash_test() to not dereference a pointer to 19 * nxt_fd_event_t which may be invalid if the file descriptor has 20 * been already closed and the nxt_fd_event_t's memory has been freed. 21 */ 22 nxt_socket_t fd; 23 24 uint32_t index; 25 void *event; 26 } nxt_poll_hash_entry_t; 27 28 29 static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine, 30 nxt_uint_t mchanges, nxt_uint_t mevents); 31 static void nxt_poll_free(nxt_event_engine_t *engine); 32 static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 33 static void nxt_poll_disable(nxt_event_engine_t *engine, 34 nxt_fd_event_t *ev); 35 static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine, 36 nxt_fd_event_t *ev); 37 static void nxt_poll_enable_read(nxt_event_engine_t *engine, 38 nxt_fd_event_t *ev); 39 static void nxt_poll_enable_write(nxt_event_engine_t *engine, 40 nxt_fd_event_t *ev); 41 static void nxt_poll_disable_read(nxt_event_engine_t *engine, 42 nxt_fd_event_t *ev); 43 static void nxt_poll_disable_write(nxt_event_engine_t *engine, 44 nxt_fd_event_t *ev); 45 static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46 static void nxt_poll_block_write(nxt_event_engine_t *engine, 47 nxt_fd_event_t *ev); 48 static void nxt_poll_oneshot_read(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50 static void nxt_poll_oneshot_write(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); 52 static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 53 nxt_uint_t op, nxt_uint_t events); 54 static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine); 55 static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine, 56 nxt_fd_event_t *ev, int events); 57 static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine, 58 nxt_fd_t fd, int events); 59 static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd); 60 static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 61 static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine, 62 nxt_fd_t fd); 63 static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data); 64 static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, 65 nxt_lvlhsh_t *lh); 66 67 68 const nxt_event_interface_t nxt_poll_engine = { 69 "poll", 70 nxt_poll_create, 71 nxt_poll_free, 72 nxt_poll_enable, 73 nxt_poll_disable, 74 nxt_poll_disable, 75 nxt_poll_close, 76 nxt_poll_enable_read, 77 nxt_poll_enable_write, 78 nxt_poll_disable_read, 79 nxt_poll_disable_write, 80 nxt_poll_block_read, 81 nxt_poll_block_write, 82 nxt_poll_oneshot_read, 83 nxt_poll_oneshot_write, 84 nxt_poll_enable_read, 85 NULL, 86 NULL, 87 NULL, 88 NULL, 89 nxt_poll, 90 91 &nxt_unix_conn_io, 92 93 NXT_NO_FILE_EVENTS, 94 NXT_NO_SIGNAL_EVENTS, 95 }; 96 97 98 static const nxt_lvlhsh_proto_t nxt_poll_fd_hash_proto nxt_aligned(64) = 99 { 100 NXT_LVLHSH_LARGE_MEMALIGN, 101 nxt_poll_fd_hash_test, 102 nxt_lvlhsh_alloc, 103 nxt_lvlhsh_free, 104 }; 105 106 107 static nxt_int_t 108 nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 109 nxt_uint_t mevents) 110 { 111 engine->u.poll.mchanges = mchanges; 112 113 engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges); 114 115 if (engine->u.poll.changes != NULL) { 116 return NXT_OK; 117 } 118 119 return NXT_ERROR; 120 } 121 122 123 static void 124 nxt_poll_free(nxt_event_engine_t *engine) 125 { 126 nxt_debug(&engine->task, "poll free"); 127 128 nxt_free(engine->u.poll.set); 129 nxt_free(engine->u.poll.changes); 130 nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash); 131 132 nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t)); 133 } 134 135 136 static void 137 nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 138 { 139 ev->read = NXT_EVENT_ACTIVE; 140 ev->write = NXT_EVENT_ACTIVE; 141 142 nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT); 143 } 144 145 146 static void 147 nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 148 { 149 if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) { 150 ev->read = NXT_EVENT_INACTIVE; 151 ev->write = NXT_EVENT_INACTIVE; 152 153 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); 154 } 155 } 156 157 158 static nxt_bool_t 159 nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 160 { 161 nxt_poll_disable(engine, ev); 162 163 return ev->changing; 164 } 165 166 167 static void 168 nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 169 { 170 nxt_uint_t op, events; 171 172 ev->read = NXT_EVENT_ACTIVE; 173 174 if (ev->write == NXT_EVENT_INACTIVE) { 175 op = NXT_POLL_ADD; 176 events = POLLIN; 177 178 } else { 179 op = NXT_POLL_CHANGE; 180 events = POLLIN | POLLOUT; 181 } 182 183 nxt_poll_change(engine, ev, op, events); 184 } 185 186 187 static void 188 nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 189 { 190 nxt_uint_t op, events; 191 192 ev->write = NXT_EVENT_ACTIVE; 193 194 if (ev->read == NXT_EVENT_INACTIVE) { 195 op = NXT_POLL_ADD; 196 events = POLLOUT; 197 198 } else { 199 op = NXT_POLL_CHANGE; 200 events = POLLIN | POLLOUT; 201 } 202 203 nxt_poll_change(engine, ev, op, events); 204 } 205 206 207 static void 208 nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 209 { 210 nxt_uint_t op, events; 211 212 ev->read = NXT_EVENT_INACTIVE; 213 214 if (ev->write == NXT_EVENT_INACTIVE) { 215 op = NXT_POLL_DELETE; 216 events = 0; 217 218 } else { 219 op = NXT_POLL_CHANGE; 220 events = POLLOUT; 221 } 222 223 nxt_poll_change(engine, ev, op, events); 224 } 225 226 227 static void 228 nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 229 { 230 nxt_uint_t op, events; 231 232 ev->write = NXT_EVENT_INACTIVE; 233 234 if (ev->read == NXT_EVENT_INACTIVE) { 235 op = NXT_POLL_DELETE; 236 events = 0; 237 238 } else { 239 op = NXT_POLL_CHANGE; 240 events = POLLIN; 241 } 242 243 nxt_poll_change(engine, ev, op, events); 244 } 245 246 247 static void 248 nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 249 { 250 if (ev->read != NXT_EVENT_INACTIVE) { 251 nxt_poll_disable_read(engine, ev); 252 } 253 } 254 255 256 static void 257 nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 258 { 259 if (ev->write != NXT_EVENT_INACTIVE) { 260 nxt_poll_disable_write(engine, ev); 261 } 262 } 263 264 265 static void 266 nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 267 { 268 nxt_uint_t op; 269 270 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 271 NXT_POLL_ADD : NXT_POLL_CHANGE; 272 273 ev->read = NXT_EVENT_ONESHOT; 274 ev->write = NXT_EVENT_INACTIVE; 275 276 nxt_poll_change(engine, ev, op, POLLIN); 277 } 278 279 280 static void 281 nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 282 { 283 nxt_uint_t op; 284 285 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 286 NXT_POLL_ADD : NXT_POLL_CHANGE; 287 288 ev->read = NXT_EVENT_INACTIVE; 289 ev->write = NXT_EVENT_ONESHOT; 290 291 nxt_poll_change(engine, ev, op, POLLOUT); 292 } 293 294 295 /* 296 * poll changes are batched to improve instruction and data cache 297 * locality of several lvlhsh operations followed by poll() call. 298 */ 299 300 static void 301 nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op, 302 nxt_uint_t events) 303 { 304 nxt_poll_change_t *change; 305 306 nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events); 307 308 if (engine->u.poll.nchanges >= engine->u.poll.mchanges) { 309 (void) nxt_poll_commit_changes(engine); 310 } 311 312 ev->changing = 1; 313 314 change = &engine->u.poll.changes[engine->u.poll.nchanges++]; 315 change->op = op; 316 change->events = events; 317 change->event = ev; 318 } 319 320 321 static nxt_int_t 322 nxt_poll_commit_changes(nxt_event_engine_t *engine) 323 { 324 nxt_int_t ret, retval; 325 nxt_fd_event_t *ev; 326 nxt_poll_change_t *change, *end; 327 328 nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges); 329 330 retval = NXT_OK; 331 change = engine->u.poll.changes; 332 end = change + engine->u.poll.nchanges; 333 334 do { 335 ev = change->event; 336 ev->changing = 0; 337 338 switch (change->op) { 339 340 case NXT_POLL_ADD: 341 ret = nxt_poll_set_add(engine, ev, change->events); 342 343 if (nxt_fast_path(ret == NXT_OK)) { 344 goto next; 345 } 346 347 break; 348 349 case NXT_POLL_CHANGE: 350 ret = nxt_poll_set_change(engine, ev->fd, change->events); 351 352 if (nxt_fast_path(ret == NXT_OK)) { 353 goto next; 354 } 355 356 break; 357 358 case NXT_POLL_DELETE: 359 ret = nxt_poll_set_delete(engine, ev->fd); 360 361 if (nxt_fast_path(ret == NXT_OK)) { 362 goto next; 363 } 364 365 break; 366 } 367 368 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler, 369 ev->task, ev, ev->data); 370 371 retval = NXT_ERROR; 372 373 next: 374 375 change++; 376 377 } while (change < end); 378 379 engine->u.poll.nchanges = 0; 380 381 return retval; 382 } 383 384 385 static nxt_int_t 386 nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events) 387 { 388 nxt_int_t ret; 389 nxt_uint_t max_nfds; 390 struct pollfd *pfd; 391 nxt_lvlhsh_query_t lhq; 392 nxt_poll_hash_entry_t *phe; 393 394 nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events); 395 396 if (engine->u.poll.nfds >= engine->u.poll.max_nfds) { 397 max_nfds = engine->u.poll.max_nfds + 512; /* 4K */ 398 399 pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds); 400 if (nxt_slow_path(pfd == NULL)) { 401 return NXT_ERROR; 402 } 403 404 engine->u.poll.set = pfd; 405 engine->u.poll.max_nfds = max_nfds; 406 } 407 408 phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t)); 409 if (nxt_slow_path(phe == NULL)) { 410 return NXT_ERROR; 411 } 412 413 phe->fd = ev->fd; 414 phe->index = engine->u.poll.nfds; 415 phe->event = ev; 416 417 pfd = &engine->u.poll.set[engine->u.poll.nfds++]; 418 pfd->fd = ev->fd; 419 pfd->events = events; 420 pfd->revents = 0; 421 422 lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t)); 423 lhq.replace = 0; 424 lhq.value = phe; 425 lhq.proto = &nxt_poll_fd_hash_proto; 426 lhq.data = engine; 427 428 ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq); 429 430 if (nxt_fast_path(ret == NXT_OK)) { 431 return NXT_OK; 432 } 433 434 nxt_free(phe); 435 436 return NXT_ERROR; 437 } 438 439 440 static nxt_int_t 441 nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events) 442 { 443 nxt_poll_hash_entry_t *phe; 444 445 nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi", 446 fd, events); 447 448 phe = nxt_poll_fd_hash_get(engine, fd); 449 450 if (nxt_fast_path(phe != NULL)) { 451 engine->u.poll.set[phe->index].events = events; 452 return NXT_OK; 453 } 454 455 return NXT_ERROR; 456 } 457 458 459 static nxt_int_t 460 nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd) 461 { 462 nxt_int_t ret; 463 nxt_uint_t index, nfds; 464 nxt_lvlhsh_query_t lhq; 465 nxt_poll_hash_entry_t *phe; 466 467 nxt_debug(&engine->task, "poll delete event: fd:%d", fd); 468 469 lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t)); 470 lhq.proto = &nxt_poll_fd_hash_proto; 471 lhq.data = engine; 472 473 ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq); 474 475 if (nxt_slow_path(ret != NXT_OK)) { 476 return NXT_ERROR; 477 } 478 479 phe = lhq.value; 480 481 index = phe->index; 482 engine->u.poll.nfds--; 483 nfds = engine->u.poll.nfds; 484 485 if (index != nfds) { 486 engine->u.poll.set[index] = engine->u.poll.set[nfds]; 487 488 phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd); 489 490 phe->index = index; 491 } 492 493 nxt_free(lhq.value); 494 495 return NXT_OK; 496 } 497 498 499 static void 500 nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 501 { 502 int nevents; 503 nxt_fd_t fd; 504 nxt_err_t err; 505 nxt_bool_t error; 506 nxt_uint_t i, events, level; 507 struct pollfd *pfd; 508 nxt_fd_event_t *ev; 509 nxt_poll_hash_entry_t *phe; 510 511 if (engine->u.poll.nchanges != 0) { 512 if (nxt_poll_commit_changes(engine) != NXT_OK) { 513 /* Error handlers have been enqueued on failure. */ 514 timeout = 0; 515 } 516 } 517 518 nxt_debug(&engine->task, "poll() events:%ui timeout:%M", 519 engine->u.poll.nfds, timeout); 520 521 nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout); 522 523 err = (nevents == -1) ? nxt_errno : 0; 524 525 nxt_thread_time_update(engine->task.thread); 526 527 nxt_debug(&engine->task, "poll(): %d", nevents); 528 529 if (nevents == -1) { 530 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; 531 nxt_log(&engine->task, level, "poll() failed %E", err); 532 return; 533 } 534 535 for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) { 536 537 pfd = &engine->u.poll.set[i]; 538 events = pfd->revents; 539 540 if (events == 0) { 541 continue; 542 } 543 544 fd = pfd->fd; 545 546 phe = nxt_poll_fd_hash_get(engine, fd); 547 548 if (nxt_slow_path(phe == NULL)) { 549 nxt_log(&engine->task, NXT_LOG_CRIT, 550 "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi", 551 fd, pfd->events, events); 552 553 /* Mark the poll entry to ignore it by the kernel. */ 554 pfd->fd = -1; 555 goto next; 556 } 557 558 ev = phe->event; 559 560 nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d wr:%d", 561 fd, events, ev->read, ev->write); 562 563 if (nxt_slow_path((events & POLLNVAL) != 0)) { 564 nxt_log(ev->task, NXT_LOG_CRIT, 565 "poll() error fd:%d ev:%04Xd rev:%04uXi", 566 fd, pfd->events, events); 567 568 /* Mark the poll entry to ignore it by the kernel. */ 569 pfd->fd = -1; 570 571 nxt_work_queue_add(&engine->fast_work_queue, 572 ev->error_handler, ev->task, ev, ev->data); 573 goto next; 574 } 575 576 /* 577 * On a socket's remote end close: 578 * 579 * Linux, FreeBSD, and Solaris set POLLIN; 580 * MacOSX sets POLLIN and POLLHUP; 581 * NetBSD sets POLLIN, and poll(2) claims this explicitly: 582 * 583 * If the remote end of a socket is closed, poll() 584 * returns a POLLIN event, rather than a POLLHUP. 585 * 586 * On error: 587 * 588 * Linux sets POLLHUP and POLLERR only; 589 * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2) 590 * claims the opposite: 591 * 592 * Note that POLLHUP and POLLOUT should never be 593 * present in the revents bitmask at the same time. 594 * 595 * Solaris and NetBSD do not add POLLHUP or POLLERR; 596 * MacOSX sets POLLHUP only. 597 * 598 * If an implementation sets POLLERR or POLLHUP only without POLLIN 599 * or POLLOUT, the "error" variable enqueues only one active handler. 600 */ 601 602 error = (((events & (POLLERR | POLLHUP)) != 0) 603 && ((events & (POLLIN | POLLOUT)) == 0)); 604 605 if ((events & POLLIN) || (error && ev->read_handler != NULL)) { 606 error = 0; 607 ev->read_ready = 1; 608 609 if (ev->read == NXT_EVENT_ONESHOT) { 610 ev->read = NXT_EVENT_INACTIVE; 611 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); 612 } 613 614 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 615 ev->task, ev, ev->data); 616 } 617 618 if ((events & POLLOUT) || (error && ev->write_handler != NULL)) { 619 ev->write_ready = 1; 620 621 if (ev->write == NXT_EVENT_ONESHOT) { 622 ev->write = NXT_EVENT_INACTIVE; 623 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); 624 } 625 626 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 627 ev->task, ev, ev->data); 628 } 629 630 next: 631 632 nevents--; 633 } 634 } 635 636 637 static nxt_poll_hash_entry_t * 638 nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd) 639 { 640 nxt_lvlhsh_query_t lhq; 641 nxt_poll_hash_entry_t *phe; 642 643 lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t)); 644 lhq.proto = &nxt_poll_fd_hash_proto; 645 lhq.data = engine; 646 647 if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) { 648 phe = lhq.value; 649 return phe; 650 } 651 652 nxt_log(&engine->task, NXT_LOG_CRIT, "fd %d not found in hash", fd); 653 654 return NULL; 655 } 656 657 658 static nxt_int_t 659 nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 660 { 661 nxt_event_engine_t *engine; 662 nxt_poll_hash_entry_t *phe; 663 664 phe = data; 665 666 /* nxt_murmur_hash2() is unique for 4 bytes. */ 667 668 engine = lhq->data; 669 670 if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) { 671 return NXT_OK; 672 } 673 674 nxt_log(&engine->task, NXT_LOG_CRIT, 675 "fd %d in hash mismatches fd %d in poll set", 676 phe->fd, engine->u.poll.set[phe->index].fd); 677 678 return NXT_DECLINED; 679 } 680 681 682 static void 683 nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh) 684 { 685 nxt_lvlhsh_each_t lhe; 686 nxt_lvlhsh_query_t lhq; 687 nxt_poll_hash_entry_t *phe; 688 689 nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t)); 690 lhe.proto = &nxt_poll_fd_hash_proto; 691 lhq.proto = &nxt_poll_fd_hash_proto; 692 693 for ( ;; ) { 694 phe = nxt_lvlhsh_each(lh, &lhe); 695 696 if (phe == NULL) { 697 return; 698 } 699 700 lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t)); 701 702 if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) { 703 nxt_log(&engine->task, NXT_LOG_CRIT, 704 "event fd %d not found in hash", phe->fd); 705 } 706 707 nxt_free(phe); 708 } 709 } 710