
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * kqueue()      was introduced in FreeBSD 4.1 and then ported to
 *               OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 *               DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * NOTE_REVOKE   was introduced in FreeBSD 4.3 and then ported to
 *               OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 *               DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * EVFILT_TIMER  was introduced in FreeBSD 4.4-STABLE and then ported
 *               to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2.
 *               DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * EVFILT_USER and EV_DISPATCH were introduced in MacOSX 10.6 (Snow
 *               Leopard) as part of the Grand Central Dispatch framework
 *               and then ported to FreeBSD 8.0-STABLE as part of the
 *               libdispatch support.
 */


/*
 * EV_DISPATCH is preferable because it merely disables an event on
 * delivery, whereas EV_ONESHOT deletes the event.  This avoids an
 * in-kernel memory deallocation and a probable subsequent allocation,
 * both of which require taking a lock.  See the sketch below.
 */
#ifdef EV_DISPATCH
#define NXT_KEVENT_ONESHOT  EV_DISPATCH
#else
#define NXT_KEVENT_ONESHOT  EV_ONESHOT
#endif
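

/*
 * A minimal standalone sketch (not code from this engine) of the
 * difference: with EV_DISPATCH the knote survives delivery and is
 * merely disabled, so re-arming is a cheap EV_ENABLE; with EV_ONESHOT
 * the knote is deleted and must be registered again with EV_ADD.
 * The rearm_read() helper below is a hypothetical illustration.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>

static void
rearm_read(int kq, int fd, void *udata)
{
    struct kevent  kev;

#ifdef EV_DISPATCH
    /* The event is still registered; enabling it is enough. */
    EV_SET(&kev, fd, EVFILT_READ, EV_ENABLE, 0, 0, udata);
#else
    /* EV_ONESHOT deleted the event; it must be added anew. */
    EV_SET(&kev, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, udata);
#endif

    (void) kevent(kq, &kev, 1, NULL, 0, NULL);
}
#endif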


#if (NXT_NETBSD)
/* NetBSD defines the kevent.udata field as intptr_t. */

#define nxt_kevent_set_udata(udata)  (intptr_t) (udata)
#define nxt_kevent_get_udata(udata)  (void *) (udata)

#else
#define nxt_kevent_set_udata(udata)  (void *) (udata)
#define nxt_kevent_get_udata(udata)  (udata)
#endif
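

/*
 * Usage sketch (hypothetical, not code from this file): the macros
 * make the udata round trip portable, hiding the cast to NetBSD's
 * intptr_t field behind a uniform interface.
 */
#if 0
static void *
udata_roundtrip(void *ptr)
{
    struct kevent  kev;

    kev.udata = nxt_kevent_set_udata(ptr);   /* store a pointer */

    return nxt_kevent_get_udata(kev.udata);  /* and read it back */
}
#endif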


static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_kqueue_free(nxt_event_engine_t *engine);
static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
    nxt_file_event_t *ev);
static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
    nxt_file_event_t *ev);
static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags);
static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
static void nxt_kqueue_error(nxt_event_engine_t *engine);
static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
    const nxt_sig_event_t *sigev);
#if (NXT_HAVE_EVFILT_USER)
static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


static nxt_conn_io_t  nxt_kqueue_conn_io = {
    .connect = nxt_kqueue_conn_io_connect,
    .accept = nxt_kqueue_conn_io_accept,

    .read = nxt_kqueue_conn_io_read,
    .recvbuf = nxt_kqueue_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_FREEBSD_SENDFILE)
    .old_sendbuf = nxt_freebsd_event_conn_io_sendfile,
#elif (NXT_HAVE_MACOSX_SENDFILE)
    .old_sendbuf = nxt_macosx_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};


const nxt_event_interface_t  nxt_kqueue_engine = {
    "kqueue",
    nxt_kqueue_create,
    nxt_kqueue_free,
    nxt_kqueue_enable,
    nxt_kqueue_disable,
    nxt_kqueue_delete,
    nxt_kqueue_close,
    nxt_kqueue_enable_read,
    nxt_kqueue_enable_write,
    nxt_kqueue_disable_read,
    nxt_kqueue_disable_write,
    nxt_kqueue_block_read,
    nxt_kqueue_block_write,
    nxt_kqueue_oneshot_read,
    nxt_kqueue_oneshot_write,
    nxt_kqueue_enable_accept,
    nxt_kqueue_enable_file,
    nxt_kqueue_close_file,
#if (NXT_HAVE_EVFILT_USER)
    nxt_kqueue_enable_post,
    nxt_kqueue_signal,
#else
    NULL,
    NULL,
#endif
    nxt_kqueue_poll,

    &nxt_kqueue_conn_io,

    NXT_FILE_EVENTS,
    NXT_SIGNAL_EVENTS,
};


static nxt_int_t
nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    const nxt_sig_event_t  *sigev;

    engine->u.kqueue.fd = -1;
    engine->u.kqueue.mchanges = mchanges;
    engine->u.kqueue.mevents = mevents;
    engine->u.kqueue.pid = nxt_pid;

    engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
    if (engine->u.kqueue.changes == NULL) {
        goto fail;
    }

    engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
    if (engine->u.kqueue.events == NULL) {
        goto fail;
    }

    engine->u.kqueue.fd = kqueue();
    if (engine->u.kqueue.fd == -1) {
        nxt_alert(&engine->task, "kqueue() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);

    if (engine->signals != NULL) {
        for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
            if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
                goto fail;
            }
        }
    }

    return NXT_OK;

fail:

    nxt_kqueue_free(engine);

    return NXT_ERROR;
}


static void
nxt_kqueue_free(nxt_event_engine_t *engine)
{
    nxt_fd_t  fd;

    fd = engine->u.kqueue.fd;

    nxt_debug(&engine->task, "kqueue %d free", fd);

    if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
        /* kqueue is not inherited by fork() */

        if (close(fd) != 0) {
            nxt_alert(&engine->task, "kqueue close(%d) failed %E",
                      fd, nxt_errno);
        }
    }

    nxt_free(engine->u.kqueue.events);
    nxt_free(engine->u.kqueue.changes);

    nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
}


static void
nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_kqueue_enable_read(engine, ev);
    nxt_kqueue_enable_write(engine, ev);
}


/*
 * EV_DISABLE is preferable because it avoids an in-kernel memory
 * deallocation and a probable subsequent allocation, both of which
 * require taking a lock.
 */

static void
nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
    }

    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
    }
}


static void
nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
    }

    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
    }
}


/*
 * kqueue(2):
 *
 *   Calling close() on a file descriptor will remove any kevents that
 *   reference the descriptor.
 *
 * So nxt_kqueue_close() returns true only if the pending change list
 * still references the descriptor.  A standalone sketch of this
 * behaviour follows the function.
 */

static nxt_bool_t
nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    struct kevent  *kev, *end;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];

    for (kev = engine->u.kqueue.changes; kev < end; kev++) {
        if (kev->ident == (uintptr_t) ev->fd) {
            return 1;
        }
    }

    return 0;
}
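

/*
 * A standalone sketch (assumptions, not engine code) of the kqueue(2)
 * behaviour relied on above: close() makes the kernel drop any
 * registered kevents for the descriptor, so only changes still queued
 * in user space can keep referencing it.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <unistd.h>

static void
close_drops_kevents(int kq, int fd)
{
    struct kevent  kev;

    EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
    (void) kevent(kq, &kev, 1, NULL, 0, NULL);  /* registered in-kernel */

    (void) close(fd);  /* the kernel silently removes the kevent */

    /* An explicit EV_DELETE for fd would now fail with EBADF. */
}
#endif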


/*
 * The kqueue event engine uses only three states: inactive, blocked,
 * and active.  An active oneshot event is marked as if it were in the
 * default state.  Once it becomes inactive after delivery, it is
 * eventually converted back to the default EV_CLEAR mode.  See the
 * EV_CLEAR sketch after the enable functions below.
 */

static void
nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read == NXT_EVENT_INACTIVE) {
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
                          EV_ADD | EV_ENABLE | EV_CLEAR);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


static void
nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write == NXT_EVENT_INACTIVE) {
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
                          EV_ADD | EV_ENABLE | EV_CLEAR);
    }

    ev->write = NXT_EVENT_ACTIVE;
}
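

/*
 * A sketch (not engine code) of what EV_CLEAR buys here: the filter is
 * edge-triggered, and kev.data reports the pending byte count, so a
 * reader can size its read() call instead of looping until EAGAIN the
 * way edge-triggered epoll code must.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <unistd.h>

static ssize_t
sized_read(int kq, char *buf, size_t size)
{
    size_t         pending;
    struct kevent  kev;

    if (kevent(kq, NULL, 0, &kev, 1, NULL) != 1
        || kev.filter != EVFILT_READ)
    {
        return -1;
    }

    /* kev.data is the number of bytes readable on kev.ident. */
    pending = (size_t) kev.data;

    return read((int) kev.ident, buf, (pending < size) ? pending : size);
}
#endif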


static void
nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_INACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
}


static void
nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->write = NXT_EVENT_INACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
}


static void
nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
                      EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}


static void
nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->write = NXT_EVENT_ACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
                      EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}


static void
nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->read_handler = nxt_kqueue_listen_handler;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
}


static void
nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
    struct kevent  *kev;

    const nxt_int_t   flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    const nxt_uint_t  fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
                               | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;

    nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
              engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);

    kev = nxt_kqueue_get_kevent(engine);

    kev->ident = ev->file->fd;
    kev->filter = EVFILT_VNODE;
    kev->flags = flags;
    kev->fflags = fflags;
    kev->data = 0;
    kev->udata = nxt_kevent_set_udata(ev);
}


static void
nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
    /* TODO: pending event. */
}


static void
nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags)
{
    struct kevent  *kev;

    nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
              engine->u.kqueue.fd, ev->fd, filter, flags);

    kev = nxt_kqueue_get_kevent(engine);

    kev->ident = ev->fd;
    kev->filter = filter;
    kev->flags = flags;
    kev->fflags = 0;
    kev->data = 0;
    kev->udata = nxt_kevent_set_udata(ev);
}


static struct kevent *
nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
{
    int  ret, nchanges;

    nchanges = engine->u.kqueue.nchanges;

    if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {

        nxt_debug(&engine->task, "kevent(%d) changes:%d",
                  engine->u.kqueue.fd, nchanges);

        ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
                     NULL, 0, NULL);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(&engine->task, "kevent(%d) failed %E",
                      engine->u.kqueue.fd, nxt_errno);

            nxt_kqueue_error(engine);
        }

        engine->u.kqueue.nchanges = 0;
    }

    return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
}
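

/*
 * A standalone sketch (not engine code) of the batching pattern used
 * above: changes are accumulated in an array and submitted in a single
 * kevent() call, either when the array fills up, as here, or together
 * with the next poll.  NCHANGES and batch_change() are hypothetical.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>

#define NCHANGES  32

static struct kevent  changes[NCHANGES];
static int            nchanges;

static void
batch_change(int kq, int fd, short filter, u_short flags)
{
    if (nchanges == NCHANGES) {
        /* The buffer is full: flush without waiting for events. */
        (void) kevent(kq, changes, nchanges, NULL, 0, NULL);
        nchanges = 0;
    }

    EV_SET(&changes[nchanges++], fd, filter, flags, 0, 0, NULL);
}
#endif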


static void
nxt_kqueue_error(nxt_event_engine_t *engine)
{
    struct kevent     *kev, *end;
    nxt_fd_event_t    *ev;
    nxt_file_event_t  *fev;
    nxt_work_queue_t  *wq;

    wq = &engine->fast_work_queue;
    end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];

    for (kev = engine->u.kqueue.changes; kev < end; kev++) {

        switch (kev->filter) {

        case EVFILT_READ:
        case EVFILT_WRITE:
            ev = nxt_kevent_get_udata(kev->udata);
            nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
                               ev->task, ev, ev->data);
            break;

        case EVFILT_VNODE:
            fev = nxt_kevent_get_udata(kev->udata);
            nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
                               fev->task, fev, fev->data);
            break;
        }
    }
}


static void
nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd);

    if (ev->kq_eof && ev->kq_errno != 0) {
        ev->error = ev->kq_errno;
        nxt_log(task, nxt_socket_error_level(ev->kq_errno),
                "kevent() reported error on descriptor %d %E",
                ev->fd, ev->kq_errno);
    }

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;
    ev->error = ev->kq_errno;

    ev->error_handler(task, ev, data);
}


static void
nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_file_event_t  *ev;

    ev = obj;

    nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);

    ev->handler(task, ev, data);
}


static nxt_int_t
nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
{
    int               signo;
    struct kevent     kev;
    struct sigaction  sa;

    signo = sigev->signo;

    nxt_memzero(&sa, sizeof(struct sigaction));
    sigemptyset(&sa.sa_mask);

    /*
     * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch
     * the signal then.  It should be set to SIG_DFL instead.  Although
     * the default action for SIGCHLD is also to ignore the signal,
     * SIG_DFL nevertheless allows kqueue to catch it.
     */
    sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;

    if (sigaction(signo, &sa, NULL) != 0) {
        nxt_alert(&engine->task, "sigaction(%d) failed %E", signo, nxt_errno);

        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
              engine->u.kqueue.fd, signo, sigev->name);

    kev.ident = signo;
    kev.filter = EVFILT_SIGNAL;
    kev.flags = EV_ADD;
    kev.fflags = 0;
    kev.data = 0;
    kev.udata = nxt_kevent_set_udata(sigev);

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "kevent(%d) failed %E",
              engine->u.kqueue.fd, nxt_errno);

    return NXT_ERROR;
}
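

/*
 * A standalone sketch (assumptions, not engine code) of the SIGCHLD
 * subtlety above: EVFILT_SIGNAL counts signal deliveries, but a
 * disposition of SIG_IGN makes the kernel discard SIGCHLD before the
 * kqueue sees it, so the disposition must remain SIG_DFL.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <signal.h>

static int
watch_sigchld(int kq)
{
    struct kevent  kev;

    (void) signal(SIGCHLD, SIG_DFL);   /* SIG_IGN would lose the event */

    EV_SET(&kev, SIGCHLD, EVFILT_SIGNAL, EV_ADD, 0, 0, NULL);

    return kevent(kq, &kev, 1, NULL, 0, NULL);
}
#endif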


#if (NXT_HAVE_EVFILT_USER)

static nxt_int_t
nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    struct kevent  kev;

    /* EVFILT_USER must be added to a kqueue before it can be triggered. */

    kev.ident = 0;
    kev.filter = EVFILT_USER;
    kev.flags = EV_ADD | EV_CLEAR;
    kev.fflags = 0;
    kev.data = 0;
    kev.udata = NULL;

    engine->u.kqueue.post_handler = handler;

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "kevent(%d) failed %E",
              engine->u.kqueue.fd, nxt_errno);

    return NXT_ERROR;
}


static void
nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    struct kevent  kev;

    /*
     * kqueue has built-in signal processing support, so this function
     * is used only to post events; the signo argument is ignored.
     */

    kev.ident = 0;
    kev.filter = EVFILT_USER;
    kev.flags = 0;
    kev.fflags = NOTE_TRIGGER;
    kev.data = 0;
    kev.udata = NULL;

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
        nxt_alert(&engine->task, "kevent(%d) failed %E",
                  engine->u.kqueue.fd, nxt_errno);
    }
}

#endif
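

/*
 * A standalone sketch (not engine code) of the EVFILT_USER pattern
 * above: one thread blocks in kevent(), another posts NOTE_TRIGGER on
 * the same ident to wake it, replacing the traditional pipe-to-self
 * trick.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>

static void
post_wakeup(int kq)
{
    struct kevent  kev;

    /* Assumes ident 0 was registered with EV_ADD | EV_CLEAR earlier. */
    EV_SET(&kev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);

    (void) kevent(kq, &kev, 1, NULL, 0, NULL);
}
#endif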


static void
nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    void                *obj, *data;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_uint_t          level;
    nxt_bool_t          error, eof;
    nxt_task_t          *task;
    struct kevent       *kev;
    nxt_fd_event_t      *ev;
    nxt_sig_event_t     *sigev;
    struct timespec     ts, *tp;
    nxt_file_event_t    *fev;
    nxt_work_queue_t    *wq;
    nxt_work_handler_t  handler;

    if (timeout == NXT_INFINITE_MSEC) {
        tp = NULL;

    } else {
        ts.tv_sec = timeout / 1000;
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tp = &ts;
    }

    nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
              engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);

    nevents = kevent(engine->u.kqueue.fd,
                     engine->u.kqueue.changes, engine->u.kqueue.nchanges,
                     engine->u.kqueue.events, engine->u.kqueue.mevents, tp);

    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "kevent(%d) failed %E",
                engine->u.kqueue.fd, err);

        nxt_kqueue_error(engine);
        return;
    }

    engine->u.kqueue.nchanges = 0;

    for (i = 0; i < nevents; i++) {

        kev = &engine->u.kqueue.events[i];

        nxt_debug(&engine->task,
                  (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
                      "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
                      "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
                  kev->ident, kev->filter, kev->flags, kev->fflags,
                  kev->data, kev->udata);

        error = (kev->flags & EV_ERROR);

        if (nxt_slow_path(error)) {
            nxt_alert(&engine->task,
                      "kevent(%d) error %E on ident:%d filter:%d",
                      engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
        }

        task = &engine->task;
        wq = &engine->fast_work_queue;
        handler = nxt_kqueue_fd_error_handler;
        obj = nxt_kevent_get_udata(kev->udata);

        switch (kev->filter) {

        case EVFILT_READ:
            ev = obj;
            ev->read_ready = 1;
            ev->kq_available = (int32_t) kev->data;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->read <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->read = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->read_handler;
                wq = ev->read_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_WRITE:
            ev = obj;
            ev->write_ready = 1;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->write <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->write = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->write_handler;
                wq = ev->write_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_VNODE:
            fev = obj;
            handler = fev->handler;
            task = fev->task;
            data = fev->data;
            break;

        case EVFILT_SIGNAL:
            sigev = obj;
            obj = (void *) kev->ident;
            handler = sigev->handler;
            data = (void *) sigev->name;
            break;

#if (NXT_HAVE_EVFILT_USER)

        case EVFILT_USER:
            handler = engine->u.kqueue.post_handler;
            data = NULL;
            break;

#endif

        default:

#if (NXT_DEBUG)
            nxt_alert(&engine->task,
                      "unexpected kevent(%d) filter %d on ident %d",
                      engine->u.kqueue.fd, kev->filter, kev->ident);
#endif

            continue;
        }

        nxt_work_queue_add(wq, handler, task, obj, data);
    }
}


/*
 * nxt_kqueue_conn_io_connect() eliminates the getsockopt() syscall
 * that would otherwise be needed to test a pending connect() error;
 * a sketch of the avoided check follows the function.
 */

static void
nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        c->socket.write_handler = nxt_kqueue_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_kqueue_enable_write(engine, &c->socket);
        return;

    case NXT_DECLINED:
        handler = state->close_handler;
        break;

    default: /* NXT_ERROR */
        handler = state->error_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
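

/*
 * For contrast, a sketch (assumptions, not engine code) of the
 * conventional check this engine avoids: with plain poll()/epoll a
 * socket that becomes writable after a pending connect() must be
 * interrogated with getsockopt(SO_ERROR), whereas kqueue delivers the
 * error in kev.fflags together with EV_EOF.
 */
#if 0
#include <sys/types.h>
#include <sys/socket.h>

static int
pending_connect_error(int fd)
{
    int        err;
    socklen_t  len;

    err = 0;
    len = sizeof(int);

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) != 0) {
        return -1;
    }

    return err;   /* 0 means the connect() succeeded */
}
#endif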


static void
nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);

    c->socket.write = NXT_EVENT_BLOCKED;

    if (c->write_state->timer_autoreset) {
        nxt_timer_disable(task->thread->engine, &c->write_timer);
    }

    nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                       task, c, data);
}


static void
nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_listen_event_t  *lev;

    lev = obj;

    nxt_debug(task, "kevent fd:%d avail:%D",
              lev->socket.fd, lev->socket.kq_available);

    lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);

    nxt_kqueue_conn_io_accept(task, lev, data);
}


static void
nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           socklen;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    lev->socket.kq_available--;
    lev->socket.read_ready = (lev->socket.kq_available != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept(lev->socket.fd, sa, &socklen);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept", nxt_errno);
}


/*
 * nxt_kqueue_conn_io_read() is just a wrapper that eliminates the
 * readv() or recv() syscall if the remote side has already closed
 * the connection.
 */

static void
nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);

    if (c->socket.kq_available == 0 && c->socket.kq_eof) {
        nxt_debug(task, "kevent fd:%d eof", c->socket.fd);

        c->socket.closed = 1;
        nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
                           task, c, data);
        return;
    }

    nxt_conn_io_read(task, c, data);
}


/*
 * nxt_kqueue_conn_io_recvbuf() is just a wrapper around the standard
 * nxt_conn_io_recvbuf() that eliminates the readv() or recv() syscall
 * if there is no pending data or the remote side has closed the
 * connection.
 */

static ssize_t
nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    if (c->socket.kq_available == 0 && c->socket.kq_eof) {
        c->socket.closed = 1;
        return 0;
    }

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0) {
        c->socket.kq_available -= n;

        if (c->socket.kq_available < 0) {
            c->socket.kq_available = 0;
        }

        nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
                  c->socket.fd, c->socket.kq_available, c->socket.kq_eof);

        c->socket.read_ready = (c->socket.kq_available != 0
                                || c->socket.kq_eof);
    }

    return n;
}
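
/*
 * A closing sketch (assumptions, not engine code) of the kq_available
 * bookkeeping above: kqueue reports the pending byte count in kev.data,
 * so a reader can track how much remains after each recv() and skip the
 * extra syscall that would only return 0 or EAGAIN.
 */
#if 0
#include <sys/types.h>
#include <sys/socket.h>

static ssize_t
counted_recv(int fd, void *buf, size_t size, int *available)
{
    ssize_t  n;

    if (*available == 0) {
        return 0;   /* nothing pending: no syscall needed */
    }

    n = recv(fd, buf, size, 0);

    if (n > 0) {
        *available -= n;

        if (*available < 0) {
            *available = 0;
        }
    }

    return n;
}
#endif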