/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * kqueue() has been introduced in FreeBSD 4.1 and then was ported
 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 * DragonFlyBSD inherited it with FreeBSD 4 code base.
 *
 * NOTE_REVOKE has been introduced in FreeBSD 4.3 and then was ported
 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 * DragonFlyBSD inherited it with FreeBSD 4 code base.
 *
 * EVFILT_TIMER has been introduced in FreeBSD 4.4-STABLE and then was
 * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2.
 * DragonFlyBSD inherited it with FreeBSD 4 code base.
 *
 * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow
 * Leopard) as part of the Grand Central Dispatch framework
 * and then were ported to FreeBSD 8.0-STABLE as part of the
 * libdispatch support.
 */


/*
 * EV_DISPATCH is better because it just disables an event on delivery
 * whilst EV_ONESHOT deletes the event.  This eliminates in-kernel memory
 * deallocation and probable subsequent allocation with a lock acquiring.
 */
#ifdef EV_DISPATCH
#define NXT_KEVENT_ONESHOT  EV_DISPATCH
#else
#define NXT_KEVENT_ONESHOT  EV_ONESHOT
#endif


#if (NXT_NETBSD)
/* NetBSD defines the kevent.udata field as intptr_t. */

#define nxt_kevent_set_udata(udata)  (intptr_t) (udata)
#define nxt_kevent_get_udata(udata)  (void *) (udata)

#else
#define nxt_kevent_set_udata(udata)  (void *) (udata)
#define nxt_kevent_get_udata(udata)  (udata)
#endif


static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_kqueue_free(nxt_event_engine_t *engine);
static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
    nxt_file_event_t *ev);
static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
    nxt_file_event_t *ev);
static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags);
static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
static void nxt_kqueue_error(nxt_event_engine_t *engine);
static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
    const nxt_sig_event_t *sigev);
#if (NXT_HAVE_EVFILT_USER)
static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


static nxt_conn_io_t  nxt_kqueue_conn_io = {
    .connect = nxt_kqueue_conn_io_connect,
    .accept = nxt_kqueue_conn_io_accept,

    .read = nxt_kqueue_conn_io_read,
    .recvbuf = nxt_kqueue_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_FREEBSD_SENDFILE)
    .old_sendbuf = nxt_freebsd_event_conn_io_sendfile,
#elif (NXT_HAVE_MACOSX_SENDFILE)
    .old_sendbuf = nxt_macosx_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};


const nxt_event_interface_t  nxt_kqueue_engine = {
    "kqueue",
    nxt_kqueue_create,
    nxt_kqueue_free,
    nxt_kqueue_enable,
    nxt_kqueue_disable,
    nxt_kqueue_delete,
    nxt_kqueue_close,
    nxt_kqueue_enable_read,
    nxt_kqueue_enable_write,
    nxt_kqueue_disable_read,
    nxt_kqueue_disable_write,
    nxt_kqueue_block_read,
    nxt_kqueue_block_write,
    nxt_kqueue_oneshot_read,
    nxt_kqueue_oneshot_write,
    nxt_kqueue_enable_accept,
    nxt_kqueue_enable_file,
    nxt_kqueue_close_file,
#if (NXT_HAVE_EVFILT_USER)
    nxt_kqueue_enable_post,
    nxt_kqueue_signal,
#else
    NULL,
    NULL,
#endif
    nxt_kqueue_poll,

    &nxt_kqueue_conn_io,

    NXT_FILE_EVENTS,
    NXT_SIGNAL_EVENTS,
};


/*
 * Create the kqueue descriptor and the change/event arrays, and register
 * the engine's signal set (if any) with EVFILT_SIGNAL.  On any failure
 * everything allocated so far is released via nxt_kqueue_free().
 */
static nxt_int_t
nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    const nxt_sig_event_t  *sigev;

    engine->u.kqueue.fd = -1;
    engine->u.kqueue.mchanges = mchanges;
    engine->u.kqueue.mevents = mevents;

    /* Remembered to detect a kqueue inherited across fork(), see free(). */
    engine->u.kqueue.pid = nxt_pid;

    engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
    if (engine->u.kqueue.changes == NULL) {
        goto fail;
    }

    engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
    if (engine->u.kqueue.events == NULL) {
        goto fail;
    }

    engine->u.kqueue.fd = kqueue();
    if (engine->u.kqueue.fd == -1) {
        nxt_alert(&engine->task, "kqueue() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);

    if (engine->signals != NULL) {
        for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
            if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
                goto fail;
            }
        }
    }

    return NXT_OK;

fail:

    nxt_kqueue_free(engine);

    return NXT_ERROR;
}


/* Release the kqueue descriptor and the change/event arrays. */
static void
nxt_kqueue_free(nxt_event_engine_t *engine)
{
    nxt_fd_t  fd;

    fd = engine->u.kqueue.fd;

    nxt_debug(&engine->task, "kqueue %d free", fd);

    if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
        /* kqueue is not inherited by fork() */

        if (close(fd) != 0) {
            nxt_alert(&engine->task, "kqueue close(%d) failed %E",
                      fd, nxt_errno);
        }
    }

    nxt_free(engine->u.kqueue.events);
    nxt_free(engine->u.kqueue.changes);

    nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
}


/* Enable both read and write events on the descriptor. */
static void
nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_kqueue_enable_read(engine, ev);
    nxt_kqueue_enable_write(engine, ev);
}


/*
 * EV_DISABLE is better because it eliminates in-kernel memory
 * deallocation and probable subsequent allocation with a lock acquiring.
 */

static void
nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
    }

    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
    }
}


/* Unlike disable, delete removes the kernel-side event objects. */
static void
nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
    }

    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
    }
}


/*
 * kqueue(2):
 *
 *   Calling close() on a file descriptor will remove any kevents that
 *   reference the descriptor.
 *
 * So nxt_kqueue_close() returns true only if there are pending events
 * still queued in the change list for this descriptor.
 */

static nxt_bool_t
nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    struct kevent  *kev, *end;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];

    for (kev = engine->u.kqueue.changes; kev < end; kev++) {
        if (kev->ident == (uintptr_t) ev->fd) {
            return 1;
        }
    }

    return 0;
}


/*
 * The kqueue event engine uses only three states: inactive, blocked, and
 * active.  An active oneshot event is marked as it is in the default
 * state.  The event will be converted eventually to the default EV_CLEAR
 * mode after it will become inactive after delivery.
 */

static void
nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read == NXT_EVENT_INACTIVE) {
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
                          EV_ADD | EV_ENABLE | EV_CLEAR);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


static void
nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write == NXT_EVENT_INACTIVE) {
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
                          EV_ADD | EV_ENABLE | EV_CLEAR);
    }

    ev->write = NXT_EVENT_ACTIVE;
}


static void
nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_INACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
}


static void
nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->write = NXT_EVENT_INACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
}


/* Block: leave the kernel event armed but ignore deliveries in poll(). */
static void
nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


/*
 * A oneshot event is disabled (EV_DISPATCH) or deleted (EV_ONESHOT) by
 * the kernel right after delivery, see NXT_KEVENT_ONESHOT.
 *
 * FIX: the original body armed EVFILT_WRITE and set ev->write, which
 * made this function an exact duplicate of nxt_kqueue_oneshot_write().
 * A oneshot *read* must arm EVFILT_READ and mark ev->read active: this
 * is the state nxt_kqueue_poll() checks and clears for a oneshot
 * EVFILT_READ delivery.
 */
static void
nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
                      EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}


static void
nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->write = NXT_EVENT_ACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
                      EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}


/*
 * A listen socket is level-triggered (no EV_CLEAR): kev->data carries
 * the length of the listen backlog, consumed by the accept handler.
 */
static void
nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->read_handler = nxt_kqueue_listen_handler;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
}


/* Watch a file for modification/deletion via EVFILT_VNODE. */
static void
nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
    struct kevent  *kev;

    const nxt_int_t   flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    const nxt_uint_t  fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
                               | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;

    nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
              engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);

    kev = nxt_kqueue_get_kevent(engine);

    kev->ident = ev->file->fd;
    kev->filter = EVFILT_VNODE;
    kev->flags = flags;
    kev->fflags = fflags;
    kev->data = 0;
    kev->udata = nxt_kevent_set_udata(ev);
}


static void
nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
    /* TODO: pending event. */
}


/* Queue a change for the descriptor; flushed in batches by get_kevent(). */
static void
nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags)
{
    struct kevent  *kev;

    nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
              engine->u.kqueue.fd, ev->fd, filter, flags);

    kev = nxt_kqueue_get_kevent(engine);

    kev->ident = ev->fd;
    kev->filter = filter;
    kev->flags = flags;
    kev->fflags = 0;
    kev->data = 0;
    kev->udata = nxt_kevent_set_udata(ev);
}


/*
 * Return the next free slot in the change list, flushing the pending
 * changes to the kernel first if the list is full.
 */
static struct kevent *
nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
{
    int  ret, nchanges;

    nchanges = engine->u.kqueue.nchanges;

    if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {

        nxt_debug(&engine->task, "kevent(%d) changes:%d",
                  engine->u.kqueue.fd, nchanges);

        ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
                     NULL, 0, NULL);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(&engine->task, "kevent(%d) failed %E",
                      engine->u.kqueue.fd, nxt_errno);

            nxt_kqueue_error(engine);
        }

        engine->u.kqueue.nchanges = 0;
    }

    return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
}


/*
 * A batched kevent() call failed: post the appropriate error handler
 * for every event owner still sitting in the change list.
 */
static void
nxt_kqueue_error(nxt_event_engine_t *engine)
{
    struct kevent     *kev, *end;
    nxt_fd_event_t    *ev;
    nxt_file_event_t  *fev;
    nxt_work_queue_t  *wq;

    wq = &engine->fast_work_queue;
    end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];

    for (kev = engine->u.kqueue.changes; kev < end; kev++) {

        switch (kev->filter) {

        case EVFILT_READ:
        case EVFILT_WRITE:
            ev = nxt_kevent_get_udata(kev->udata);
            nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
                               ev->task, ev, ev->data);
            break;

        case EVFILT_VNODE:
            fev = nxt_kevent_get_udata(kev->udata);
            nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
                               fev->task, fev, fev->data);
            break;
        }
    }
}

/*
 * Posted when a kevent() change submission failed for a read/write
 * event: deactivate both directions and invoke the owner's error
 * handler with the errno the kernel reported (if any).
 */
static void
nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd);

    if (ev->kq_eof && ev->kq_errno != 0) {
        /*
         * NOTE(review): the original also assigned ev->error here;
         * that store was dead, being repeated unconditionally below.
         */
        nxt_log(task, nxt_socket_error_level(ev->kq_errno),
                "kevent() reported error on descriptor %d %E",
                ev->fd, ev->kq_errno);
    }

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;
    ev->error = ev->kq_errno;

    ev->error_handler(task, ev, data);
}


/* Same as above, for EVFILT_VNODE file events. */
static void
nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_file_event_t  *ev;

    ev = obj;

    nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);

    ev->handler(task, ev, data);
}


/*
 * Register one signal with EVFILT_SIGNAL.  The signal disposition must
 * not block delivery: kqueue sees a signal only if it is not ignored at
 * sigaction level (except SIGCHLD, see below).
 */
static nxt_int_t
nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
{
    int               signo;
    struct kevent     kev;
    struct sigaction  sa;

    signo = sigev->signo;

    nxt_memzero(&sa, sizeof(struct sigaction));
    sigemptyset(&sa.sa_mask);

    /*
     * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch
     * this signal.  It should be set to SIG_DFL instead.  And although
     * SIGCHLD default action is also ignoring, nevertheless SIG_DFL
     * allows kqueue to catch the signal.
     */
    sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;

    if (sigaction(signo, &sa, NULL) != 0) {
        nxt_alert(&engine->task, "sigaction(%d) failed %E", signo, nxt_errno);

        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
              engine->u.kqueue.fd, signo, sigev->name);

    kev.ident = signo;
    kev.filter = EVFILT_SIGNAL;
    kev.flags = EV_ADD;
    kev.fflags = 0;
    kev.data = 0;
    kev.udata = nxt_kevent_set_udata(sigev);

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
        return NXT_OK;
    }

    /*
     * FIX: the original passed the function pointer "kqueue" to the %d
     * conversion instead of the descriptor, which is undefined behavior
     * in the varargs call and logged garbage.
     */
    nxt_alert(&engine->task, "kevent(%d) failed %E",
              engine->u.kqueue.fd, nxt_errno);

    return NXT_ERROR;
}


#if (NXT_HAVE_EVFILT_USER)

/* Arm the engine's user-triggered wakeup event and store its handler. */
static nxt_int_t
nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    struct kevent  kev;

    /* EVFILT_USER must be added to a kqueue before it can be triggered. */

    kev.ident = 0;
    kev.filter = EVFILT_USER;
    kev.flags = EV_ADD | EV_CLEAR;
    kev.fflags = 0;
    kev.data = 0;
    kev.udata = NULL;

    engine->u.kqueue.post_handler = handler;

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "kevent(%d) failed %E",
              engine->u.kqueue.fd, nxt_errno);

    return NXT_ERROR;
}


static void
nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    struct kevent  kev;

    /*
     * kqueue has a builtin signal processing support, so the function
     * is used only to post events and the signo argument is ignored.
     */

    kev.ident = 0;
    kev.filter = EVFILT_USER;
    kev.flags = 0;
    kev.fflags = NOTE_TRIGGER;
    kev.data = 0;
    kev.udata = NULL;

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
        nxt_alert(&engine->task, "kevent(%d) failed %E",
                  engine->u.kqueue.fd, nxt_errno);
    }
}

#endif


/*
 * Submit the pending changes, wait up to "timeout" milliseconds for
 * events, and dispatch each delivered kevent to its owner's handler
 * through the appropriate work queue.
 */
static void
nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    void                *obj, *data;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_uint_t          level;
    nxt_bool_t          error, eof;
    nxt_task_t          *task;
    struct kevent       *kev;
    nxt_fd_event_t      *ev;
    nxt_sig_event_t     *sigev;
    struct timespec     ts, *tp;
    nxt_file_event_t    *fev;
    nxt_work_queue_t    *wq;
    nxt_work_handler_t  handler;

    if (timeout == NXT_INFINITE_MSEC) {
        tp = NULL;

    } else {
        ts.tv_sec = timeout / 1000;
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tp = &ts;
    }

    nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
              engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);

    nevents = kevent(engine->u.kqueue.fd,
                     engine->u.kqueue.changes, engine->u.kqueue.nchanges,
                     engine->u.kqueue.events, engine->u.kqueue.mevents, tp);

    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "kevent(%d) failed %E",
                engine->u.kqueue.fd, err);

        nxt_kqueue_error(engine);
        return;
    }

    engine->u.kqueue.nchanges = 0;

    for (i = 0; i < nevents; i++) {

        kev = &engine->u.kqueue.events[i];

        nxt_debug(&engine->task,
                  (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
                      "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
                      "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
                  kev->ident, kev->filter, kev->flags, kev->fflags,
                  kev->data, kev->udata);

        error = (kev->flags & EV_ERROR);

        if (nxt_slow_path(error)) {
            nxt_alert(&engine->task,
                      "kevent(%d) error %E on ident:%d filter:%d",
                      engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
        }

        task = &engine->task;
        wq = &engine->fast_work_queue;
        handler = nxt_kqueue_fd_error_handler;
        obj = nxt_kevent_get_udata(kev->udata);

        switch (kev->filter) {

        case EVFILT_READ:
            ev = obj;
            ev->read_ready = 1;
            /* kev->data is the number of bytes available to read. */
            ev->kq_available = (int32_t) kev->data;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->read <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->read = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->read_handler;
                wq = ev->read_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_WRITE:
            ev = obj;
            ev->write_ready = 1;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->write <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->write = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->write_handler;
                wq = ev->write_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_VNODE:
            fev = obj;
            handler = fev->handler;
            task = fev->task;
            data = fev->data;
            break;

        case EVFILT_SIGNAL:
            sigev = obj;
            obj = (void *) kev->ident;
            handler = sigev->handler;
            data = (void *) sigev->name;
            break;

#if (NXT_HAVE_EVFILT_USER)

        case EVFILT_USER:
            handler = engine->u.kqueue.post_handler;
            data = NULL;
            break;

#endif

        default:

#if (NXT_DEBUG)
            nxt_alert(&engine->task,
                      "unexpected kevent(%d) filter %d on ident %d",
                      engine->u.kqueue.fd, kev->filter, kev->ident);
#endif

            continue;
        }

        nxt_work_queue_add(wq, handler, task, obj, data);
    }
}


/*
 * nxt_kqueue_event_conn_io_connect() eliminates the
 * getsockopt() syscall to test pending connect() error.
 */

static void
nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        /* Completion (or failure) will arrive as a write event. */
        c->socket.write_handler = nxt_kqueue_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_kqueue_enable_write(engine, &c->socket);
        return;

    case NXT_DECLINED:
        handler = state->close_handler;
        break;

    default:  /* NXT_ERROR */
        handler = state->error_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}


/* Write readiness after a pending connect(): the connection succeeded. */
static void
nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);

    c->socket.write = NXT_EVENT_BLOCKED;

    if (c->write_state->timer_autoreset) {
        nxt_timer_disable(task->thread->engine, &c->write_timer);
    }

    nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                       task, c, data);
}


/*
 * kev->data of a listen socket event carries the backlog length, so the
 * accept batch can be sized without speculative accept() calls.
 */
static void
nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_listen_event_t  *lev;

    lev = obj;

    nxt_debug(task, "kevent fd:%d avail:%D",
              lev->socket.fd, lev->socket.kq_available);

    lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);

    nxt_kqueue_conn_io_accept(task, lev, data);
}


static void
nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           socklen;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;

    /*
     * NOTE(review): the original also set read_ready from (lev->ready != 0)
    * here; that store was dead, being immediately overwritten below by
     * the kq_available-based value, so it has been dropped.
     */
    lev->socket.kq_available--;
    lev->socket.read_ready = (lev->socket.kq_available != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept(lev->socket.fd, sa, &socklen);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept", nxt_errno);
}


/*
 * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the
 * readv() or recv() syscall if a remote side just closed connection.
 */

static void
nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);

    if (c->socket.kq_available == 0 && c->socket.kq_eof) {
        nxt_debug(task, "kevent fd:%d eof", c->socket.fd);

        c->socket.closed = 1;
        nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
                           task, c, data);
        return;
    }

    nxt_conn_io_read(task, c, data);
}


/*
 * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard
 * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
 * if there is no pending data or a remote side closed connection.
 */

static ssize_t
nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    if (c->socket.kq_available == 0 && c->socket.kq_eof) {
        c->socket.closed = 1;
        return 0;
    }

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0) {
        c->socket.kq_available -= n;

        if (c->socket.kq_available < 0) {
            c->socket.kq_available = 0;
        }

        nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
                  c->socket.fd, c->socket.kq_available, c->socket.kq_eof);

        c->socket.read_ready = (c->socket.kq_available != 0
                                || c->socket.kq_eof);
    }

    return n;
}