
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/* Operation codes for batched poll set changes; see nxt_poll_change(). */
#define NXT_POLL_ADD     0
#define NXT_POLL_CHANGE  1
#define NXT_POLL_DELETE  2


typedef struct {
    /*
     * A file descriptor is stored in hash entry to allow
     * nxt_poll_fd_hash_test() to not dereference a pointer to
     * nxt_fd_event_t which may be invalid if the file descriptor has
     * been already closed and the nxt_fd_event_t's memory has been freed.
     */
    nxt_socket_t  fd;

    /* Index of this fd's struct pollfd in engine->u.poll.set. */
    uint32_t      index;

    /* The nxt_fd_event_t associated with the fd. */
    void          *event;
} nxt_poll_hash_entry_t;


static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_poll_free(nxt_event_engine_t *engine);
static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_disable(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine);
static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev, int events);
static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine,
    nxt_fd_t fd, int events);
static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd);
static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine,
    nxt_fd_t fd);
static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine,
    nxt_lvlhsh_t *lh);


/*
 * The poll(2) event interface.  Slot order is defined by
 * nxt_event_interface_t; note nxt_poll_disable is installed twice
 * (disable and delete slots) and nxt_poll_enable_read also serves
 * as the accept-enable slot.
 */
const nxt_event_interface_t  nxt_poll_engine = {
    "poll",
    nxt_poll_create,
    nxt_poll_free,
    nxt_poll_enable,
    nxt_poll_disable,
    nxt_poll_disable,
    nxt_poll_close,
    nxt_poll_enable_read,
    nxt_poll_enable_write,
    nxt_poll_disable_read,
    nxt_poll_disable_write,
    nxt_poll_block_read,
    nxt_poll_block_write,
    nxt_poll_oneshot_read,
    nxt_poll_oneshot_write,
    nxt_poll_enable_read,
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_poll,

    &nxt_unix_event_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};


static const nxt_lvlhsh_proto_t  nxt_poll_fd_hash_proto  nxt_aligned(64) =
{
    NXT_LVLHSH_LARGE_MEMALIGN,
    0,
    nxt_poll_fd_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Allocates the batched change array sized for "mchanges" entries.
 * "mevents" is unused by the poll engine (no separate event array).
 * Returns NXT_OK on success, NXT_ERROR on allocation failure.
 */
static nxt_int_t
nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    engine->u.poll.mchanges = mchanges;

    engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);

    if (engine->u.poll.changes != NULL) {
        return NXT_OK;
    }

    return NXT_ERROR;
}


/* Releases the poll set, the change array and the fd hash. */
static void
nxt_poll_free(nxt_event_engine_t *engine)
{
    nxt_debug(&engine->task, "poll free");

    nxt_free(engine->u.poll.set);
    nxt_free(engine->u.poll.changes);
    nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash);

    nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t));
}


static void
nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 139 { 140 ev->read = NXT_EVENT_ACTIVE; 141 ev->write = NXT_EVENT_ACTIVE; 142 143 nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT); 144 } 145 146 147 static void 148 nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 149 { 150 if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) { 151 ev->read = NXT_EVENT_INACTIVE; 152 ev->write = NXT_EVENT_INACTIVE; 153 154 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0); 155 } 156 } 157 158 159 static nxt_bool_t 160 nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 161 { 162 nxt_poll_disable(engine, ev); 163 164 return ev->changing; 165 } 166 167 168 static void 169 nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 170 { 171 nxt_uint_t op, events; 172 173 ev->read = NXT_EVENT_ACTIVE; 174 175 if (ev->write == NXT_EVENT_INACTIVE) { 176 op = NXT_POLL_ADD; 177 events = POLLIN; 178 179 } else { 180 op = NXT_POLL_CHANGE; 181 events = POLLIN | POLLOUT; 182 } 183 184 nxt_poll_change(engine, ev, op, events); 185 } 186 187 188 static void 189 nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 190 { 191 nxt_uint_t op, events; 192 193 ev->write = NXT_EVENT_ACTIVE; 194 195 if (ev->read == NXT_EVENT_INACTIVE) { 196 op = NXT_POLL_ADD; 197 events = POLLOUT; 198 199 } else { 200 op = NXT_POLL_CHANGE; 201 events = POLLIN | POLLOUT; 202 } 203 204 nxt_poll_change(engine, ev, op, events); 205 } 206 207 208 static void 209 nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 210 { 211 nxt_uint_t op, events; 212 213 ev->read = NXT_EVENT_INACTIVE; 214 215 if (ev->write == NXT_EVENT_INACTIVE) { 216 op = NXT_POLL_DELETE; 217 events = 0; 218 219 } else { 220 op = NXT_POLL_CHANGE; 221 events = POLLOUT; 222 } 223 224 nxt_poll_change(engine, ev, op, events); 225 } 226 227 228 static void 229 nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 230 { 231 nxt_uint_t op, 
events; 232 233 ev->write = NXT_EVENT_INACTIVE; 234 235 if (ev->read == NXT_EVENT_INACTIVE) { 236 op = NXT_POLL_DELETE; 237 events = 0; 238 239 } else { 240 op = NXT_POLL_CHANGE; 241 events = POLLIN; 242 } 243 244 nxt_poll_change(engine, ev, op, events); 245 } 246 247 248 static void 249 nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 250 { 251 if (ev->read != NXT_EVENT_INACTIVE) { 252 nxt_poll_disable_read(engine, ev); 253 } 254 } 255 256 257 static void 258 nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 259 { 260 if (ev->write != NXT_EVENT_INACTIVE) { 261 nxt_poll_disable_write(engine, ev); 262 } 263 } 264 265 266 static void 267 nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 268 { 269 nxt_uint_t op; 270 271 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 272 NXT_POLL_ADD : NXT_POLL_CHANGE; 273 274 ev->read = NXT_EVENT_ONESHOT; 275 ev->write = NXT_EVENT_INACTIVE; 276 277 nxt_poll_change(engine, ev, op, POLLIN); 278 } 279 280 281 static void 282 nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 283 { 284 nxt_uint_t op; 285 286 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? 287 NXT_POLL_ADD : NXT_POLL_CHANGE; 288 289 ev->read = NXT_EVENT_INACTIVE; 290 ev->write = NXT_EVENT_ONESHOT; 291 292 nxt_poll_change(engine, ev, op, POLLOUT); 293 } 294 295 296 /* 297 * poll changes are batched to improve instruction and data cache 298 * locality of several lvlhsh operations followed by poll() call. 
 */

static void
nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op,
    nxt_uint_t events)
{
    nxt_poll_change_t  *change;

    nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events);

    /* Flush the batch when the change array is full; errors are reported
       via the error handlers enqueued by nxt_poll_commit_changes(). */
    if (engine->u.poll.nchanges >= engine->u.poll.mchanges) {
        (void) nxt_poll_commit_changes(engine);
    }

    ev->changing = 1;

    change = &engine->u.poll.changes[engine->u.poll.nchanges++];
    change->op = op;
    change->events = events;
    change->event = ev;
}


/*
 * Applies all queued changes to the poll set.  On an individual failure
 * the event's error handler is enqueued on the fast work queue and the
 * function eventually returns NXT_ERROR; NXT_OK otherwise.
 */
static nxt_int_t
nxt_poll_commit_changes(nxt_event_engine_t *engine)
{
    nxt_int_t          ret, retval;
    nxt_fd_event_t     *ev;
    nxt_poll_change_t  *change, *end;

    nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges);

    retval = NXT_OK;
    change = engine->u.poll.changes;
    end = change + engine->u.poll.nchanges;

    do {
        ev = change->event;
        ev->changing = 0;

        switch (change->op) {

        case NXT_POLL_ADD:
            ret = nxt_poll_set_add(engine, ev, change->events);

            if (nxt_fast_path(ret == NXT_OK)) {
                goto next;
            }

            break;

        case NXT_POLL_CHANGE:
            ret = nxt_poll_set_change(engine, ev->fd, change->events);

            if (nxt_fast_path(ret == NXT_OK)) {
                goto next;
            }

            break;

        case NXT_POLL_DELETE:
            ret = nxt_poll_set_delete(engine, ev->fd);

            if (nxt_fast_path(ret == NXT_OK)) {
                goto next;
            }

            break;
        }

        /* The change failed: notify the owner of the event. */
        nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
                           ev->task, ev, ev->data);

        retval = NXT_ERROR;

    next:

        change++;

    } while (change < end);

    engine->u.poll.nchanges = 0;

    return retval;
}


static nxt_int_t
nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events)
{
    nxt_int_t              ret;
    nxt_uint_t             max_nfds;
    struct pollfd          *pfd;
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe;

    nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events);

    /* Grow the pollfd array by 512 entries (4K of struct pollfd) when full. */
    if (engine->u.poll.nfds >= engine->u.poll.max_nfds) {
        max_nfds = engine->u.poll.max_nfds + 512;  /* 4K */

        pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds);
        if (nxt_slow_path(pfd == NULL)) {
            return NXT_ERROR;
        }

        engine->u.poll.set = pfd;
        engine->u.poll.max_nfds = max_nfds;
    }

    phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
    if (nxt_slow_path(phe == NULL)) {
        return NXT_ERROR;
    }

    phe->fd = ev->fd;
    phe->index = engine->u.poll.nfds;
    phe->event = ev;

    pfd = &engine->u.poll.set[engine->u.poll.nfds++];
    pfd->fd = ev->fd;
    pfd->events = events;
    pfd->revents = 0;

    lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t));
    lhq.replace = 0;
    lhq.value = phe;
    lhq.proto = &nxt_poll_fd_hash_proto;
    lhq.data = engine;

    ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq);

    if (nxt_fast_path(ret == NXT_OK)) {
        return NXT_OK;
    }

    /* Insert failed: release the orphaned hash entry. */
    nxt_free(phe);

    return NXT_ERROR;
}


/* Updates the interest mask of an fd already present in the poll set. */
static nxt_int_t
nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events)
{
    nxt_poll_hash_entry_t  *phe;

    nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi",
              fd, events);

    phe = nxt_poll_fd_hash_get(engine, fd);

    if (nxt_fast_path(phe != NULL)) {
        engine->u.poll.set[phe->index].events = events;
        return NXT_OK;
    }

    return NXT_ERROR;
}


static nxt_int_t
nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd)
{
    nxt_int_t              ret;
    nxt_uint_t             index, nfds;
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe;

    nxt_debug(&engine->task, "poll delete event: fd:%d", fd);

    lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
    lhq.proto = &nxt_poll_fd_hash_proto;
    lhq.data = engine;

    ret =
nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq); 475 476 if (nxt_slow_path(ret != NXT_OK)) { 477 return NXT_ERROR; 478 } 479 480 phe = lhq.value; 481 482 index = phe->index; 483 engine->u.poll.nfds--; 484 nfds = engine->u.poll.nfds; 485 486 if (index != nfds) { 487 engine->u.poll.set[index] = engine->u.poll.set[nfds]; 488 489 phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd); 490 491 phe->index = index; 492 } 493 494 nxt_free(lhq.value); 495 496 return NXT_OK; 497 } 498 499 500 static void 501 nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 502 { 503 int nevents; 504 nxt_fd_t fd; 505 nxt_err_t err; 506 nxt_bool_t error; 507 nxt_uint_t i, events, level; 508 struct pollfd *pfd; 509 nxt_fd_event_t *ev; 510 nxt_poll_hash_entry_t *phe; 511 512 if (engine->u.poll.nchanges != 0) { 513 if (nxt_poll_commit_changes(engine) != NXT_OK) { 514 /* Error handlers have been enqueued on failure. */ 515 timeout = 0; 516 } 517 } 518 519 nxt_debug(&engine->task, "poll() events:%ui timeout:%M", 520 engine->u.poll.nfds, timeout); 521 522 nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout); 523 524 err = (nevents == -1) ? nxt_errno : 0; 525 526 nxt_thread_time_update(engine->task.thread); 527 528 nxt_debug(&engine->task, "poll(): %d", nevents); 529 530 if (nevents == -1) { 531 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; 532 nxt_log(&engine->task, level, "poll() failed %E", err); 533 return; 534 } 535 536 for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) { 537 538 pfd = &engine->u.poll.set[i]; 539 events = pfd->revents; 540 541 if (events == 0) { 542 continue; 543 } 544 545 fd = pfd->fd; 546 547 phe = nxt_poll_fd_hash_get(engine, fd); 548 549 if (nxt_slow_path(phe == NULL)) { 550 nxt_log(&engine->task, NXT_LOG_CRIT, 551 "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi", 552 fd, pfd->events, events); 553 554 /* Mark the poll entry to ignore it by the kernel. 
*/ 555 pfd->fd = -1; 556 goto next; 557 } 558 559 ev = phe->event; 560 561 nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d", 562 fd, events, ev->read, ev->write); 563 564 if (nxt_slow_path((events & POLLNVAL) != 0)) { 565 nxt_log(ev->task, NXT_LOG_CRIT, 566 "poll() error fd:%d ev:%04Xd rev:%04uXi", 567 fd, pfd->events, events); 568 569 /* Mark the poll entry to ignore it by the kernel. */ 570 pfd->fd = -1; 571 572 nxt_work_queue_add(&engine->fast_work_queue, 573 ev->error_handler, ev->task, ev, ev->data); 574 goto next; 575 } 576 577 /* 578 * On a socket's remote end close: 579 * 580 * Linux, FreeBSD, and Solaris set POLLIN; 581 * MacOSX sets POLLIN and POLLHUP; 582 * NetBSD sets POLLIN, and poll(2) claims this explicitly: 583 * 584 * If the remote end of a socket is closed, poll() 585 * returns a POLLIN event, rather than a POLLHUP. 586 * 587 * On error: 588 * 589 * Linux sets POLLHUP and POLLERR only; 590 * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2) 591 * claims the opposite: 592 * 593 * Note that POLLHUP and POLLOUT should never be 594 * present in the revents bitmask at the same time. 595 * 596 * Solaris and NetBSD do not add POLLHUP or POLLERR; 597 * MacOSX sets POLLHUP only. 598 * 599 * If an implementation sets POLLERR or POLLHUP only without POLLIN 600 * or POLLOUT, the "error" variable enqueues only one active handler. 
 */

        error = (((events & (POLLERR | POLLHUP)) != 0)
                 && ((events & (POLLIN | POLLOUT)) == 0));

        if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
            /* Reset so the write branch below is not also triggered. */
            error = 0;
            ev->read_ready = 1;

            if (ev->read == NXT_EVENT_ONESHOT) {
                ev->read = NXT_EVENT_INACTIVE;
                nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
            }

            nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                               ev->task, ev, ev->data);
        }

        if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
            ev->write_ready = 1;

            if (ev->write == NXT_EVENT_ONESHOT) {
                ev->write = NXT_EVENT_INACTIVE;
                nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
            }

            nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                               ev->task, ev, ev->data);
        }

    next:

        nevents--;
    }
}


/*
 * Looks up the hash entry of an fd; returns NULL (after logging a
 * CRIT message) if the fd is not in the hash.
 */
static nxt_poll_hash_entry_t *
nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd)
{
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe;

    lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
    lhq.proto = &nxt_poll_fd_hash_proto;
    lhq.data = engine;

    if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) {
        phe = lhq.value;
        return phe;
    }

    nxt_log(&engine->task, NXT_LOG_CRIT, "fd %d not found in hash", fd);

    return NULL;
}


static nxt_int_t
nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_event_engine_t     *engine;
    nxt_poll_hash_entry_t  *phe;

    phe = data;

    /* nxt_murmur_hash2() is unique for 4 bytes.
 */

    engine = lhq->data;

    /* Cross-check the hash entry against the live poll set slot. */
    if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) {
        return NXT_OK;
    }

    nxt_log(&engine->task, NXT_LOG_CRIT,
            "fd %d in hash mismatches fd %d in poll set",
            phe->fd, engine->u.poll.set[phe->index].fd);

    return NXT_DECLINED;
}


/* Deletes and frees every remaining entry of the fd hash. */
static void
nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh)
{
    nxt_lvlhsh_each_t      lhe;
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe;

    nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
    lhe.proto = &nxt_poll_fd_hash_proto;
    lhq.proto = &nxt_poll_fd_hash_proto;

    for ( ;; ) {
        phe = nxt_lvlhsh_each(lh, &lhe);

        if (phe == NULL) {
            return;
        }

        lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t));

        /* A delete failure is logged but iteration continues. */
        if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) {
            nxt_log(&engine->task, NXT_LOG_CRIT,
                    "event fd %d not found in hash", phe->fd);
        }

        nxt_free(phe);
    }
}