1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * kqueue() has been introduced in FreeBSD 4.1 and then was ported 12 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. 13 * DragonFlyBSD inherited it with FreeBSD 4 code base. 14 * 15 * NOTE_REVOKE has been introduced in FreeBSD 4.3 and then was ported 16 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. 17 * DragonFlyBSD inherited it with FreeBSD 4 code base. 18 * 19 * EVFILT_TIMER has been introduced in FreeBSD 4.4-STABLE and then was 20 * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2. 21 * DragonFlyBSD inherited it with FreeBSD 4 code base. 22 * 23 * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow 24 * Leopard) as part of the Grand Central Dispatch framework 25 * and then were ported to FreeBSD 8.0-STABLE as part of the 26 * libdispatch support. 27 */ 28 29 30 /* 31 * EV_DISPATCH is better because it just disables an event on delivery 32 * whilst EV_ONESHOT deletes the event. This eliminates in-kernel memory 33 * deallocation and probable subsequent allocation with a lock acquiring. 34 */ 35 #ifdef EV_DISPATCH 36 #define NXT_KEVENT_ONESHOT EV_DISPATCH 37 #else 38 #define NXT_KEVENT_ONESHOT EV_ONESHOT 39 #endif 40 41 42 #if (NXT_NETBSD) 43 /* NetBSD defines the kevent.udata field as intptr_t. */ 44 45 #define nxt_kevent_set_udata(udata) (intptr_t) (udata) 46 #define nxt_kevent_get_udata(udata) (void *) (udata) 47 48 #else 49 #define nxt_kevent_set_udata(udata) (void *) (udata) 50 #define nxt_kevent_get_udata(udata) (udata) 51 #endif 52 53 54 static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine, 55 nxt_uint_t mchanges, nxt_uint_t mevents); 56 static void nxt_kqueue_free(nxt_event_engine_t *engine); 57 static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 58 static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 59 static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 60 static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62 static void nxt_kqueue_enable_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64 static void nxt_kqueue_enable_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66 static void nxt_kqueue_disable_read(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68 static void nxt_kqueue_disable_write(nxt_event_engine_t *engine, 69 nxt_fd_event_t *ev); 70 static void nxt_kqueue_block_read(nxt_event_engine_t *engine, 71 nxt_fd_event_t *ev); 72 static void nxt_kqueue_block_write(nxt_event_engine_t *engine, 73 nxt_fd_event_t *ev); 74 static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, 75 nxt_fd_event_t *ev); 76 static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, 77 nxt_fd_event_t *ev); 78 static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine, 79 nxt_fd_event_t *ev); 80 static void nxt_kqueue_enable_file(nxt_event_engine_t *engine, 81 nxt_event_file_t *ev); 82 static void nxt_kqueue_close_file(nxt_event_engine_t *engine, 83 nxt_event_file_t *ev); 84 static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 85 nxt_int_t filter, nxt_uint_t flags); 86 static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine); 87 static void nxt_kqueue_error(nxt_event_engine_t *engine); 88 static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, 89 void *data); 90 static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, 91 void *data); 92 static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine, 93 const nxt_sig_event_t *sigev); 94 #if (NXT_HAVE_EVFILT_USER) 95 static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine, 96 nxt_work_handler_t handler); 97 static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 98 #endif 99 static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 100 101 static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, 102 void *data); 103 static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, 104 void *data); 105 static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data); 106 static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, 107 void *data); 108 static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, 109 void *data); 110 static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, 111 nxt_buf_t *b); 112 113 114 static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { 115 nxt_kqueue_event_conn_io_connect, 116 nxt_kqueue_event_conn_io_accept, 117 118 nxt_kqueue_event_conn_io_read, 119 nxt_kqueue_event_conn_io_recvbuf, 120 nxt_event_conn_io_recv, 121 122 nxt_conn_io_write, 123 nxt_event_conn_io_write_chunk, 124 125 #if (NXT_HAVE_FREEBSD_SENDFILE) 126 nxt_freebsd_event_conn_io_sendfile, 127 #elif (NXT_HAVE_MACOSX_SENDFILE) 128 nxt_macosx_event_conn_io_sendfile, 129 #else 130 nxt_event_conn_io_sendbuf, 131 #endif 132 133 nxt_event_conn_io_writev, 134 nxt_event_conn_io_send, 135 136 nxt_event_conn_io_shutdown, 137 }; 138 139 140 const nxt_event_interface_t nxt_kqueue_engine = { 141 "kqueue", 142 nxt_kqueue_create, 143 nxt_kqueue_free, 144 nxt_kqueue_enable, 145 nxt_kqueue_disable, 146 nxt_kqueue_delete, 147 nxt_kqueue_close, 148 nxt_kqueue_enable_read, 149 nxt_kqueue_enable_write, 150 nxt_kqueue_disable_read, 151 nxt_kqueue_disable_write, 152 nxt_kqueue_block_read, 153 nxt_kqueue_block_write, 154 nxt_kqueue_oneshot_read, 155 nxt_kqueue_oneshot_write, 156 nxt_kqueue_enable_accept, 157 nxt_kqueue_enable_file, 158 nxt_kqueue_close_file, 159 #if (NXT_HAVE_EVFILT_USER) 160 nxt_kqueue_enable_post, 161 nxt_kqueue_signal, 162 #else 163 NULL, 164 NULL, 165 #endif 166 nxt_kqueue_poll, 167 168 &nxt_kqueue_event_conn_io, 169 170 NXT_FILE_EVENTS, 171 NXT_SIGNAL_EVENTS, 172 }; 173 174 175 static nxt_int_t 176 nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 177 nxt_uint_t mevents) 178 { 179 const nxt_sig_event_t *sigev; 180 181 engine->u.kqueue.fd = -1; 182 engine->u.kqueue.mchanges = mchanges; 183 engine->u.kqueue.mevents = mevents; 184 engine->u.kqueue.pid = nxt_pid; 185 186 engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges); 187 if (engine->u.kqueue.changes == NULL) { 188 goto fail; 189 } 190 191 engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents); 192 if (engine->u.kqueue.events == NULL) { 193 goto fail; 194 } 195 196 engine->u.kqueue.fd = kqueue(); 197 if (engine->u.kqueue.fd == -1) { 198 nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue() failed %E", nxt_errno); 199 goto fail; 200 } 201 202 nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd); 203 204 if (engine->signals != NULL) { 205 for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) { 206 if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) { 207 goto fail; 208 } 209 } 210 } 211 212 return NXT_OK; 213 214 fail: 215 216 nxt_kqueue_free(engine); 217 218 return NXT_ERROR; 219 } 220 221 222 static void 223 nxt_kqueue_free(nxt_event_engine_t *engine) 224 { 225 nxt_fd_t fd; 226 227 fd = engine->u.kqueue.fd; 228 229 nxt_debug(&engine->task, "kqueue %d free", fd); 230 231 if (fd != -1 && engine->u.kqueue.pid == nxt_pid) { 232 /* kqueue is not inherited by fork() */ 233 234 if (close(fd) != 0) { 235 nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue close(%d) failed %E", 236 fd, nxt_errno); 237 } 238 } 239 240 nxt_free(engine->u.kqueue.events); 241 nxt_free(engine->u.kqueue.changes); 242 243 nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t)); 244 } 245 246 247 static void 248 nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 249 { 250 nxt_kqueue_enable_read(engine, ev); 251 nxt_kqueue_enable_write(engine, ev); 252 } 253 254 255 /* 256 * EV_DISABLE is better because it eliminates in-kernel memory 257 * deallocation and probable subsequent allocation with a lock acquiring. 258 */ 259 260 static void 261 nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 262 { 263 if (ev->read != NXT_EVENT_INACTIVE) { 264 ev->read = NXT_EVENT_INACTIVE; 265 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); 266 } 267 268 if (ev->write != NXT_EVENT_INACTIVE) { 269 ev->write = NXT_EVENT_INACTIVE; 270 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); 271 } 272 } 273 274 275 static void 276 nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 277 { 278 if (ev->read != NXT_EVENT_INACTIVE) { 279 ev->read = NXT_EVENT_INACTIVE; 280 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE); 281 } 282 283 if (ev->write != NXT_EVENT_INACTIVE) { 284 ev->write = NXT_EVENT_INACTIVE; 285 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE); 286 } 287 } 288 289 290 /* 291 * kqueue(2): 292 * 293 * Calling close() on a file descriptor will remove any kevents that 294 * reference the descriptor. 295 * 296 * nxt_kqueue_close() always returns true as there are pending events on 297 * closing file descriptor because kevent() passes whole change list at once. 298 */ 299 300 static nxt_bool_t 301 nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 302 { 303 ev->read = NXT_EVENT_INACTIVE; 304 ev->write = NXT_EVENT_INACTIVE; 305 306 return 1; 307 } 308 309 310 /* 311 * The kqueue event engine uses only three states: inactive, blocked, and 312 * active. An active oneshot event is marked as it is in the default 313 * state. The event will be converted eventually to the default EV_CLEAR 314 * mode after it will become inactive after delivery. 315 */ 316 317 static void 318 nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 319 { 320 if (ev->read == NXT_EVENT_INACTIVE) { 321 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, 322 EV_ADD | EV_ENABLE | EV_CLEAR); 323 } 324 325 ev->read = NXT_EVENT_ACTIVE; 326 } 327 328 329 static void 330 nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 331 { 332 if (ev->write == NXT_EVENT_INACTIVE) { 333 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 334 EV_ADD | EV_ENABLE | EV_CLEAR); 335 } 336 337 ev->write = NXT_EVENT_ACTIVE; 338 } 339 340 341 static void 342 nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 343 { 344 ev->read = NXT_EVENT_INACTIVE; 345 346 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); 347 } 348 349 350 static void 351 nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 352 { 353 ev->write = NXT_EVENT_INACTIVE; 354 355 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); 356 } 357 358 359 static void 360 nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 361 { 362 if (ev->read != NXT_EVENT_INACTIVE) { 363 ev->read = NXT_EVENT_BLOCKED; 364 } 365 } 366 367 368 static void 369 nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 370 { 371 if (ev->write != NXT_EVENT_INACTIVE) { 372 ev->write = NXT_EVENT_BLOCKED; 373 } 374 } 375 376 377 static void 378 nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 379 { 380 ev->write = NXT_EVENT_ACTIVE; 381 382 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 383 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); 384 } 385 386 387 static void 388 nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 389 { 390 ev->write = NXT_EVENT_ACTIVE; 391 392 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 393 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); 394 } 395 396 397 static void 398 nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 399 { 400 ev->read = NXT_EVENT_ACTIVE; 401 ev->read_handler = nxt_kqueue_listen_handler; 402 403 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE); 404 } 405 406 407 static void 408 nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) 409 { 410 struct kevent *kev; 411 412 const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT; 413 const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND 414 | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE; 415 416 nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD", 417 engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags); 418 419 kev = nxt_kqueue_get_kevent(engine); 420 421 kev->ident = ev->file->fd; 422 kev->filter = EVFILT_VNODE; 423 kev->flags = flags; 424 kev->fflags = fflags; 425 kev->data = 0; 426 kev->udata = nxt_kevent_set_udata(ev); 427 } 428 429 430 static void 431 nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) 432 { 433 /* TODO: pending event. */ 434 } 435 436 437 static void 438 nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 439 nxt_int_t filter, nxt_uint_t flags) 440 { 441 struct kevent *kev; 442 443 nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui", 444 engine->u.kqueue.fd, ev->fd, filter, flags); 445 446 kev = nxt_kqueue_get_kevent(engine); 447 448 kev->ident = ev->fd; 449 kev->filter = filter; 450 kev->flags = flags; 451 kev->fflags = 0; 452 kev->data = 0; 453 kev->udata = nxt_kevent_set_udata(ev); 454 } 455 456 457 static struct kevent * 458 nxt_kqueue_get_kevent(nxt_event_engine_t *engine) 459 { 460 int ret, nchanges; 461 462 nchanges = engine->u.kqueue.nchanges; 463 464 if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) { 465 466 nxt_debug(&engine->task, "kevent(%d) changes:%d", 467 engine->u.kqueue.fd, nchanges); 468 469 ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges, 470 NULL, 0, NULL); 471 472 if (nxt_slow_path(ret != 0)) { 473 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 474 engine->u.kqueue.fd, nxt_errno); 475 476 nxt_kqueue_error(engine); 477 } 478 479 engine->u.kqueue.nchanges = 0; 480 } 481 482 return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++]; 483 } 484 485 486 static void 487 nxt_kqueue_error(nxt_event_engine_t *engine) 488 { 489 struct kevent *kev, *end; 490 nxt_fd_event_t *ev; 491 nxt_event_file_t *fev; 492 nxt_work_queue_t *wq; 493 494 wq = &engine->fast_work_queue; 495 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges]; 496 497 for (kev = engine->u.kqueue.changes; kev < end; kev++) { 498 499 switch (kev->filter) { 500 501 case EVFILT_READ: 502 case EVFILT_WRITE: 503 ev = nxt_kevent_get_udata(kev->udata); 504 nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler, 505 ev->task, ev, ev->data); 506 break; 507 508 case EVFILT_VNODE: 509 fev = nxt_kevent_get_udata(kev->udata); 510 nxt_work_queue_add(wq, nxt_kqueue_file_error_handler, 511 fev->task, fev, fev->data); 512 break; 513 } 514 } 515 } 516 517 518 static void 519 nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) 520 { 521 nxt_fd_event_t *ev; 522 523 ev = obj; 524 525 if (ev->kq_eof && ev->kq_errno != 0) { 526 ev->error = ev->kq_errno; 527 nxt_log(task, nxt_socket_error_level(ev->kq_errno), 528 "kevent() reported error on descriptor %d %E", 529 ev->fd, ev->kq_errno); 530 } 531 532 ev->read = NXT_EVENT_INACTIVE; 533 ev->write = NXT_EVENT_INACTIVE; 534 ev->error = ev->kq_errno; 535 536 ev->error_handler(task, ev, data); 537 } 538 539 540 static void 541 nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) 542 { 543 nxt_event_file_t *ev; 544 545 ev = obj; 546 547 ev->handler(task, ev, data); 548 } 549 550 551 static nxt_int_t 552 nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev) 553 { 554 int signo; 555 struct kevent kev; 556 struct sigaction sa; 557 558 signo = sigev->signo; 559 560 nxt_memzero(&sa, sizeof(struct sigaction)); 561 sigemptyset(&sa.sa_mask); 562 563 /* 564 * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch 565 * this signal. It should be set to SIG_DFL instead. And although 566 * SIGCHLD default action is also ignoring, nevertheless SIG_DFL 567 * allows kqueue to catch the signal. 568 */ 569 sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN; 570 571 if (sigaction(signo, &sa, NULL) != 0) { 572 nxt_log(&engine->task, NXT_LOG_CRIT, "sigaction(%d) failed %E", 573 signo, nxt_errno); 574 575 return NXT_ERROR; 576 } 577 578 nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)", 579 engine->u.kqueue.fd, signo, sigev->name); 580 581 kev.ident = signo; 582 kev.filter = EVFILT_SIGNAL; 583 kev.flags = EV_ADD; 584 kev.fflags = 0; 585 kev.data = 0; 586 kev.udata = nxt_kevent_set_udata(sigev); 587 588 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { 589 return NXT_OK; 590 } 591 592 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 593 kqueue, nxt_errno); 594 595 return NXT_ERROR; 596 } 597 598 599 #if (NXT_HAVE_EVFILT_USER) 600 601 static nxt_int_t 602 nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 603 { 604 struct kevent kev; 605 606 /* EVFILT_USER must be added to a kqueue before it can be triggered. */ 607 608 kev.ident = 0; 609 kev.filter = EVFILT_USER; 610 kev.flags = EV_ADD | EV_CLEAR; 611 kev.fflags = 0; 612 kev.data = 0; 613 kev.udata = NULL; 614 615 engine->u.kqueue.post_handler = handler; 616 617 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { 618 return NXT_OK; 619 } 620 621 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 622 engine->u.kqueue.fd, nxt_errno); 623 624 return NXT_ERROR; 625 } 626 627 628 static void 629 nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 630 { 631 struct kevent kev; 632 633 /* 634 * kqueue has a builtin signal processing support, so the function 635 * is used only to post events and the signo argument is ignored. 636 */ 637 638 kev.ident = 0; 639 kev.filter = EVFILT_USER; 640 kev.flags = 0; 641 kev.fflags = NOTE_TRIGGER; 642 kev.data = 0; 643 kev.udata = NULL; 644 645 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) { 646 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 647 engine->u.kqueue.fd, nxt_errno); 648 } 649 } 650 651 #endif 652 653 654 static void 655 nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 656 { 657 int nevents; 658 void *obj, *data; 659 nxt_int_t i; 660 nxt_err_t err; 661 nxt_uint_t level; 662 nxt_bool_t error, eof; 663 nxt_task_t *task; 664 struct kevent *kev; 665 nxt_fd_event_t *ev; 666 nxt_sig_event_t *sigev; 667 struct timespec ts, *tp; 668 nxt_event_file_t *fev; 669 nxt_work_queue_t *wq; 670 nxt_work_handler_t handler; 671 672 if (timeout == NXT_INFINITE_MSEC) { 673 tp = NULL; 674 675 } else { 676 ts.tv_sec = timeout / 1000; 677 ts.tv_nsec = (timeout % 1000) * 1000000; 678 tp = &ts; 679 } 680 681 nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M", 682 engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout); 683 684 nevents = kevent(engine->u.kqueue.fd, 685 engine->u.kqueue.changes, engine->u.kqueue.nchanges, 686 engine->u.kqueue.events, engine->u.kqueue.mevents, tp); 687 688 err = (nevents == -1) ? nxt_errno : 0; 689 690 nxt_thread_time_update(engine->task.thread); 691 692 nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents); 693 694 if (nevents == -1) { 695 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 696 697 nxt_log(&engine->task, level, "kevent(%d) failed %E", 698 engine->u.kqueue.fd, err); 699 700 nxt_kqueue_error(engine); 701 return; 702 } 703 704 engine->u.kqueue.nchanges = 0; 705 706 for (i = 0; i < nevents; i++) { 707 708 kev = &engine->u.kqueue.events[i]; 709 710 nxt_debug(&engine->task, 711 (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? 712 "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": 713 "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", 714 kev->ident, kev->filter, kev->flags, kev->fflags, 715 kev->data, kev->udata); 716 717 error = (kev->flags & EV_ERROR); 718 719 if (nxt_slow_path(error)) { 720 nxt_log(&engine->task, NXT_LOG_CRIT, 721 "kevent(%d) error %E on ident:%d filter:%d", 722 engine->u.kqueue.fd, kev->data, kev->ident, kev->filter); 723 } 724 725 task = &engine->task; 726 wq = &engine->fast_work_queue; 727 handler = nxt_kqueue_fd_error_handler; 728 obj = nxt_kevent_get_udata(kev->udata); 729 730 switch (kev->filter) { 731 732 case EVFILT_READ: 733 ev = obj; 734 ev->read_ready = 1; 735 ev->kq_available = (int32_t) kev->data; 736 err = kev->fflags; 737 eof = (kev->flags & EV_EOF) != 0; 738 ev->kq_errno = err; 739 ev->kq_eof = eof; 740 741 if (ev->read <= NXT_EVENT_BLOCKED) { 742 nxt_debug(ev->task, "blocked read event fd:%d", ev->fd); 743 continue; 744 } 745 746 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { 747 ev->read = NXT_EVENT_INACTIVE; 748 } 749 750 if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) { 751 error = 1; 752 } 753 754 if (nxt_fast_path(!error)) { 755 handler = ev->read_handler; 756 wq = ev->read_work_queue; 757 } 758 759 task = ev->task; 760 data = ev->data; 761 762 break; 763 764 case EVFILT_WRITE: 765 ev = obj; 766 ev->write_ready = 1; 767 err = kev->fflags; 768 eof = (kev->flags & EV_EOF) != 0; 769 ev->kq_errno = err; 770 ev->kq_eof = eof; 771 772 if (ev->write <= NXT_EVENT_BLOCKED) { 773 nxt_debug(ev->task, "blocked write event fd:%d", ev->fd); 774 continue; 775 } 776 777 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { 778 ev->write = NXT_EVENT_INACTIVE; 779 } 780 781 if (nxt_slow_path(eof && err != 0)) { 782 error = 1; 783 } 784 785 if (nxt_fast_path(!error)) { 786 handler = ev->write_handler; 787 wq = ev->write_work_queue; 788 } 789 790 task = ev->task; 791 data = ev->data; 792 793 break; 794 795 case EVFILT_VNODE: 796 fev = obj; 797 handler = fev->handler; 798 task = fev->task; 799 data = fev->data; 800 break; 801 802 case EVFILT_SIGNAL: 803 sigev = obj; 804 obj = (void *) kev->ident; 805 handler = sigev->handler; 806 data = (void *) sigev->name; 807 break; 808 809 #if (NXT_HAVE_EVFILT_USER) 810 811 case EVFILT_USER: 812 handler = engine->u.kqueue.post_handler; 813 data = NULL; 814 break; 815 816 #endif 817 818 default: 819 820 #if (NXT_DEBUG) 821 nxt_log(&engine->task, NXT_LOG_CRIT, 822 "unexpected kevent(%d) filter %d on ident %d", 823 engine->u.kqueue.fd, kev->filter, kev->ident); 824 #endif 825 826 continue; 827 } 828 829 nxt_work_queue_add(wq, handler, task, obj, data); 830 } 831 } 832 833 834 /* 835 * nxt_kqueue_event_conn_io_connect() eliminates the 836 * getsockopt() syscall to test pending connect() error. 837 */ 838 839 static void 840 nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) 841 { 842 nxt_event_conn_t *c; 843 nxt_event_engine_t *engine; 844 nxt_work_handler_t handler; 845 const nxt_event_conn_state_t *state; 846 847 c = obj; 848 849 state = c->write_state; 850 851 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 852 853 case NXT_OK: 854 c->socket.write_ready = 1; 855 handler = state->ready_handler; 856 break; 857 858 case NXT_AGAIN: 859 c->socket.write_handler = nxt_kqueue_event_conn_connected; 860 c->socket.error_handler = nxt_event_conn_connect_error; 861 862 engine = task->thread->engine; 863 nxt_event_conn_timer(engine, c, state, &c->write_timer); 864 865 nxt_kqueue_enable_write(engine, &c->socket); 866 return; 867 868 case NXT_DECLINED: 869 handler = state->close_handler; 870 break; 871 872 default: /* NXT_ERROR */ 873 handler = state->error_handler; 874 break; 875 } 876 877 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 878 } 879 880 881 static void 882 nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) 883 { 884 nxt_event_conn_t *c; 885 886 c = obj; 887 888 nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd); 889 890 c->socket.write = NXT_EVENT_BLOCKED; 891 892 if (c->write_state->autoreset_timer) { 893 nxt_timer_disable(task->thread->engine, &c->write_timer); 894 } 895 896 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 897 task, c, data); 898 } 899 900 901 static void 902 nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) 903 { 904 nxt_event_conn_listen_t *cls; 905 906 cls = obj; 907 908 nxt_debug(task, "kevent fd:%d avail:%D", 909 cls->socket.fd, cls->socket.kq_available); 910 911 cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); 912 913 nxt_kqueue_event_conn_io_accept(task, cls, data); 914 } 915 916 917 static void 918 nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) 919 { 920 socklen_t len; 921 nxt_socket_t s; 922 struct sockaddr *sa; 923 nxt_event_conn_t *c; 924 nxt_event_conn_listen_t *cls; 925 926 cls = obj; 927 c = data; 928 929 cls->ready--; 930 cls->socket.read_ready = (cls->ready != 0); 931 932 cls->socket.kq_available--; 933 cls->socket.read_ready = (cls->socket.kq_available != 0); 934 935 len = c->remote->socklen; 936 937 if (len >= sizeof(struct sockaddr)) { 938 sa = &c->remote->u.sockaddr; 939 940 } else { 941 sa = NULL; 942 len = 0; 943 } 944 945 s = accept(cls->socket.fd, sa, &len); 946 947 if (s != -1) { 948 c->socket.fd = s; 949 950 nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); 951 952 nxt_event_conn_accept(task, cls, c); 953 return; 954 } 955 956 nxt_event_conn_accept_error(task, cls, "accept", nxt_errno); 957 } 958 959 960 /* 961 * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the 962 * readv() or recv() syscall if a remote side just closed connection. 963 */ 964 965 static void 966 nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) 967 { 968 nxt_event_conn_t *c; 969 970 c = obj; 971 972 nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd); 973 974 if (c->socket.kq_available == 0 && c->socket.kq_eof) { 975 nxt_debug(task, "kevent fd:%d eof", c->socket.fd); 976 977 c->socket.closed = 1; 978 nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler, 979 task, c, data); 980 return; 981 } 982 983 nxt_event_conn_io_read(task, c, data); 984 } 985 986 987 /* 988 * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard 989 * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls 990 * if there is no pending data or a remote side closed connection. 991 */ 992 993 static ssize_t 994 nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) 995 { 996 ssize_t n; 997 998 if (c->socket.kq_available == 0 && c->socket.kq_eof) { 999 c->socket.closed = 1; 1000 return 0; 1001 } 1002 1003 n = nxt_event_conn_io_recvbuf(c, b); 1004 1005 if (n > 0) { 1006 c->socket.kq_available -= n; 1007 1008 if (c->socket.kq_available < 0) { 1009 c->socket.kq_available = 0; 1010 } 1011 1012 nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d", 1013 c->socket.fd, c->socket.kq_available, c->socket.kq_eof); 1014 1015 c->socket.read_ready = (c->socket.kq_available != 0 1016 || c->socket.kq_eof); 1017 } 1018 1019 return n; 1020 } 1021