xref: /unit/src/nxt_kqueue_engine.c (revision 1384:55e5f73d0e1e)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * kqueue()      has been introduced in FreeBSD 4.1 and then was ported
12  *               to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
13  *               DragonFlyBSD inherited it with FreeBSD 4 code base.
14  *
15  * NOTE_REVOKE   has been introduced in FreeBSD 4.3 and then was ported
16  *               to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
17  *               DragonFlyBSD inherited it with FreeBSD 4 code base.
18  *
19  * EVFILT_TIMER  has been introduced in FreeBSD 4.4-STABLE and then was
20  *               ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2.
21  *               DragonFlyBSD inherited it with FreeBSD 4 code base.
22  *
23  * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow
24  *               Leopard) as part of the Grand Central Dispatch framework
25  *               and then were ported to FreeBSD 8.0-STABLE as part of the
26  *               libdispatch support.
27  */
28 
29 
/*
 * EV_DISPATCH is better because it just disables an event on delivery
 * whilst EV_ONESHOT deletes the event.  This eliminates in-kernel memory
 * deallocation and probable subsequent allocation with a lock acquiring.
 * NXT_KEVENT_ONESHOT therefore resolves to EV_DISPATCH where the platform
 * defines it and falls back to EV_ONESHOT otherwise.
 */
#ifdef EV_DISPATCH
#define NXT_KEVENT_ONESHOT  EV_DISPATCH
#else
#define NXT_KEVENT_ONESHOT  EV_ONESHOT
#endif


/*
 * Conversions between a pointer and the kevent.udata field.  Each cast
 * expansion is parenthesized as a whole so that the macros stay a single
 * primary expression when used as an operand, e.g. of sizeof or of
 * a postfix operator.
 */
#if (NXT_NETBSD)
/* NetBSD defines the kevent.udata field as intptr_t. */

#define nxt_kevent_set_udata(udata)  ((intptr_t) (udata))
#define nxt_kevent_get_udata(udata)  ((void *) (udata))

#else
#define nxt_kevent_set_udata(udata)  ((void *) (udata))
#define nxt_kevent_get_udata(udata)  (udata)
#endif
52 
53 
54 static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
55     nxt_uint_t mchanges, nxt_uint_t mevents);
56 static void nxt_kqueue_free(nxt_event_engine_t *engine);
57 static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
58 static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
59 static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
60 static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
61     nxt_fd_event_t *ev);
62 static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
63     nxt_fd_event_t *ev);
64 static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
65     nxt_fd_event_t *ev);
66 static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
67     nxt_fd_event_t *ev);
68 static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
69     nxt_fd_event_t *ev);
70 static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
71     nxt_fd_event_t *ev);
72 static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
73     nxt_fd_event_t *ev);
74 static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
75     nxt_fd_event_t *ev);
76 static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
77     nxt_fd_event_t *ev);
78 static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
79     nxt_fd_event_t *ev);
80 static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
81     nxt_file_event_t *ev);
82 static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
83     nxt_file_event_t *ev);
84 static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
85     nxt_int_t filter, nxt_uint_t flags);
86 static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
87 static void nxt_kqueue_error(nxt_event_engine_t *engine);
88 static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
89     void *data);
90 static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
91     void *data);
92 static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
93     const nxt_sig_event_t *sigev);
94 #if (NXT_HAVE_EVFILT_USER)
95 static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
96     nxt_work_handler_t handler);
97 static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
98 #endif
99 static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
100 
101 static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
102     void *data);
103 static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
104     void *data);
105 static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
106 static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
107     void *data);
108 static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
109     void *data);
110 static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
111 
112 
113 static nxt_conn_io_t  nxt_kqueue_conn_io = {
114     .connect = nxt_kqueue_conn_io_connect,
115     .accept = nxt_kqueue_conn_io_accept,
116 
117     .read = nxt_kqueue_conn_io_read,
118     .recvbuf = nxt_kqueue_conn_io_recvbuf,
119     .recv = nxt_conn_io_recv,
120 
121     .write = nxt_conn_io_write,
122     .sendbuf = nxt_conn_io_sendbuf,
123 
124 #if (NXT_HAVE_FREEBSD_SENDFILE)
125     .old_sendbuf = nxt_freebsd_event_conn_io_sendfile,
126 #elif (NXT_HAVE_MACOSX_SENDFILE)
127     .old_sendbuf = nxt_macosx_event_conn_io_sendfile,
128 #else
129     .old_sendbuf = nxt_event_conn_io_sendbuf,
130 #endif
131 
132     .writev = nxt_event_conn_io_writev,
133     .send = nxt_event_conn_io_send,
134 };
135 
136 
137 const nxt_event_interface_t  nxt_kqueue_engine = {
138     "kqueue",
139     nxt_kqueue_create,
140     nxt_kqueue_free,
141     nxt_kqueue_enable,
142     nxt_kqueue_disable,
143     nxt_kqueue_delete,
144     nxt_kqueue_close,
145     nxt_kqueue_enable_read,
146     nxt_kqueue_enable_write,
147     nxt_kqueue_disable_read,
148     nxt_kqueue_disable_write,
149     nxt_kqueue_block_read,
150     nxt_kqueue_block_write,
151     nxt_kqueue_oneshot_read,
152     nxt_kqueue_oneshot_write,
153     nxt_kqueue_enable_accept,
154     nxt_kqueue_enable_file,
155     nxt_kqueue_close_file,
156 #if (NXT_HAVE_EVFILT_USER)
157     nxt_kqueue_enable_post,
158     nxt_kqueue_signal,
159 #else
160     NULL,
161     NULL,
162 #endif
163     nxt_kqueue_poll,
164 
165     &nxt_kqueue_conn_io,
166 
167     NXT_FILE_EVENTS,
168     NXT_SIGNAL_EVENTS,
169 };
170 
171 
172 static nxt_int_t
nxt_kqueue_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)173 nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
174     nxt_uint_t mevents)
175 {
176     const nxt_sig_event_t  *sigev;
177 
178     engine->u.kqueue.fd = -1;
179     engine->u.kqueue.mchanges = mchanges;
180     engine->u.kqueue.mevents = mevents;
181     engine->u.kqueue.pid = nxt_pid;
182 
183     engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
184     if (engine->u.kqueue.changes == NULL) {
185         goto fail;
186     }
187 
188     engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
189     if (engine->u.kqueue.events == NULL) {
190         goto fail;
191     }
192 
193     engine->u.kqueue.fd = kqueue();
194     if (engine->u.kqueue.fd == -1) {
195         nxt_alert(&engine->task, "kqueue() failed %E", nxt_errno);
196         goto fail;
197     }
198 
199     nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);
200 
201     if (engine->signals != NULL) {
202         for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
203             if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
204                 goto fail;
205             }
206         }
207     }
208 
209     return NXT_OK;
210 
211 fail:
212 
213     nxt_kqueue_free(engine);
214 
215     return NXT_ERROR;
216 }
217 
218 
219 static void
nxt_kqueue_free(nxt_event_engine_t * engine)220 nxt_kqueue_free(nxt_event_engine_t *engine)
221 {
222     nxt_fd_t  fd;
223 
224     fd = engine->u.kqueue.fd;
225 
226     nxt_debug(&engine->task, "kqueue %d free", fd);
227 
228     if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
229         /* kqueue is not inherited by fork() */
230 
231         if (close(fd) != 0) {
232             nxt_alert(&engine->task, "kqueue close(%d) failed %E",
233                       fd, nxt_errno);
234         }
235     }
236 
237     nxt_free(engine->u.kqueue.events);
238     nxt_free(engine->u.kqueue.changes);
239 
240     nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
241 }
242 
243 
244 static void
nxt_kqueue_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)245 nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
246 {
247     nxt_kqueue_enable_read(engine, ev);
248     nxt_kqueue_enable_write(engine, ev);
249 }
250 
251 
252 /*
253  * EV_DISABLE is better because it eliminates in-kernel memory
254  * deallocation and probable subsequent allocation with a lock acquiring.
255  */
256 
257 static void
nxt_kqueue_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)258 nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
259 {
260     if (ev->read != NXT_EVENT_INACTIVE) {
261         ev->read = NXT_EVENT_INACTIVE;
262         nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
263     }
264 
265     if (ev->write != NXT_EVENT_INACTIVE) {
266         ev->write = NXT_EVENT_INACTIVE;
267         nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
268     }
269 }
270 
271 
272 static void
nxt_kqueue_delete(nxt_event_engine_t * engine,nxt_fd_event_t * ev)273 nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
274 {
275     if (ev->read != NXT_EVENT_INACTIVE) {
276         ev->read = NXT_EVENT_INACTIVE;
277         nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
278     }
279 
280     if (ev->write != NXT_EVENT_INACTIVE) {
281         ev->write = NXT_EVENT_INACTIVE;
282         nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
283     }
284 }
285 
286 
287 /*
288  * kqueue(2):
289  *
290  *   Calling close() on a file descriptor will remove any kevents that
291  *   reference the descriptor.
292  *
293  * So nxt_kqueue_close() returns true only if there are pending events.
294  */
295 
296 static nxt_bool_t
nxt_kqueue_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)297 nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
298 {
299     struct kevent  *kev, *end;
300 
301     ev->read = NXT_EVENT_INACTIVE;
302     ev->write = NXT_EVENT_INACTIVE;
303 
304     end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
305 
306     for (kev = engine->u.kqueue.changes; kev < end; kev++) {
307         if (kev->ident == (uintptr_t) ev->fd) {
308             return 1;
309         }
310     }
311 
312     return 0;
313 }
314 
315 
316 /*
317  * The kqueue event engine uses only three states: inactive, blocked, and
318  * active.  An active oneshot event is marked as it is in the default
319  * state.  The event will be converted eventually to the default EV_CLEAR
320  * mode after it will become inactive after delivery.
321  */
322 
323 static void
nxt_kqueue_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)324 nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
325 {
326     if (ev->read == NXT_EVENT_INACTIVE) {
327         nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
328                           EV_ADD | EV_ENABLE | EV_CLEAR);
329     }
330 
331     ev->read = NXT_EVENT_ACTIVE;
332 }
333 
334 
335 static void
nxt_kqueue_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)336 nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
337 {
338     if (ev->write == NXT_EVENT_INACTIVE) {
339         nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
340                           EV_ADD | EV_ENABLE | EV_CLEAR);
341     }
342 
343     ev->write = NXT_EVENT_ACTIVE;
344 }
345 
346 
347 static void
nxt_kqueue_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)348 nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
349 {
350     ev->read = NXT_EVENT_INACTIVE;
351 
352     nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
353 }
354 
355 
356 static void
nxt_kqueue_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)357 nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
358 {
359     ev->write = NXT_EVENT_INACTIVE;
360 
361     nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
362 }
363 
364 
365 static void
nxt_kqueue_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)366 nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
367 {
368     if (ev->read != NXT_EVENT_INACTIVE) {
369         ev->read = NXT_EVENT_BLOCKED;
370     }
371 }
372 
373 
374 static void
nxt_kqueue_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)375 nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
376 {
377     if (ev->write != NXT_EVENT_INACTIVE) {
378         ev->write = NXT_EVENT_BLOCKED;
379     }
380 }
381 
382 
383 static void
nxt_kqueue_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)384 nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
385 {
386     ev->write = NXT_EVENT_ACTIVE;
387 
388     nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
389                       EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
390 }
391 
392 
393 static void
nxt_kqueue_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)394 nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
395 {
396     ev->write = NXT_EVENT_ACTIVE;
397 
398     nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
399                       EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
400 }
401 
402 
403 static void
nxt_kqueue_enable_accept(nxt_event_engine_t * engine,nxt_fd_event_t * ev)404 nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
405 {
406     ev->read = NXT_EVENT_ACTIVE;
407     ev->read_handler = nxt_kqueue_listen_handler;
408 
409     nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
410 }
411 
412 
413 static void
nxt_kqueue_enable_file(nxt_event_engine_t * engine,nxt_file_event_t * ev)414 nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
415 {
416     struct kevent  *kev;
417 
418     const nxt_int_t   flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
419     const nxt_uint_t  fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
420                                | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
421 
422     nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
423               engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);
424 
425     kev = nxt_kqueue_get_kevent(engine);
426 
427     kev->ident = ev->file->fd;
428     kev->filter = EVFILT_VNODE;
429     kev->flags = flags;
430     kev->fflags = fflags;
431     kev->data = 0;
432     kev->udata = nxt_kevent_set_udata(ev);
433 }
434 
435 
436 static void
nxt_kqueue_close_file(nxt_event_engine_t * engine,nxt_file_event_t * ev)437 nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
438 {
439     /* TODO: pending event. */
440 }
441 
442 
443 static void
nxt_kqueue_fd_set(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_int_t filter,nxt_uint_t flags)444 nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
445     nxt_int_t filter, nxt_uint_t flags)
446 {
447     struct kevent  *kev;
448 
449     nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
450               engine->u.kqueue.fd, ev->fd, filter, flags);
451 
452     kev = nxt_kqueue_get_kevent(engine);
453 
454     kev->ident = ev->fd;
455     kev->filter = filter;
456     kev->flags = flags;
457     kev->fflags = 0;
458     kev->data = 0;
459     kev->udata = nxt_kevent_set_udata(ev);
460 }
461 
462 
463 static struct kevent *
nxt_kqueue_get_kevent(nxt_event_engine_t * engine)464 nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
465 {
466     int  ret, nchanges;
467 
468     nchanges = engine->u.kqueue.nchanges;
469 
470     if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {
471 
472         nxt_debug(&engine->task, "kevent(%d) changes:%d",
473                   engine->u.kqueue.fd, nchanges);
474 
475         ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
476                      NULL, 0, NULL);
477 
478         if (nxt_slow_path(ret != 0)) {
479             nxt_alert(&engine->task, "kevent(%d) failed %E",
480                       engine->u.kqueue.fd, nxt_errno);
481 
482             nxt_kqueue_error(engine);
483         }
484 
485         engine->u.kqueue.nchanges = 0;
486     }
487 
488     return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
489 }
490 
491 
492 static void
nxt_kqueue_error(nxt_event_engine_t * engine)493 nxt_kqueue_error(nxt_event_engine_t *engine)
494 {
495     struct kevent     *kev, *end;
496     nxt_fd_event_t    *ev;
497     nxt_file_event_t  *fev;
498     nxt_work_queue_t  *wq;
499 
500     wq = &engine->fast_work_queue;
501     end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
502 
503     for (kev = engine->u.kqueue.changes; kev < end; kev++) {
504 
505         switch (kev->filter) {
506 
507         case EVFILT_READ:
508         case EVFILT_WRITE:
509             ev = nxt_kevent_get_udata(kev->udata);
510             nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
511                                ev->task, ev, ev->data);
512             break;
513 
514         case EVFILT_VNODE:
515             fev = nxt_kevent_get_udata(kev->udata);
516             nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
517                                fev->task, fev, fev->data);
518             break;
519         }
520     }
521 }
522 
523 
524 static void
nxt_kqueue_fd_error_handler(nxt_task_t * task,void * obj,void * data)525 nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
526 {
527     nxt_fd_event_t  *ev;
528 
529     ev = obj;
530 
531     nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd);
532 
533     if (ev->kq_eof && ev->kq_errno != 0) {
534         ev->error = ev->kq_errno;
535         nxt_log(task, nxt_socket_error_level(ev->kq_errno),
536                 "kevent() reported error on descriptor %d %E",
537                 ev->fd, ev->kq_errno);
538     }
539 
540     ev->read = NXT_EVENT_INACTIVE;
541     ev->write = NXT_EVENT_INACTIVE;
542     ev->error = ev->kq_errno;
543 
544     ev->error_handler(task, ev, data);
545 }
546 
547 
548 static void
nxt_kqueue_file_error_handler(nxt_task_t * task,void * obj,void * data)549 nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
550 {
551     nxt_file_event_t  *ev;
552 
553     ev = obj;
554 
555     nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);
556 
557     ev->handler(task, ev, data);
558 }
559 
560 
561 static nxt_int_t
nxt_kqueue_add_signal(nxt_event_engine_t * engine,const nxt_sig_event_t * sigev)562 nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
563 {
564     int               signo;
565     struct kevent     kev;
566     struct sigaction  sa;
567 
568     signo = sigev->signo;
569 
570     nxt_memzero(&sa, sizeof(struct sigaction));
571     sigemptyset(&sa.sa_mask);
572 
573     /*
574      * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch
575      * this signal.  It should be set to SIG_DFL instead.  And although
576      * SIGCHLD default action is also ignoring, nevertheless SIG_DFL
577      * allows kqueue to catch the signal.
578      */
579     sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;
580 
581     if (sigaction(signo, &sa, NULL) != 0) {
582         nxt_alert(&engine->task, "sigaction(%d) failed %E", signo, nxt_errno);
583 
584         return NXT_ERROR;
585     }
586 
587     nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
588               engine->u.kqueue.fd, signo, sigev->name);
589 
590     kev.ident = signo;
591     kev.filter = EVFILT_SIGNAL;
592     kev.flags = EV_ADD;
593     kev.fflags = 0;
594     kev.data = 0;
595     kev.udata = nxt_kevent_set_udata(sigev);
596 
597     if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
598         return NXT_OK;
599     }
600 
601     nxt_alert(&engine->task, "kevent(%d) failed %E", kqueue, nxt_errno);
602 
603     return NXT_ERROR;
604 }
605 
606 
607 #if (NXT_HAVE_EVFILT_USER)
608 
609 static nxt_int_t
nxt_kqueue_enable_post(nxt_event_engine_t * engine,nxt_work_handler_t handler)610 nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
611 {
612     struct kevent  kev;
613 
614     /* EVFILT_USER must be added to a kqueue before it can be triggered. */
615 
616     kev.ident = 0;
617     kev.filter = EVFILT_USER;
618     kev.flags = EV_ADD | EV_CLEAR;
619     kev.fflags = 0;
620     kev.data = 0;
621     kev.udata = NULL;
622 
623     engine->u.kqueue.post_handler = handler;
624 
625     if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
626         return NXT_OK;
627     }
628 
629     nxt_alert(&engine->task, "kevent(%d) failed %E",
630               engine->u.kqueue.fd, nxt_errno);
631 
632     return NXT_ERROR;
633 }
634 
635 
636 static void
nxt_kqueue_signal(nxt_event_engine_t * engine,nxt_uint_t signo)637 nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
638 {
639     struct kevent  kev;
640 
641     /*
642      * kqueue has a builtin signal processing support, so the function
643      * is used only to post events and the signo argument is ignored.
644      */
645 
646     kev.ident = 0;
647     kev.filter = EVFILT_USER;
648     kev.flags = 0;
649     kev.fflags = NOTE_TRIGGER;
650     kev.data = 0;
651     kev.udata = NULL;
652 
653     if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
654         nxt_alert(&engine->task, "kevent(%d) failed %E",
655                   engine->u.kqueue.fd, nxt_errno);
656     }
657 }
658 
659 #endif
660 
661 
662 static void
nxt_kqueue_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)663 nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
664 {
665     int                 nevents;
666     void                *obj, *data;
667     nxt_int_t           i;
668     nxt_err_t           err;
669     nxt_uint_t          level;
670     nxt_bool_t          error, eof;
671     nxt_task_t          *task;
672     struct kevent       *kev;
673     nxt_fd_event_t      *ev;
674     nxt_sig_event_t     *sigev;
675     struct timespec     ts, *tp;
676     nxt_file_event_t    *fev;
677     nxt_work_queue_t    *wq;
678     nxt_work_handler_t  handler;
679 
680     if (timeout == NXT_INFINITE_MSEC) {
681         tp = NULL;
682 
683     } else {
684         ts.tv_sec = timeout / 1000;
685         ts.tv_nsec = (timeout % 1000) * 1000000;
686         tp = &ts;
687     }
688 
689     nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
690               engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);
691 
692     nevents = kevent(engine->u.kqueue.fd,
693                      engine->u.kqueue.changes, engine->u.kqueue.nchanges,
694                      engine->u.kqueue.events, engine->u.kqueue.mevents, tp);
695 
696     err = (nevents == -1) ? nxt_errno : 0;
697 
698     nxt_thread_time_update(engine->task.thread);
699 
700     nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);
701 
702     if (nevents == -1) {
703         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
704 
705         nxt_log(&engine->task, level, "kevent(%d) failed %E",
706                 engine->u.kqueue.fd, err);
707 
708         if (err != NXT_EINTR) {
709             nxt_kqueue_error(engine);
710         }
711 
712         return;
713     }
714 
715     engine->u.kqueue.nchanges = 0;
716 
717     for (i = 0; i < nevents; i++) {
718 
719         kev = &engine->u.kqueue.events[i];
720 
721         nxt_debug(&engine->task,
722                   (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
723                       "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
724                       "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
725                   kev->ident, kev->filter, kev->flags, kev->fflags,
726                   kev->data, kev->udata);
727 
728         error = (kev->flags & EV_ERROR);
729 
730         if (nxt_slow_path(error)) {
731             nxt_alert(&engine->task,
732                       "kevent(%d) error %E on ident:%d filter:%d",
733                       engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
734         }
735 
736         task = &engine->task;
737         wq = &engine->fast_work_queue;
738         handler = nxt_kqueue_fd_error_handler;
739         obj = nxt_kevent_get_udata(kev->udata);
740 
741         switch (kev->filter) {
742 
743         case EVFILT_READ:
744             ev = obj;
745             ev->read_ready = 1;
746             ev->kq_available = (int32_t) kev->data;
747             err = kev->fflags;
748             eof = (kev->flags & EV_EOF) != 0;
749             ev->kq_errno = err;
750             ev->kq_eof |= eof;
751 
752             if (ev->read <= NXT_EVENT_BLOCKED) {
753                 nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
754                 continue;
755             }
756 
757             if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
758                 ev->read = NXT_EVENT_INACTIVE;
759             }
760 
761             if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) {
762                 error = 1;
763             }
764 
765             if (nxt_fast_path(!error)) {
766                 handler = ev->read_handler;
767                 wq = ev->read_work_queue;
768             }
769 
770             task = ev->task;
771             data = ev->data;
772 
773             break;
774 
775         case EVFILT_WRITE:
776             ev = obj;
777             ev->write_ready = 1;
778             err = kev->fflags;
779             eof = (kev->flags & EV_EOF) != 0;
780             ev->kq_errno = err;
781             ev->kq_eof |= eof;
782 
783             if (ev->write <= NXT_EVENT_BLOCKED) {
784                 nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
785                 continue;
786             }
787 
788             if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
789                 ev->write = NXT_EVENT_INACTIVE;
790             }
791 
792             if (nxt_slow_path(eof && err != 0)) {
793                 error = 1;
794             }
795 
796             if (nxt_fast_path(!error)) {
797                 handler = ev->write_handler;
798                 wq = ev->write_work_queue;
799             }
800 
801             task = ev->task;
802             data = ev->data;
803 
804             break;
805 
806         case EVFILT_VNODE:
807             fev = obj;
808             handler = fev->handler;
809             task = fev->task;
810             data = fev->data;
811             break;
812 
813         case EVFILT_SIGNAL:
814             sigev = obj;
815             obj = (void *) kev->ident;
816             handler = sigev->handler;
817             data = (void *) sigev->name;
818             break;
819 
820 #if (NXT_HAVE_EVFILT_USER)
821 
822         case EVFILT_USER:
823             handler = engine->u.kqueue.post_handler;
824             data = NULL;
825             break;
826 
827 #endif
828 
829         default:
830 
831 #if (NXT_DEBUG)
832             nxt_alert(&engine->task,
833                       "unexpected kevent(%d) filter %d on ident %d",
834                       engine->u.kqueue.fd, kev->filter, kev->ident);
835 #endif
836 
837             continue;
838         }
839 
840         nxt_work_queue_add(wq, handler, task, obj, data);
841     }
842 }
843 
844 
845 /*
846  * nxt_kqueue_event_conn_io_connect() eliminates the
847  * getsockopt() syscall to test pending connect() error.
848  */
849 
850 static void
nxt_kqueue_conn_io_connect(nxt_task_t * task,void * obj,void * data)851 nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
852 {
853     nxt_conn_t                    *c;
854     nxt_event_engine_t            *engine;
855     nxt_work_handler_t            handler;
856     const nxt_event_conn_state_t  *state;
857 
858     c = obj;
859 
860     state = c->write_state;
861 
862     switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {
863 
864     case NXT_OK:
865         c->socket.write_ready = 1;
866         handler = state->ready_handler;
867         break;
868 
869     case NXT_AGAIN:
870         c->socket.write_handler = nxt_kqueue_conn_connected;
871         c->socket.error_handler = nxt_conn_connect_error;
872 
873         engine = task->thread->engine;
874         nxt_conn_timer(engine, c, state, &c->write_timer);
875 
876         nxt_kqueue_enable_write(engine, &c->socket);
877         return;
878 
879     case NXT_DECLINED:
880         handler = state->close_handler;
881         break;
882 
883     default: /* NXT_ERROR */
884         handler = state->error_handler;
885         break;
886     }
887 
888     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
889 }
890 
891 
892 static void
nxt_kqueue_conn_connected(nxt_task_t * task,void * obj,void * data)893 nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
894 {
895     nxt_conn_t  *c;
896 
897     c = obj;
898 
899     nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);
900 
901     c->socket.write = NXT_EVENT_BLOCKED;
902 
903     if (c->write_state->timer_autoreset) {
904         nxt_timer_disable(task->thread->engine, &c->write_timer);
905     }
906 
907     nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
908                        task, c, data);
909 }
910 
911 
912 static void
nxt_kqueue_listen_handler(nxt_task_t * task,void * obj,void * data)913 nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
914 {
915     nxt_listen_event_t  *lev;
916 
917     lev = obj;
918 
919     nxt_debug(task, "kevent fd:%d avail:%D",
920               lev->socket.fd, lev->socket.kq_available);
921 
922     lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);
923 
924     nxt_kqueue_conn_io_accept(task, lev, data);
925 }
926 
927 
928 static void
nxt_kqueue_conn_io_accept(nxt_task_t * task,void * obj,void * data)929 nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
930 {
931     socklen_t           socklen;
932     nxt_conn_t          *c;
933     nxt_socket_t        s;
934     struct sockaddr     *sa;
935     nxt_listen_event_t  *lev;
936 
937     lev = obj;
938     c = lev->next;
939 
940     lev->ready--;
941     lev->socket.read_ready = (lev->ready != 0);
942 
943     lev->socket.kq_available--;
944     lev->socket.read_ready = (lev->socket.kq_available != 0);
945 
946     sa = &c->remote->u.sockaddr;
947     socklen = c->remote->socklen;
948     /*
949      * The returned socklen is ignored here,
950      * see comment in nxt_conn_io_accept().
951      */
952     s = accept(lev->socket.fd, sa, &socklen);
953 
954     if (s != -1) {
955         c->socket.fd = s;
956 
957         nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
958 
959         nxt_conn_accept(task, lev, c);
960         return;
961     }
962 
963     nxt_conn_accept_error(task, lev, "accept", nxt_errno);
964 }
965 
966 
967 /*
968  * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the
969  * readv() or recv() syscall if a remote side just closed connection.
970  */
971 
972 static void
nxt_kqueue_conn_io_read(nxt_task_t * task,void * obj,void * data)973 nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
974 {
975     nxt_conn_t  *c;
976 
977     c = obj;
978 
979     nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);
980 
981     if (c->socket.kq_available == 0 && c->socket.kq_eof) {
982         nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
983 
984         c->socket.closed = 1;
985         nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
986                            task, c, data);
987         return;
988     }
989 
990     nxt_conn_io_read(task, c, data);
991 }
992 
993 
994 /*
995  * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard
996  * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
997  * if there is no pending data or a remote side closed connection.
998  */
999 
1000 static ssize_t
nxt_kqueue_conn_io_recvbuf(nxt_conn_t * c,nxt_buf_t * b)1001 nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1002 {
1003     ssize_t  n;
1004 
1005     if (c->socket.kq_available == 0 && c->socket.kq_eof) {
1006         c->socket.closed = 1;
1007         return 0;
1008     }
1009 
1010     n = nxt_conn_io_recvbuf(c, b);
1011 
1012     if (n > 0) {
1013         c->socket.kq_available -= n;
1014 
1015         if (c->socket.kq_available < 0) {
1016             c->socket.kq_available = 0;
1017         }
1018 
1019         nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
1020                   c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
1021 
1022         c->socket.read_ready = (c->socket.kq_available != 0
1023                                 || c->socket.kq_eof);
1024     }
1025 
1026     return n;
1027 }
1028