1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
/*
 * kqueue() was introduced in FreeBSD 4.1 and then ported
 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 * DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * NOTE_REVOKE was introduced in FreeBSD 4.3 and then ported
 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 * DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * EVFILT_TIMER was introduced in FreeBSD 4.4-STABLE and then
 * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2.
 * DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * EVFILT_USER and EV_DISPATCH were introduced in MacOSX 10.6 (Snow
 * Leopard) as part of the Grand Central Dispatch framework
 * and then ported to FreeBSD 8.0-STABLE as part of the
 * libdispatch support.
 */
28
29
/*
 * EV_DISPATCH is better because it just disables an event on delivery,
 * whereas EV_ONESHOT deletes the event.  This avoids in-kernel memory
 * deallocation and a probable subsequent allocation that requires
 * acquiring a lock.
 */
#if defined(EV_DISPATCH)
/* Only disable the kernel event after delivery, keep it allocated. */
#define NXT_KEVENT_ONESHOT  EV_DISPATCH
#else
/* No EV_DISPATCH on this system: fall back to deleting on delivery. */
#define NXT_KEVENT_ONESHOT  EV_ONESHOT
#endif
40
41
/*
 * Conversions between an object pointer and the kevent.udata field.
 * NetBSD declares kevent.udata as intptr_t, other systems as void *.
 * The expansions are fully parenthesized so that the macros can be used
 * safely inside larger expressions (e.g. followed by "->" or "[]").
 */

#if (NXT_NETBSD)

#define nxt_kevent_set_udata(udata)  ((intptr_t) (udata))
#define nxt_kevent_get_udata(udata)  ((void *) (udata))

#else

#define nxt_kevent_set_udata(udata)  ((void *) (udata))
#define nxt_kevent_get_udata(udata)  (udata)

#endif
52
53
54 static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
55 nxt_uint_t mchanges, nxt_uint_t mevents);
56 static void nxt_kqueue_free(nxt_event_engine_t *engine);
57 static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
58 static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
59 static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
60 static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
61 nxt_fd_event_t *ev);
62 static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
63 nxt_fd_event_t *ev);
64 static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
65 nxt_fd_event_t *ev);
66 static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
67 nxt_fd_event_t *ev);
68 static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
69 nxt_fd_event_t *ev);
70 static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
71 nxt_fd_event_t *ev);
72 static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
73 nxt_fd_event_t *ev);
74 static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
75 nxt_fd_event_t *ev);
76 static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
77 nxt_fd_event_t *ev);
78 static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
79 nxt_fd_event_t *ev);
80 static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
81 nxt_file_event_t *ev);
82 static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
83 nxt_file_event_t *ev);
84 static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
85 nxt_int_t filter, nxt_uint_t flags);
86 static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
87 static void nxt_kqueue_error(nxt_event_engine_t *engine);
88 static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
89 void *data);
90 static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
91 void *data);
92 static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
93 const nxt_sig_event_t *sigev);
94 #if (NXT_HAVE_EVFILT_USER)
95 static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
96 nxt_work_handler_t handler);
97 static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
98 #endif
99 static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
100
101 static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
102 void *data);
103 static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
104 void *data);
105 static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
106 static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
107 void *data);
108 static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
109 void *data);
110 static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
111
112
113 static nxt_conn_io_t nxt_kqueue_conn_io = {
114 .connect = nxt_kqueue_conn_io_connect,
115 .accept = nxt_kqueue_conn_io_accept,
116
117 .read = nxt_kqueue_conn_io_read,
118 .recvbuf = nxt_kqueue_conn_io_recvbuf,
119 .recv = nxt_conn_io_recv,
120
121 .write = nxt_conn_io_write,
122 .sendbuf = nxt_conn_io_sendbuf,
123
124 #if (NXT_HAVE_FREEBSD_SENDFILE)
125 .old_sendbuf = nxt_freebsd_event_conn_io_sendfile,
126 #elif (NXT_HAVE_MACOSX_SENDFILE)
127 .old_sendbuf = nxt_macosx_event_conn_io_sendfile,
128 #else
129 .old_sendbuf = nxt_event_conn_io_sendbuf,
130 #endif
131
132 .writev = nxt_event_conn_io_writev,
133 .send = nxt_event_conn_io_send,
134 };
135
136
137 const nxt_event_interface_t nxt_kqueue_engine = {
138 "kqueue",
139 nxt_kqueue_create,
140 nxt_kqueue_free,
141 nxt_kqueue_enable,
142 nxt_kqueue_disable,
143 nxt_kqueue_delete,
144 nxt_kqueue_close,
145 nxt_kqueue_enable_read,
146 nxt_kqueue_enable_write,
147 nxt_kqueue_disable_read,
148 nxt_kqueue_disable_write,
149 nxt_kqueue_block_read,
150 nxt_kqueue_block_write,
151 nxt_kqueue_oneshot_read,
152 nxt_kqueue_oneshot_write,
153 nxt_kqueue_enable_accept,
154 nxt_kqueue_enable_file,
155 nxt_kqueue_close_file,
156 #if (NXT_HAVE_EVFILT_USER)
157 nxt_kqueue_enable_post,
158 nxt_kqueue_signal,
159 #else
160 NULL,
161 NULL,
162 #endif
163 nxt_kqueue_poll,
164
165 &nxt_kqueue_conn_io,
166
167 NXT_FILE_EVENTS,
168 NXT_SIGNAL_EVENTS,
169 };
170
171
172 static nxt_int_t
nxt_kqueue_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)173 nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
174 nxt_uint_t mevents)
175 {
176 const nxt_sig_event_t *sigev;
177
178 engine->u.kqueue.fd = -1;
179 engine->u.kqueue.mchanges = mchanges;
180 engine->u.kqueue.mevents = mevents;
181 engine->u.kqueue.pid = nxt_pid;
182
183 engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
184 if (engine->u.kqueue.changes == NULL) {
185 goto fail;
186 }
187
188 engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
189 if (engine->u.kqueue.events == NULL) {
190 goto fail;
191 }
192
193 engine->u.kqueue.fd = kqueue();
194 if (engine->u.kqueue.fd == -1) {
195 nxt_alert(&engine->task, "kqueue() failed %E", nxt_errno);
196 goto fail;
197 }
198
199 nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);
200
201 if (engine->signals != NULL) {
202 for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
203 if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
204 goto fail;
205 }
206 }
207 }
208
209 return NXT_OK;
210
211 fail:
212
213 nxt_kqueue_free(engine);
214
215 return NXT_ERROR;
216 }
217
218
219 static void
nxt_kqueue_free(nxt_event_engine_t * engine)220 nxt_kqueue_free(nxt_event_engine_t *engine)
221 {
222 nxt_fd_t fd;
223
224 fd = engine->u.kqueue.fd;
225
226 nxt_debug(&engine->task, "kqueue %d free", fd);
227
228 if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
229 /* kqueue is not inherited by fork() */
230
231 if (close(fd) != 0) {
232 nxt_alert(&engine->task, "kqueue close(%d) failed %E",
233 fd, nxt_errno);
234 }
235 }
236
237 nxt_free(engine->u.kqueue.events);
238 nxt_free(engine->u.kqueue.changes);
239
240 nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
241 }
242
243
244 static void
nxt_kqueue_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)245 nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
246 {
247 nxt_kqueue_enable_read(engine, ev);
248 nxt_kqueue_enable_write(engine, ev);
249 }
250
251
/*
 * EV_DISABLE is better than EV_DELETE because it avoids in-kernel memory
 * deallocation and a probable subsequent allocation that requires
 * acquiring a lock.
 */
256
257 static void
nxt_kqueue_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)258 nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
259 {
260 if (ev->read != NXT_EVENT_INACTIVE) {
261 ev->read = NXT_EVENT_INACTIVE;
262 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
263 }
264
265 if (ev->write != NXT_EVENT_INACTIVE) {
266 ev->write = NXT_EVENT_INACTIVE;
267 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
268 }
269 }
270
271
272 static void
nxt_kqueue_delete(nxt_event_engine_t * engine,nxt_fd_event_t * ev)273 nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
274 {
275 if (ev->read != NXT_EVENT_INACTIVE) {
276 ev->read = NXT_EVENT_INACTIVE;
277 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
278 }
279
280 if (ev->write != NXT_EVENT_INACTIVE) {
281 ev->write = NXT_EVENT_INACTIVE;
282 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
283 }
284 }
285
286
/*
 * kqueue(2):
 *
 *   Calling close() on a file descriptor will remove any kevents that
 *   reference the descriptor.
 *
 * So nxt_kqueue_close() returns true only if the changelist still holds
 * pending (not yet submitted) changes for the descriptor.
 */
295
296 static nxt_bool_t
nxt_kqueue_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)297 nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
298 {
299 struct kevent *kev, *end;
300
301 ev->read = NXT_EVENT_INACTIVE;
302 ev->write = NXT_EVENT_INACTIVE;
303
304 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
305
306 for (kev = engine->u.kqueue.changes; kev < end; kev++) {
307 if (kev->ident == (uintptr_t) ev->fd) {
308 return 1;
309 }
310 }
311
312 return 0;
313 }
314
315
/*
 * The kqueue event engine uses only three states: inactive, blocked, and
 * active.  An active oneshot event is marked with the same active state
 * as a default event.  After delivery it becomes inactive, and the next
 * enable call adds it again in the default EV_CLEAR mode.
 */
322
323 static void
nxt_kqueue_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)324 nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
325 {
326 if (ev->read == NXT_EVENT_INACTIVE) {
327 nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
328 EV_ADD | EV_ENABLE | EV_CLEAR);
329 }
330
331 ev->read = NXT_EVENT_ACTIVE;
332 }
333
334
335 static void
nxt_kqueue_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)336 nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
337 {
338 if (ev->write == NXT_EVENT_INACTIVE) {
339 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
340 EV_ADD | EV_ENABLE | EV_CLEAR);
341 }
342
343 ev->write = NXT_EVENT_ACTIVE;
344 }
345
346
347 static void
nxt_kqueue_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)348 nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
349 {
350 ev->read = NXT_EVENT_INACTIVE;
351
352 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
353 }
354
355
356 static void
nxt_kqueue_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)357 nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
358 {
359 ev->write = NXT_EVENT_INACTIVE;
360
361 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
362 }
363
364
365 static void
nxt_kqueue_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)366 nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
367 {
368 if (ev->read != NXT_EVENT_INACTIVE) {
369 ev->read = NXT_EVENT_BLOCKED;
370 }
371 }
372
373
374 static void
nxt_kqueue_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)375 nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
376 {
377 if (ev->write != NXT_EVENT_INACTIVE) {
378 ev->write = NXT_EVENT_BLOCKED;
379 }
380 }
381
382
383 static void
nxt_kqueue_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)384 nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
385 {
386 ev->write = NXT_EVENT_ACTIVE;
387
388 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
389 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
390 }
391
392
393 static void
nxt_kqueue_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)394 nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
395 {
396 ev->write = NXT_EVENT_ACTIVE;
397
398 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
399 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
400 }
401
402
403 static void
nxt_kqueue_enable_accept(nxt_event_engine_t * engine,nxt_fd_event_t * ev)404 nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
405 {
406 ev->read = NXT_EVENT_ACTIVE;
407 ev->read_handler = nxt_kqueue_listen_handler;
408
409 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
410 }
411
412
413 static void
nxt_kqueue_enable_file(nxt_event_engine_t * engine,nxt_file_event_t * ev)414 nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
415 {
416 struct kevent *kev;
417
418 const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
419 const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
420 | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
421
422 nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
423 engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);
424
425 kev = nxt_kqueue_get_kevent(engine);
426
427 kev->ident = ev->file->fd;
428 kev->filter = EVFILT_VNODE;
429 kev->flags = flags;
430 kev->fflags = fflags;
431 kev->data = 0;
432 kev->udata = nxt_kevent_set_udata(ev);
433 }
434
435
436 static void
nxt_kqueue_close_file(nxt_event_engine_t * engine,nxt_file_event_t * ev)437 nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
438 {
439 /* TODO: pending event. */
440 }
441
442
443 static void
nxt_kqueue_fd_set(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_int_t filter,nxt_uint_t flags)444 nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
445 nxt_int_t filter, nxt_uint_t flags)
446 {
447 struct kevent *kev;
448
449 nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
450 engine->u.kqueue.fd, ev->fd, filter, flags);
451
452 kev = nxt_kqueue_get_kevent(engine);
453
454 kev->ident = ev->fd;
455 kev->filter = filter;
456 kev->flags = flags;
457 kev->fflags = 0;
458 kev->data = 0;
459 kev->udata = nxt_kevent_set_udata(ev);
460 }
461
462
463 static struct kevent *
nxt_kqueue_get_kevent(nxt_event_engine_t * engine)464 nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
465 {
466 int ret, nchanges;
467
468 nchanges = engine->u.kqueue.nchanges;
469
470 if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {
471
472 nxt_debug(&engine->task, "kevent(%d) changes:%d",
473 engine->u.kqueue.fd, nchanges);
474
475 ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
476 NULL, 0, NULL);
477
478 if (nxt_slow_path(ret != 0)) {
479 nxt_alert(&engine->task, "kevent(%d) failed %E",
480 engine->u.kqueue.fd, nxt_errno);
481
482 nxt_kqueue_error(engine);
483 }
484
485 engine->u.kqueue.nchanges = 0;
486 }
487
488 return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
489 }
490
491
492 static void
nxt_kqueue_error(nxt_event_engine_t * engine)493 nxt_kqueue_error(nxt_event_engine_t *engine)
494 {
495 struct kevent *kev, *end;
496 nxt_fd_event_t *ev;
497 nxt_file_event_t *fev;
498 nxt_work_queue_t *wq;
499
500 wq = &engine->fast_work_queue;
501 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
502
503 for (kev = engine->u.kqueue.changes; kev < end; kev++) {
504
505 switch (kev->filter) {
506
507 case EVFILT_READ:
508 case EVFILT_WRITE:
509 ev = nxt_kevent_get_udata(kev->udata);
510 nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
511 ev->task, ev, ev->data);
512 break;
513
514 case EVFILT_VNODE:
515 fev = nxt_kevent_get_udata(kev->udata);
516 nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
517 fev->task, fev, fev->data);
518 break;
519 }
520 }
521 }
522
523
524 static void
nxt_kqueue_fd_error_handler(nxt_task_t * task,void * obj,void * data)525 nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
526 {
527 nxt_fd_event_t *ev;
528
529 ev = obj;
530
531 nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd);
532
533 if (ev->kq_eof && ev->kq_errno != 0) {
534 ev->error = ev->kq_errno;
535 nxt_log(task, nxt_socket_error_level(ev->kq_errno),
536 "kevent() reported error on descriptor %d %E",
537 ev->fd, ev->kq_errno);
538 }
539
540 ev->read = NXT_EVENT_INACTIVE;
541 ev->write = NXT_EVENT_INACTIVE;
542 ev->error = ev->kq_errno;
543
544 ev->error_handler(task, ev, data);
545 }
546
547
548 static void
nxt_kqueue_file_error_handler(nxt_task_t * task,void * obj,void * data)549 nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
550 {
551 nxt_file_event_t *ev;
552
553 ev = obj;
554
555 nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);
556
557 ev->handler(task, ev, data);
558 }
559
560
561 static nxt_int_t
nxt_kqueue_add_signal(nxt_event_engine_t * engine,const nxt_sig_event_t * sigev)562 nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
563 {
564 int signo;
565 struct kevent kev;
566 struct sigaction sa;
567
568 signo = sigev->signo;
569
570 nxt_memzero(&sa, sizeof(struct sigaction));
571 sigemptyset(&sa.sa_mask);
572
573 /*
574 * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch
575 * this signal. It should be set to SIG_DFL instead. And although
576 * SIGCHLD default action is also ignoring, nevertheless SIG_DFL
577 * allows kqueue to catch the signal.
578 */
579 sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;
580
581 if (sigaction(signo, &sa, NULL) != 0) {
582 nxt_alert(&engine->task, "sigaction(%d) failed %E", signo, nxt_errno);
583
584 return NXT_ERROR;
585 }
586
587 nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
588 engine->u.kqueue.fd, signo, sigev->name);
589
590 kev.ident = signo;
591 kev.filter = EVFILT_SIGNAL;
592 kev.flags = EV_ADD;
593 kev.fflags = 0;
594 kev.data = 0;
595 kev.udata = nxt_kevent_set_udata(sigev);
596
597 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
598 return NXT_OK;
599 }
600
601 nxt_alert(&engine->task, "kevent(%d) failed %E", kqueue, nxt_errno);
602
603 return NXT_ERROR;
604 }
605
606
607 #if (NXT_HAVE_EVFILT_USER)
608
609 static nxt_int_t
nxt_kqueue_enable_post(nxt_event_engine_t * engine,nxt_work_handler_t handler)610 nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
611 {
612 struct kevent kev;
613
614 /* EVFILT_USER must be added to a kqueue before it can be triggered. */
615
616 kev.ident = 0;
617 kev.filter = EVFILT_USER;
618 kev.flags = EV_ADD | EV_CLEAR;
619 kev.fflags = 0;
620 kev.data = 0;
621 kev.udata = NULL;
622
623 engine->u.kqueue.post_handler = handler;
624
625 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
626 return NXT_OK;
627 }
628
629 nxt_alert(&engine->task, "kevent(%d) failed %E",
630 engine->u.kqueue.fd, nxt_errno);
631
632 return NXT_ERROR;
633 }
634
635
636 static void
nxt_kqueue_signal(nxt_event_engine_t * engine,nxt_uint_t signo)637 nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
638 {
639 struct kevent kev;
640
641 /*
642 * kqueue has a builtin signal processing support, so the function
643 * is used only to post events and the signo argument is ignored.
644 */
645
646 kev.ident = 0;
647 kev.filter = EVFILT_USER;
648 kev.flags = 0;
649 kev.fflags = NOTE_TRIGGER;
650 kev.data = 0;
651 kev.udata = NULL;
652
653 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
654 nxt_alert(&engine->task, "kevent(%d) failed %E",
655 engine->u.kqueue.fd, nxt_errno);
656 }
657 }
658
659 #endif
660
661
662 static void
nxt_kqueue_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)663 nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
664 {
665 int nevents;
666 void *obj, *data;
667 nxt_int_t i;
668 nxt_err_t err;
669 nxt_uint_t level;
670 nxt_bool_t error, eof;
671 nxt_task_t *task;
672 struct kevent *kev;
673 nxt_fd_event_t *ev;
674 nxt_sig_event_t *sigev;
675 struct timespec ts, *tp;
676 nxt_file_event_t *fev;
677 nxt_work_queue_t *wq;
678 nxt_work_handler_t handler;
679
680 if (timeout == NXT_INFINITE_MSEC) {
681 tp = NULL;
682
683 } else {
684 ts.tv_sec = timeout / 1000;
685 ts.tv_nsec = (timeout % 1000) * 1000000;
686 tp = &ts;
687 }
688
689 nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
690 engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);
691
692 nevents = kevent(engine->u.kqueue.fd,
693 engine->u.kqueue.changes, engine->u.kqueue.nchanges,
694 engine->u.kqueue.events, engine->u.kqueue.mevents, tp);
695
696 err = (nevents == -1) ? nxt_errno : 0;
697
698 nxt_thread_time_update(engine->task.thread);
699
700 nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);
701
702 if (nevents == -1) {
703 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
704
705 nxt_log(&engine->task, level, "kevent(%d) failed %E",
706 engine->u.kqueue.fd, err);
707
708 if (err != NXT_EINTR) {
709 nxt_kqueue_error(engine);
710 }
711
712 return;
713 }
714
715 engine->u.kqueue.nchanges = 0;
716
717 for (i = 0; i < nevents; i++) {
718
719 error = 0;
720
721 kev = &engine->u.kqueue.events[i];
722
723 nxt_debug(&engine->task,
724 (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
725 "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
726 "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
727 kev->ident, kev->filter, kev->flags, kev->fflags,
728 kev->data, kev->udata);
729
730 if (nxt_slow_path(kev->flags & EV_ERROR)) {
731 nxt_alert(&engine->task,
732 "kevent(%d) error %E on ident:%d filter:%d",
733 engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
734 error = 1;
735 }
736
737 task = &engine->task;
738 wq = &engine->fast_work_queue;
739 handler = nxt_kqueue_fd_error_handler;
740 obj = nxt_kevent_get_udata(kev->udata);
741
742 switch (kev->filter) {
743
744 case EVFILT_READ:
745 ev = obj;
746 ev->read_ready = 1;
747 ev->kq_available = (int32_t) kev->data;
748 err = kev->fflags;
749 eof = (kev->flags & EV_EOF) != 0;
750 ev->kq_errno = err;
751 ev->kq_eof |= eof;
752
753 if (ev->read <= NXT_EVENT_BLOCKED) {
754 nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
755 continue;
756 }
757
758 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
759 ev->read = NXT_EVENT_INACTIVE;
760 }
761
762 if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) {
763 error = 1;
764 }
765
766 if (nxt_fast_path(!error)) {
767 handler = ev->read_handler;
768 wq = ev->read_work_queue;
769 }
770
771 task = ev->task;
772 data = ev->data;
773
774 break;
775
776 case EVFILT_WRITE:
777 ev = obj;
778 ev->write_ready = 1;
779 err = kev->fflags;
780 eof = (kev->flags & EV_EOF) != 0;
781 ev->kq_errno = err;
782 ev->kq_eof |= eof;
783
784 if (ev->write <= NXT_EVENT_BLOCKED) {
785 nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
786 continue;
787 }
788
789 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
790 ev->write = NXT_EVENT_INACTIVE;
791 }
792
793 if (nxt_slow_path(eof && err != 0)) {
794 error = 1;
795 }
796
797 if (nxt_fast_path(!error)) {
798 handler = ev->write_handler;
799 wq = ev->write_work_queue;
800 }
801
802 task = ev->task;
803 data = ev->data;
804
805 break;
806
807 case EVFILT_VNODE:
808 fev = obj;
809 handler = fev->handler;
810 task = fev->task;
811 data = fev->data;
812 break;
813
814 case EVFILT_SIGNAL:
815 sigev = obj;
816 obj = (void *) kev->ident;
817 handler = sigev->handler;
818 data = (void *) sigev->name;
819 break;
820
821 #if (NXT_HAVE_EVFILT_USER)
822
823 case EVFILT_USER:
824 handler = engine->u.kqueue.post_handler;
825 data = NULL;
826 break;
827
828 #endif
829
830 default:
831
832 #if (NXT_DEBUG)
833 nxt_alert(&engine->task,
834 "unexpected kevent(%d) filter %d on ident %d",
835 engine->u.kqueue.fd, kev->filter, kev->ident);
836 #endif
837
838 continue;
839 }
840
841 nxt_work_queue_add(wq, handler, task, obj, data);
842 }
843 }
844
845
/*
 * nxt_kqueue_conn_io_connect() avoids the getsockopt() syscall
 * otherwise needed to test for a pending connect() error.
 */
850
851 static void
nxt_kqueue_conn_io_connect(nxt_task_t * task,void * obj,void * data)852 nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
853 {
854 nxt_conn_t *c;
855 nxt_event_engine_t *engine;
856 nxt_work_handler_t handler;
857 const nxt_event_conn_state_t *state;
858
859 c = obj;
860
861 state = c->write_state;
862
863 switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {
864
865 case NXT_OK:
866 c->socket.write_ready = 1;
867 handler = state->ready_handler;
868 break;
869
870 case NXT_AGAIN:
871 c->socket.write_handler = nxt_kqueue_conn_connected;
872 c->socket.error_handler = nxt_conn_connect_error;
873
874 engine = task->thread->engine;
875 nxt_conn_timer(engine, c, state, &c->write_timer);
876
877 nxt_kqueue_enable_write(engine, &c->socket);
878 return;
879
880 case NXT_DECLINED:
881 handler = state->close_handler;
882 break;
883
884 default: /* NXT_ERROR */
885 handler = state->error_handler;
886 break;
887 }
888
889 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
890 }
891
892
893 static void
nxt_kqueue_conn_connected(nxt_task_t * task,void * obj,void * data)894 nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
895 {
896 nxt_conn_t *c;
897
898 c = obj;
899
900 nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);
901
902 c->socket.write = NXT_EVENT_BLOCKED;
903
904 if (c->write_state->timer_autoreset) {
905 nxt_timer_disable(task->thread->engine, &c->write_timer);
906 }
907
908 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
909 task, c, data);
910 }
911
912
913 static void
nxt_kqueue_listen_handler(nxt_task_t * task,void * obj,void * data)914 nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
915 {
916 nxt_listen_event_t *lev;
917
918 lev = obj;
919
920 nxt_debug(task, "kevent fd:%d avail:%D",
921 lev->socket.fd, lev->socket.kq_available);
922
923 lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);
924
925 nxt_kqueue_conn_io_accept(task, lev, data);
926 }
927
928
929 static void
nxt_kqueue_conn_io_accept(nxt_task_t * task,void * obj,void * data)930 nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
931 {
932 socklen_t socklen;
933 nxt_conn_t *c;
934 nxt_socket_t s;
935 struct sockaddr *sa;
936 nxt_listen_event_t *lev;
937
938 lev = obj;
939 c = lev->next;
940
941 lev->ready--;
942 lev->socket.read_ready = (lev->ready != 0);
943
944 lev->socket.kq_available--;
945 lev->socket.read_ready = (lev->socket.kq_available != 0);
946
947 sa = &c->remote->u.sockaddr;
948 socklen = c->remote->socklen;
949 /*
950 * The returned socklen is ignored here,
951 * see comment in nxt_conn_io_accept().
952 */
953 s = accept(lev->socket.fd, sa, &socklen);
954
955 if (s != -1) {
956 c->socket.fd = s;
957
958 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
959
960 nxt_conn_accept(task, lev, c);
961 return;
962 }
963
964 nxt_conn_accept_error(task, lev, "accept", nxt_errno);
965 }
966
967
/*
 * nxt_kqueue_conn_io_read() is just a wrapper that avoids the readv()
 * or recv() syscall when the remote side has closed the connection and
 * no data is pending.
 */
972
973 static void
nxt_kqueue_conn_io_read(nxt_task_t * task,void * obj,void * data)974 nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
975 {
976 nxt_conn_t *c;
977
978 c = obj;
979
980 nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);
981
982 if (c->socket.kq_available == 0 && c->socket.kq_eof) {
983 nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
984
985 c->socket.closed = 1;
986 nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
987 task, c, data);
988 return;
989 }
990
991 nxt_conn_io_read(task, c, data);
992 }
993
994
/*
 * nxt_kqueue_conn_io_recvbuf() is just a wrapper around the standard
 * nxt_conn_io_recvbuf() that avoids the readv() or recv() syscalls
 * when there is no pending data and the remote side has closed the
 * connection.
 */
1000
1001 static ssize_t
nxt_kqueue_conn_io_recvbuf(nxt_conn_t * c,nxt_buf_t * b)1002 nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1003 {
1004 ssize_t n;
1005
1006 if (c->socket.kq_available == 0 && c->socket.kq_eof) {
1007 c->socket.closed = 1;
1008 return 0;
1009 }
1010
1011 n = nxt_conn_io_recvbuf(c, b);
1012
1013 if (n > 0) {
1014 c->socket.kq_available -= n;
1015
1016 if (c->socket.kq_available < 0) {
1017 c->socket.kq_available = 0;
1018 }
1019
1020 nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
1021 c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
1022
1023 c->socket.read_ready = (c->socket.kq_available != 0
1024 || c->socket.kq_eof);
1025 }
1026
1027 return n;
1028 }
1029