/* xref: /unit/src/nxt_kqueue_engine.c (revision 20:4dc92b438f58) */

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * kqueue()      has been introduced in FreeBSD 4.1 and then was ported
 *               to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 *               DragonFlyBSD inherited it with FreeBSD 4 code base.
 *
 * NOTE_REVOKE   has been introduced in FreeBSD 4.3 and then was ported
 *               to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 *               DragonFlyBSD inherited it with FreeBSD 4 code base.
 *
 * EVFILT_TIMER  has been introduced in FreeBSD 4.4-STABLE and then was
 *               ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2.
 *               DragonFlyBSD inherited it with FreeBSD 4 code base.
 *
 * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow
 *               Leopard) as part of the Grand Central Dispatch framework
 *               and then were ported to FreeBSD 8.0-STABLE as part of the
 *               libdispatch support.
 */
28 
29 
/*
 * EV_DISPATCH is better because it just disables an event on delivery
 * whilst EV_ONESHOT deletes the event.  This eliminates in-kernel memory
 * deallocation and probable subsequent allocation with a lock acquiring.
 */
#ifdef EV_DISPATCH
#define NXT_KEVENT_ONESHOT  EV_DISPATCH
#else
#define NXT_KEVENT_ONESHOT  EV_ONESHOT
#endif
40 
41 
#if (NXT_NETBSD)
/* NetBSD defines the kevent.udata field as intptr_t. */

#define nxt_kevent_set_udata(udata)  (intptr_t) (udata)
#define nxt_kevent_get_udata(udata)  (void *) (udata)

#else
/* Other systems define kevent.udata as a void pointer. */
#define nxt_kevent_set_udata(udata)  (void *) (udata)
#define nxt_kevent_get_udata(udata)  (udata)
#endif
52 
53 
/* Forward declarations for the kqueue event-engine interface below. */

static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_kqueue_free(nxt_event_engine_t *engine);
static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
    nxt_event_file_t *ev);
static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
    nxt_event_file_t *ev);
static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags);
static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
static void nxt_kqueue_error(nxt_event_engine_t *engine);
static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
    const nxt_sig_event_t *sigev);
#if (NXT_HAVE_EVFILT_USER)
static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c,
    nxt_buf_t *b);
112 
113 
/*
 * Connection I/O operations specialized for kqueue: the connect, accept,
 * read, and recvbuf entries use kqueue-reported state (kq_available,
 * kq_eof) to avoid needless syscalls; the remaining entries are the
 * generic implementations.
 */
static nxt_event_conn_io_t  nxt_kqueue_event_conn_io = {
    nxt_kqueue_event_conn_io_connect,
    nxt_kqueue_event_conn_io_accept,

    nxt_kqueue_event_conn_io_read,
    nxt_kqueue_event_conn_io_recvbuf,
    nxt_event_conn_io_recv,

    nxt_conn_io_write,
    nxt_event_conn_io_write_chunk,

#if (NXT_HAVE_FREEBSD_SENDFILE)
    nxt_freebsd_event_conn_io_sendfile,
#elif (NXT_HAVE_MACOSX_SENDFILE)
    nxt_macosx_event_conn_io_sendfile,
#else
    nxt_event_conn_io_sendbuf,
#endif

    nxt_event_conn_io_writev,
    nxt_event_conn_io_send,

    nxt_event_conn_io_shutdown,
};
138 
139 
/* The kqueue implementation of the event-engine interface. */
const nxt_event_interface_t  nxt_kqueue_engine = {
    "kqueue",
    nxt_kqueue_create,
    nxt_kqueue_free,
    nxt_kqueue_enable,
    nxt_kqueue_disable,
    nxt_kqueue_delete,
    nxt_kqueue_close,
    nxt_kqueue_enable_read,
    nxt_kqueue_enable_write,
    nxt_kqueue_disable_read,
    nxt_kqueue_disable_write,
    nxt_kqueue_block_read,
    nxt_kqueue_block_write,
    nxt_kqueue_oneshot_read,
    nxt_kqueue_oneshot_write,
    nxt_kqueue_enable_accept,
    nxt_kqueue_enable_file,
    nxt_kqueue_close_file,
#if (NXT_HAVE_EVFILT_USER)
    nxt_kqueue_enable_post,
    nxt_kqueue_signal,
#else
    /* Without EVFILT_USER there is no post/signal support. */
    NULL,
    NULL,
#endif
    nxt_kqueue_poll,

    &nxt_kqueue_event_conn_io,

    NXT_FILE_EVENTS,
    NXT_SIGNAL_EVENTS,
};
173 
174 
/*
 * Creates a kqueue instance and allocates the change and event arrays
 * (mchanges and mevents entries respectively).  If the engine has signal
 * events configured, each signal is also registered with the kqueue.
 * Returns NXT_OK on success; on any failure everything allocated so far
 * is released via nxt_kqueue_free() and NXT_ERROR is returned.
 */
static nxt_int_t
nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    const nxt_sig_event_t  *sigev;

    engine->u.kqueue.fd = -1;
    engine->u.kqueue.mchanges = mchanges;
    engine->u.kqueue.mevents = mevents;
    /* Remember the creator: kqueue descriptors are not inherited by fork(). */
    engine->u.kqueue.pid = nxt_pid;

    engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
    if (engine->u.kqueue.changes == NULL) {
        /*
         * NOTE(review): this early "goto fail" reaches nxt_kqueue_free(),
         * which frees .events too — relies on engine->u.kqueue being
         * zero-initialized by the caller; confirm.
         */
        goto fail;
    }

    engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
    if (engine->u.kqueue.events == NULL) {
        goto fail;
    }

    engine->u.kqueue.fd = kqueue();
    if (engine->u.kqueue.fd == -1) {
        nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);

    if (engine->signals != NULL) {
        /* The signal array is terminated by a zero signo entry. */
        for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
            if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
                goto fail;
            }
        }
    }

    return NXT_OK;

fail:

    nxt_kqueue_free(engine);

    return NXT_ERROR;
}
220 
221 
222 static void
223 nxt_kqueue_free(nxt_event_engine_t *engine)
224 {
225     nxt_fd_t  fd;
226 
227     fd = engine->u.kqueue.fd;
228 
229     nxt_debug(&engine->task, "kqueue %d free", fd);
230 
231     if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
232         /* kqueue is not inherited by fork() */
233 
234         if (close(fd) != 0) {
235             nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue close(%d) failed %E",
236                     fd, nxt_errno);
237         }
238     }
239 
240     nxt_free(engine->u.kqueue.events);
241     nxt_free(engine->u.kqueue.changes);
242 
243     nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
244 }
245 
246 
/* Arms both the read and the write filters for the descriptor. */
static void
nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_kqueue_enable_read(engine, ev);
    nxt_kqueue_enable_write(engine, ev);
}
253 
254 
/*
 * EV_DISABLE is better because it eliminates in-kernel memory
 * deallocation and probable subsequent allocation with a lock acquiring.
 */
259 
260 static void
261 nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
262 {
263     if (ev->read != NXT_EVENT_INACTIVE) {
264         ev->read = NXT_EVENT_INACTIVE;
265         nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
266     }
267 
268     if (ev->write != NXT_EVENT_INACTIVE) {
269         ev->write = NXT_EVENT_INACTIVE;
270         nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
271     }
272 }
273 
274 
275 static void
276 nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
277 {
278     if (ev->read != NXT_EVENT_INACTIVE) {
279         ev->read = NXT_EVENT_INACTIVE;
280         nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
281     }
282 
283     if (ev->write != NXT_EVENT_INACTIVE) {
284         ev->write = NXT_EVENT_INACTIVE;
285         nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
286     }
287 }
288 
289 
/*
 * kqueue(2):
 *
 *   Calling close() on a file descriptor will remove any kevents that
 *   reference the descriptor.
 *
 * nxt_kqueue_close() always returns true as there are pending events on
 * closing file descriptor because kevent() passes whole change list at once.
 */
299 
300 static nxt_bool_t
301 nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
302 {
303     ev->read = NXT_EVENT_INACTIVE;
304     ev->write = NXT_EVENT_INACTIVE;
305 
306     return 1;
307 }
308 
309 
/*
 * The kqueue event engine uses only three states: inactive, blocked, and
 * active.  An active oneshot event is marked as if it is in the default
 * state.  The event will be converted eventually to the default EV_CLEAR
 * mode after it becomes inactive after delivery.
 */
316 
317 static void
318 nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
319 {
320     if (ev->read == NXT_EVENT_INACTIVE) {
321         nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
322                           EV_ADD | EV_ENABLE | EV_CLEAR);
323     }
324 
325     ev->read = NXT_EVENT_ACTIVE;
326 }
327 
328 
329 static void
330 nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
331 {
332     if (ev->write == NXT_EVENT_INACTIVE) {
333         nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
334                           EV_ADD | EV_ENABLE | EV_CLEAR);
335     }
336 
337     ev->write = NXT_EVENT_ACTIVE;
338 }
339 
340 
341 static void
342 nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
343 {
344     ev->read = NXT_EVENT_INACTIVE;
345 
346     nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
347 }
348 
349 
350 static void
351 nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
352 {
353     ev->write = NXT_EVENT_INACTIVE;
354 
355     nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
356 }
357 
358 
359 static void
360 nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
361 {
362     if (ev->read != NXT_EVENT_INACTIVE) {
363         ev->read = NXT_EVENT_BLOCKED;
364     }
365 }
366 
367 
368 static void
369 nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
370 {
371     if (ev->write != NXT_EVENT_INACTIVE) {
372         ev->write = NXT_EVENT_BLOCKED;
373     }
374 }
375 
376 
377 static void
378 nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
379 {
380     ev->write = NXT_EVENT_ACTIVE;
381 
382     nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
383                       EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
384 }
385 
386 
387 static void
388 nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
389 {
390     ev->write = NXT_EVENT_ACTIVE;
391 
392     nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
393                       EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
394 }
395 
396 
397 static void
398 nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
399 {
400     ev->read = NXT_EVENT_ACTIVE;
401     ev->read_handler = nxt_kqueue_listen_handler;
402 
403     nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
404 }
405 
406 
407 static void
408 nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
409 {
410     struct kevent  *kev;
411 
412     const nxt_int_t   flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
413     const nxt_uint_t  fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
414                                | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
415 
416     nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
417               engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);
418 
419     kev = nxt_kqueue_get_kevent(engine);
420 
421     kev->ident = ev->file->fd;
422     kev->filter = EVFILT_VNODE;
423     kev->flags = flags;
424     kev->fflags = fflags;
425     kev->data = 0;
426     kev->udata = nxt_kevent_set_udata(ev);
427 }
428 
429 
/*
 * Intentionally empty: the vnode filter is registered with EV_ONESHOT,
 * and closing the descriptor removes kernel kevents.  NOTE(review): a
 * change already queued for this file may still be pending in the change
 * list — confirm whether it must be cancelled here.
 */
static void
nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
{
    /* TODO: pending event. */
}
435 
436 
437 static void
438 nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
439     nxt_int_t filter, nxt_uint_t flags)
440 {
441     struct kevent  *kev;
442 
443     nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
444               engine->u.kqueue.fd, ev->fd, filter, flags);
445 
446     kev = nxt_kqueue_get_kevent(engine);
447 
448     kev->ident = ev->fd;
449     kev->filter = filter;
450     kev->flags = flags;
451     kev->fflags = 0;
452     kev->data = 0;
453     kev->udata = nxt_kevent_set_udata(ev);
454 }
455 
456 
457 static struct kevent *
458 nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
459 {
460     int  ret, nchanges;
461 
462     nchanges = engine->u.kqueue.nchanges;
463 
464     if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {
465 
466         nxt_debug(&engine->task, "kevent(%d) changes:%d",
467                   engine->u.kqueue.fd, nchanges);
468 
469         ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
470                      NULL, 0, NULL);
471 
472         if (nxt_slow_path(ret != 0)) {
473             nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
474                     engine->u.kqueue.fd, nxt_errno);
475 
476             nxt_kqueue_error(engine);
477         }
478 
479         engine->u.kqueue.nchanges = 0;
480     }
481 
482     return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
483 }
484 
485 
486 static void
487 nxt_kqueue_error(nxt_event_engine_t *engine)
488 {
489     struct kevent     *kev, *end;
490     nxt_fd_event_t    *ev;
491     nxt_event_file_t  *fev;
492     nxt_work_queue_t  *wq;
493 
494     wq = &engine->fast_work_queue;
495     end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
496 
497     for (kev = engine->u.kqueue.changes; kev < end; kev++) {
498 
499         switch (kev->filter) {
500 
501         case EVFILT_READ:
502         case EVFILT_WRITE:
503             ev = nxt_kevent_get_udata(kev->udata);
504             nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
505                                ev->task, ev, ev->data);
506             break;
507 
508         case EVFILT_VNODE:
509             fev = nxt_kevent_get_udata(kev->udata);
510             nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
511                                fev->task, fev, fev->data);
512             break;
513         }
514     }
515 }
516 
517 
518 static void
519 nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
520 {
521     nxt_fd_event_t  *ev;
522 
523     ev = obj;
524 
525     if (ev->kq_eof && ev->kq_errno != 0) {
526         ev->error = ev->kq_errno;
527         nxt_log(task, nxt_socket_error_level(ev->kq_errno),
528                 "kevent() reported error on descriptor %d %E",
529                 ev->fd, ev->kq_errno);
530     }
531 
532     ev->read = NXT_EVENT_INACTIVE;
533     ev->write = NXT_EVENT_INACTIVE;
534     ev->error = ev->kq_errno;
535 
536     ev->error_handler(task, ev, data);
537 }
538 
539 
/*
 * Delivers a failed EVFILT_VNODE change to the file event's handler;
 * queued by nxt_kqueue_error() after a kevent() submission failure.
 */
static void
nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_file_t  *ev;

    ev = obj;

    ev->handler(task, ev, data);
}
549 
550 
551 static nxt_int_t
552 nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
553 {
554     int               signo;
555     struct kevent     kev;
556     struct sigaction  sa;
557 
558     signo = sigev->signo;
559 
560     nxt_memzero(&sa, sizeof(struct sigaction));
561     sigemptyset(&sa.sa_mask);
562 
563     /*
564      * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch
565      * this signal.  It should be set to SIG_DFL instead.  And although
566      * SIGCHLD default action is also ignoring, nevertheless SIG_DFL
567      * allows kqueue to catch the signal.
568      */
569     sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;
570 
571     if (sigaction(signo, &sa, NULL) != 0) {
572         nxt_log(&engine->task, NXT_LOG_CRIT, "sigaction(%d) failed %E",
573                 signo, nxt_errno);
574 
575         return NXT_ERROR;
576     }
577 
578     nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
579               engine->u.kqueue.fd, signo, sigev->name);
580 
581     kev.ident = signo;
582     kev.filter = EVFILT_SIGNAL;
583     kev.flags = EV_ADD;
584     kev.fflags = 0;
585     kev.data = 0;
586     kev.udata = nxt_kevent_set_udata(sigev);
587 
588     if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
589         return NXT_OK;
590     }
591 
592     nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
593             kqueue, nxt_errno);
594 
595     return NXT_ERROR;
596 }
597 
598 
599 #if (NXT_HAVE_EVFILT_USER)
600 
601 static nxt_int_t
602 nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
603 {
604     struct kevent  kev;
605 
606     /* EVFILT_USER must be added to a kqueue before it can be triggered. */
607 
608     kev.ident = 0;
609     kev.filter = EVFILT_USER;
610     kev.flags = EV_ADD | EV_CLEAR;
611     kev.fflags = 0;
612     kev.data = 0;
613     kev.udata = NULL;
614 
615     engine->u.kqueue.post_handler = handler;
616 
617     if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
618         return NXT_OK;
619     }
620 
621     nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
622             engine->u.kqueue.fd, nxt_errno);
623 
624     return NXT_ERROR;
625 }
626 
627 
628 static void
629 nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
630 {
631     struct kevent  kev;
632 
633     /*
634      * kqueue has a builtin signal processing support, so the function
635      * is used only to post events and the signo argument is ignored.
636      */
637 
638     kev.ident = 0;
639     kev.filter = EVFILT_USER;
640     kev.flags = 0;
641     kev.fflags = NOTE_TRIGGER;
642     kev.data = 0;
643     kev.udata = NULL;
644 
645     if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
646         nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
647                 engine->u.kqueue.fd, nxt_errno);
648     }
649 }
650 
651 #endif
652 
653 
/*
 * Submits the accumulated change list and waits up to "timeout"
 * milliseconds for events, then dispatches each returned kevent to the
 * fast work queue: read/write events go to their fd handlers (or to the
 * error handler on kqueue-reported errors), vnode events to the file
 * handler, signal events to the signal handler, and EVFILT_USER wakeups
 * to the engine's post handler.
 */
static void
nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    void                *obj, *data;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_uint_t          level;
    nxt_bool_t          error, eof;
    nxt_task_t          *task;
    struct kevent       *kev;
    nxt_fd_event_t      *ev;
    nxt_sig_event_t     *sigev;
    struct timespec     ts, *tp;
    nxt_event_file_t    *fev;
    nxt_work_queue_t    *wq;
    nxt_work_handler_t  handler;

    /* A NULL timespec makes kevent() wait indefinitely. */
    if (timeout == NXT_INFINITE_MSEC) {
        tp = NULL;

    } else {
        ts.tv_sec = timeout / 1000;
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tp = &ts;
    }

    nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
              engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);

    nevents = kevent(engine->u.kqueue.fd,
                     engine->u.kqueue.changes, engine->u.kqueue.nchanges,
                     engine->u.kqueue.events, engine->u.kqueue.mevents, tp);

    /* Capture errno before any call below can overwrite it. */
    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);

    if (nevents == -1) {
        /* EINTR is routine (signals); anything else is critical. */
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;

        nxt_log(&engine->task, level, "kevent(%d) failed %E",
                engine->u.kqueue.fd, err);

        /* The queued changes were not applied; fail them all. */
        nxt_kqueue_error(engine);
        return;
    }

    /* The kernel has consumed the whole change list. */
    engine->u.kqueue.nchanges = 0;

    for (i = 0; i < nevents; i++) {

        kev = &engine->u.kqueue.events[i];

        nxt_debug(&engine->task,
                  (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
                      "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
                      "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
                  kev->ident, kev->filter, kev->flags, kev->fflags,
                  kev->data, kev->udata);

        /* EV_ERROR: a change failed; kev->data holds the errno. */
        error = (kev->flags & EV_ERROR);

        if (nxt_slow_path(error)) {
            nxt_log(&engine->task, NXT_LOG_CRIT,
                    "kevent(%d) error %E on ident:%d filter:%d",
                    engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
        }

        /* Defaults; the switch below overrides them per filter type. */
        task = &engine->task;
        wq = &engine->fast_work_queue;
        handler = nxt_kqueue_fd_error_handler;
        obj = nxt_kevent_get_udata(kev->udata);

        switch (kev->filter) {

        case EVFILT_READ:
            ev = obj;
            ev->read_ready = 1;
            /* kev->data is the number of bytes available to read. */
            ev->kq_available = (int32_t) kev->data;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->read <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->read = NXT_EVENT_INACTIVE;
            }

            /* No data, EOF, and a pending error: treat as an error event. */
            if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->read_handler;
                wq = ev->read_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_WRITE:
            ev = obj;
            ev->write_ready = 1;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->write <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->write = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->write_handler;
                wq = ev->write_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_VNODE:
            fev = obj;
            handler = fev->handler;
            task = fev->task;
            data = fev->data;
            break;

        case EVFILT_SIGNAL:
            sigev = obj;
            /* The handler receives the signal number and its name. */
            obj = (void *) kev->ident;
            handler = sigev->handler;
            data = (void *) sigev->name;
            break;

#if (NXT_HAVE_EVFILT_USER)

        case EVFILT_USER:
            handler = engine->u.kqueue.post_handler;
            data = NULL;
            break;

#endif

        default:

#if (NXT_DEBUG)
            nxt_log(&engine->task, NXT_LOG_CRIT,
                    "unexpected kevent(%d) filter %d on ident %d",
                    engine->u.kqueue.fd, kev->filter, kev->ident);
#endif

            continue;
        }

        nxt_work_queue_add(wq, handler, task, obj, data);
    }
}
832 
833 
/*
 * nxt_kqueue_event_conn_io_connect() eliminates the
 * getsockopt() syscall to test pending connect() error.
 */
838 
839 static void
840 nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
841 {
842     nxt_event_conn_t              *c;
843     nxt_event_engine_t            *engine;
844     nxt_work_handler_t            handler;
845     const nxt_event_conn_state_t  *state;
846 
847     c = obj;
848 
849     state = c->write_state;
850 
851     switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
852 
853     case NXT_OK:
854         c->socket.write_ready = 1;
855         handler = state->ready_handler;
856         break;
857 
858     case NXT_AGAIN:
859         c->socket.write_handler = nxt_kqueue_event_conn_connected;
860         c->socket.error_handler = nxt_event_conn_connect_error;
861 
862         engine = task->thread->engine;
863         nxt_event_conn_timer(engine, c, state, &c->write_timer);
864 
865         nxt_kqueue_enable_write(engine, &c->socket);
866         return;
867 
868     case NXT_DECLINED:
869         handler = state->close_handler;
870         break;
871 
872     default: /* NXT_ERROR */
873         handler = state->error_handler;
874         break;
875     }
876 
877     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
878 }
879 
880 
/*
 * Completion handler for a pending connect(): kqueue reports writability
 * when the connect finishes.  The write event is left blocked so it
 * stays registered without delivering further events, the connect timer
 * is disabled if the state asks for it, and the ready handler is queued.
 */
static void
nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd);

    c->socket.write = NXT_EVENT_BLOCKED;

    if (c->write_state->autoreset_timer) {
        nxt_timer_disable(task->thread->engine, &c->write_timer);
    }

    nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                       task, c, data);
}
899 
900 
/*
 * Read handler for listen sockets: sizes the accept batch by the lesser
 * of the configured batch and the kernel-reported listen queue length
 * (kq_available), then starts accepting.
 */
static void
nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_listen_t  *cls;

    cls = obj;

    nxt_debug(task, "kevent fd:%d avail:%D",
              cls->socket.fd, cls->socket.kq_available);

    cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available);

    nxt_kqueue_event_conn_io_accept(task, cls, data);
}
915 
916 
/*
 * Accepts one connection from the listen socket into the prepared
 * connection object "data", updating the batch and kernel-availability
 * counters first.
 */
static void
nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
    socklen_t                len;
    nxt_socket_t             s;
    struct sockaddr          *sa;
    nxt_event_conn_t         *c;
    nxt_event_conn_listen_t  *cls;

    cls = obj;
    c = data;

    /*
     * NOTE(review): this read_ready assignment is immediately overwritten
     * by the kq_available-based one below — dead store; confirm whether
     * the intent was "ready != 0 && kq_available != 0".
     */
    cls->ready--;
    cls->socket.read_ready = (cls->ready != 0);

    cls->socket.kq_available--;
    cls->socket.read_ready = (cls->socket.kq_available != 0);

    len = c->remote->socklen;

    if (len >= sizeof(struct sockaddr)) {
        sa = &c->remote->u.sockaddr;

    } else {
        /* No usable storage for the peer address. */
        sa = NULL;
        len = 0;
    }

    s = accept(cls->socket.fd, sa, &len);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept(%d): %d", cls->socket.fd, s);

        nxt_event_conn_accept(task, cls, c);
        return;
    }

    nxt_event_conn_accept_error(task, cls, "accept", nxt_errno);
}
958 
959 
/*
 * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the
 * readv() or recv() syscall if a remote side just closed connection.
 */
964 
965 static void
966 nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
967 {
968     nxt_event_conn_t  *c;
969 
970     c = obj;
971 
972     nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd);
973 
974     if (c->socket.kq_available == 0 && c->socket.kq_eof) {
975         nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
976 
977         c->socket.closed = 1;
978         nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
979                            task, c, data);
980         return;
981     }
982 
983     nxt_event_conn_io_read(task, c, data);
984 }
985 
986 
987 /*
988  * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard
989  * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
990  * if there is no pending data or a remote side closed connection.
991  */
992 
993 static ssize_t
994 nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
995 {
996     ssize_t  n;
997 
998     if (c->socket.kq_available == 0 && c->socket.kq_eof) {
999         c->socket.closed = 1;
1000         return 0;
1001     }
1002 
1003     n = nxt_event_conn_io_recvbuf(c, b);
1004 
1005     if (n > 0) {
1006         c->socket.kq_available -= n;
1007 
1008         if (c->socket.kq_available < 0) {
1009             c->socket.kq_available = 0;
1010         }
1011 
1012         nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
1013                   c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
1014 
1015         c->socket.read_ready = (c->socket.kq_available != 0
1016                                 || c->socket.kq_eof);
1017     }
1018 
1019     return n;
1020 }
1021