nxt_kqueue_engine.c (changesets 312:c156aea91063 → 564:762f8c976ead; changeset 564 replaces the nxt_log(NXT_LOG_CRIT, ...) calls with nxt_alert())

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * kqueue() was introduced in FreeBSD 4.1 and then ported
 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 * DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * NOTE_REVOKE was introduced in FreeBSD 4.3 and then ported
 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0.
 * DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * EVFILT_TIMER was introduced in FreeBSD 4.4-STABLE and then
 * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2.
 * DragonFlyBSD inherited it with the FreeBSD 4 code base.
 *
 * EVFILT_USER and EV_DISPATCH were introduced in MacOSX 10.6 (Snow
 * Leopard) as part of the Grand Central Dispatch framework and then
 * ported to FreeBSD 8.0-STABLE as part of the libdispatch support.
 */


/*
 * EV_DISPATCH is better because it just disables an event on delivery,
 * whereas EV_ONESHOT deletes the event.  This eliminates an in-kernel
 * memory deallocation and a probable subsequent allocation with lock
 * acquisition.
 */
#ifdef EV_DISPATCH
#define NXT_KEVENT_ONESHOT  EV_DISPATCH
#else
#define NXT_KEVENT_ONESHOT  EV_ONESHOT
#endif
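
/*
 * A minimal re-arm sketch (illustrative only, not used below): with
 * EV_DISPATCH the filter survives delivery in a disabled state and just
 * needs EV_ENABLE, whereas with EV_ONESHOT it must be added again:
 *
 *     EV_SET(&kev, fd, EVFILT_READ, EV_ADD | NXT_KEVENT_ONESHOT,
 *            0, 0, udata);
 *     (the event is delivered; the filter is now disabled or deleted)
 *     EV_SET(&kev, fd, EVFILT_READ, EV_ENABLE, 0, 0, udata);  with EV_DISPATCH
 *     EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, udata);     with EV_ONESHOT
 */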


#if (NXT_NETBSD)
/* NetBSD defines the kevent.udata field as intptr_t. */

#define nxt_kevent_set_udata(udata)  (intptr_t) (udata)
#define nxt_kevent_get_udata(udata)  (void *) (udata)

#else
#define nxt_kevent_set_udata(udata)  (void *) (udata)
#define nxt_kevent_get_udata(udata)  (udata)
#endif
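
/*
 * Example: both sides cast through the macros, so the stored pointer
 * round-trips unchanged on every platform:
 *
 *     kev->udata = nxt_kevent_set_udata(ev);
 *     ev = nxt_kevent_get_udata(kev->udata);
 */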


static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_kqueue_free(nxt_event_engine_t *engine);
static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
    nxt_file_event_t *ev);
static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
    nxt_file_event_t *ev);
static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags);
static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
static void nxt_kqueue_error(nxt_event_engine_t *engine);
static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
    const nxt_sig_event_t *sigev);
#if (NXT_HAVE_EVFILT_USER)
static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
    void *data);
static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


static nxt_conn_io_t  nxt_kqueue_conn_io = {
    nxt_kqueue_conn_io_connect,
    nxt_kqueue_conn_io_accept,

    nxt_kqueue_conn_io_read,
    nxt_kqueue_conn_io_recvbuf,
    nxt_conn_io_recv,

    nxt_conn_io_write,
    nxt_event_conn_io_write_chunk,

#if (NXT_HAVE_FREEBSD_SENDFILE)
    nxt_freebsd_event_conn_io_sendfile,
#elif (NXT_HAVE_MACOSX_SENDFILE)
    nxt_macosx_event_conn_io_sendfile,
#else
    nxt_event_conn_io_sendbuf,
#endif

    nxt_event_conn_io_writev,
    nxt_event_conn_io_send,

    nxt_conn_io_shutdown,
};


const nxt_event_interface_t  nxt_kqueue_engine = {
    "kqueue",
    nxt_kqueue_create,
    nxt_kqueue_free,
    nxt_kqueue_enable,
    nxt_kqueue_disable,
    nxt_kqueue_delete,
    nxt_kqueue_close,
    nxt_kqueue_enable_read,
    nxt_kqueue_enable_write,
    nxt_kqueue_disable_read,
    nxt_kqueue_disable_write,
    nxt_kqueue_block_read,
    nxt_kqueue_block_write,
    nxt_kqueue_oneshot_read,
    nxt_kqueue_oneshot_write,
    nxt_kqueue_enable_accept,
    nxt_kqueue_enable_file,
    nxt_kqueue_close_file,
#if (NXT_HAVE_EVFILT_USER)
    nxt_kqueue_enable_post,
    nxt_kqueue_signal,
#else
    NULL,
    NULL,
#endif
    nxt_kqueue_poll,

    &nxt_kqueue_conn_io,

    NXT_FILE_EVENTS,
    NXT_SIGNAL_EVENTS,
};


static nxt_int_t
nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    const nxt_sig_event_t  *sigev;

    engine->u.kqueue.fd = -1;
    engine->u.kqueue.mchanges = mchanges;
    engine->u.kqueue.mevents = mevents;
    engine->u.kqueue.pid = nxt_pid;

    engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
    if (engine->u.kqueue.changes == NULL) {
        goto fail;
    }

    engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
    if (engine->u.kqueue.events == NULL) {
        goto fail;
    }

    engine->u.kqueue.fd = kqueue();
    if (engine->u.kqueue.fd == -1) {
        nxt_alert(&engine->task, "kqueue() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);

    if (engine->signals != NULL) {
        for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
            if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
                goto fail;
            }
        }
    }

    return NXT_OK;

fail:

    nxt_kqueue_free(engine);

    return NXT_ERROR;
}


static void
nxt_kqueue_free(nxt_event_engine_t *engine)
{
    nxt_fd_t  fd;

    fd = engine->u.kqueue.fd;

    nxt_debug(&engine->task, "kqueue %d free", fd);

    if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
        /* kqueue is not inherited by fork() */

        if (close(fd) != 0) {
            nxt_alert(&engine->task, "kqueue close(%d) failed %E",
                      fd, nxt_errno);
        }
    }

    nxt_free(engine->u.kqueue.events);
    nxt_free(engine->u.kqueue.changes);

    nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
}


static void
nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_kqueue_enable_read(engine, ev);
    nxt_kqueue_enable_write(engine, ev);
}


/*
 * EV_DISABLE is better because it eliminates an in-kernel memory
 * deallocation and a probable subsequent allocation with lock acquisition.
 */

static void
nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
    }

    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
    }
}


static void
nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
    }

    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_INACTIVE;
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
    }
}


/*
 * kqueue(2):
 *
 *   Calling close() on a file descriptor will remove any kevents that
 *   reference the descriptor.
 *
 * So nxt_kqueue_close() returns true only if there are pending events.
 */

static nxt_bool_t
nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    struct kevent  *kev, *end;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];

    for (kev = engine->u.kqueue.changes; kev < end; kev++) {
        if (kev->ident == (uintptr_t) ev->fd) {
            return 1;
        }
    }

    return 0;
}


/*
 * The kqueue event engine uses only three states: inactive, blocked,
 * and active.  A oneshot event is simply marked active; once it has been
 * delivered and become inactive, a later enable switches it back to the
 * default EV_CLEAR mode.
 */

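/*
 * For example, a blocked read: nxt_kqueue_block_read() merely flips
 * ev->read to NXT_EVENT_BLOCKED; the kernel filter stays registered, and
 * the next delivered event is simply dropped in nxt_kqueue_poll()
 * ("blocked read event").
 */
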
static void
nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read == NXT_EVENT_INACTIVE) {
        nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
                          EV_ADD | EV_ENABLE | EV_CLEAR);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


static void
nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write == NXT_EVENT_INACTIVE) {
        nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
                          EV_ADD | EV_ENABLE | EV_CLEAR);
    }

    ev->write = NXT_EVENT_ACTIVE;
}


static void
nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_INACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
}


static void
nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->write = NXT_EVENT_INACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
}


static void
nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
                      EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}


static void
nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->write = NXT_EVENT_ACTIVE;

    nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
                      EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}


static void
nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->read_handler = nxt_kqueue_listen_handler;

    nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
}


static void
nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
    struct kevent  *kev;

    const nxt_int_t   flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    const nxt_uint_t  fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
                               | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;

    nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
              engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);

    kev = nxt_kqueue_get_kevent(engine);

    kev->ident = ev->file->fd;
    kev->filter = EVFILT_VNODE;
    kev->flags = flags;
    kev->fflags = fflags;
    kev->data = 0;
    kev->udata = nxt_kevent_set_udata(ev);
}


static void
nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
    /* TODO: pending event. */
}


static void
nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_int_t filter, nxt_uint_t flags)
{
    struct kevent  *kev;

    nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
              engine->u.kqueue.fd, ev->fd, filter, flags);

    kev = nxt_kqueue_get_kevent(engine);

    kev->ident = ev->fd;
    kev->filter = filter;
    kev->flags = flags;
    kev->fflags = 0;
    kev->data = 0;
    kev->udata = nxt_kevent_set_udata(ev);
}


static struct kevent *
nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
{
    int  ret, nchanges;

    nchanges = engine->u.kqueue.nchanges;

    if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {

        nxt_debug(&engine->task, "kevent(%d) changes:%d",
                  engine->u.kqueue.fd, nchanges);

        ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
                     NULL, 0, NULL);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(&engine->task, "kevent(%d) failed %E",
                      engine->u.kqueue.fd, nxt_errno);

            nxt_kqueue_error(engine);
        }

        engine->u.kqueue.nchanges = 0;
    }

    return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
}
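
/*
 * Design note: changes accumulate in the changes[] array and reach the
 * kernel in a single kevent() call, either right above when the array
 * fills up, or together with the next nxt_kqueue_poll() invocation, so
 * most event updates cost no extra syscall.
 */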


static void
nxt_kqueue_error(nxt_event_engine_t *engine)
{
    struct kevent     *kev, *end;
    nxt_fd_event_t    *ev;
    nxt_file_event_t  *fev;
    nxt_work_queue_t  *wq;

    wq = &engine->fast_work_queue;
    end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];

    for (kev = engine->u.kqueue.changes; kev < end; kev++) {

        switch (kev->filter) {

        case EVFILT_READ:
        case EVFILT_WRITE:
            ev = nxt_kevent_get_udata(kev->udata);
            nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
                               ev->task, ev, ev->data);
            break;

        case EVFILT_VNODE:
            fev = nxt_kevent_get_udata(kev->udata);
            nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
                               fev->task, fev, fev->data);
            break;
        }
    }
}


static void
nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd);

    if (ev->kq_eof && ev->kq_errno != 0) {
        ev->error = ev->kq_errno;
        nxt_log(task, nxt_socket_error_level(ev->kq_errno),
                "kevent() reported error on descriptor %d %E",
                ev->fd, ev->kq_errno);
    }

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;
    ev->error = ev->kq_errno;

    ev->error_handler(task, ev, data);
}


static void
nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_file_event_t  *ev;

    ev = obj;

    nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);

    ev->handler(task, ev, data);
}


static nxt_int_t
nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
{
    int               signo;
    struct kevent     kev;
    struct sigaction  sa;

    signo = sigev->signo;

    nxt_memzero(&sa, sizeof(struct sigaction));
    sigemptyset(&sa.sa_mask);

    /*
     * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch
     * this signal.  It should be set to SIG_DFL instead.  Although the
     * default action for SIGCHLD is also to ignore the signal, SIG_DFL
     * nevertheless allows kqueue to catch it.
     */
    sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;

    if (sigaction(signo, &sa, NULL) != 0) {
        nxt_alert(&engine->task, "sigaction(%d) failed %E", signo, nxt_errno);

        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
              engine->u.kqueue.fd, signo, sigev->name);

    kev.ident = signo;
    kev.filter = EVFILT_SIGNAL;
    kev.flags = EV_ADD;
    kev.fflags = 0;
    kev.data = 0;
    kev.udata = nxt_kevent_set_udata(sigev);

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "kevent(%d) failed %E",
              engine->u.kqueue.fd, nxt_errno);

    return NXT_ERROR;
}
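
/*
 * A minimal EVFILT_SIGNAL sketch (illustrative): the disposition set via
 * sigaction() above decides whether the process survives the signal;
 * kqueue only observes delivery.  E.g. for a custom signal:
 *
 *     signal(SIGUSR1, SIG_IGN);
 *     EV_SET(&kev, SIGUSR1, EVFILT_SIGNAL, EV_ADD, 0, 0, NULL);
 *     kevent(kq, &kev, 1, NULL, 0, NULL);
 *
 * kev.data then reports how many times the signal occurred since the
 * last kevent() call.
 */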


#if (NXT_HAVE_EVFILT_USER)

static nxt_int_t
nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    struct kevent  kev;

    /* EVFILT_USER must be added to a kqueue before it can be triggered. */

    kev.ident = 0;
    kev.filter = EVFILT_USER;
    kev.flags = EV_ADD | EV_CLEAR;
    kev.fflags = 0;
    kev.data = 0;
    kev.udata = NULL;

    engine->u.kqueue.post_handler = handler;

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "kevent(%d) failed %E",
              engine->u.kqueue.fd, nxt_errno);

    return NXT_ERROR;
}
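
/*
 * The trigger side is nxt_kqueue_signal() below: NOTE_TRIGGER fires the
 * user event registered here, and nxt_kqueue_poll() then dispatches
 * engine->u.kqueue.post_handler.  EV_CLEAR resets the event state after
 * delivery, so each trigger is observed as a new event.
 */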


static void
nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    struct kevent  kev;

    /*
     * kqueue has built-in signal processing support, so the function
     * is used only to post events, and the signo argument is ignored.
     */

    kev.ident = 0;
    kev.filter = EVFILT_USER;
    kev.flags = 0;
    kev.fflags = NOTE_TRIGGER;
    kev.data = 0;
    kev.udata = NULL;

    if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
        nxt_alert(&engine->task, "kevent(%d) failed %E",
                  engine->u.kqueue.fd, nxt_errno);
    }
}

#endif


static void
nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    void                *obj, *data;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_uint_t          level;
    nxt_bool_t          error, eof;
    nxt_task_t          *task;
    struct kevent       *kev;
    nxt_fd_event_t      *ev;
    nxt_sig_event_t     *sigev;
    struct timespec     ts, *tp;
    nxt_file_event_t    *fev;
    nxt_work_queue_t    *wq;
    nxt_work_handler_t  handler;

    if (timeout == NXT_INFINITE_MSEC) {
        tp = NULL;

    } else {
        ts.tv_sec = timeout / 1000;
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tp = &ts;
    }
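
    /*
     * E.g. a 1234 ms timeout yields ts = { .tv_sec = 1,
     * .tv_nsec = 234000000 } for the kevent() call below.
     */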

    nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
              engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);

    nevents = kevent(engine->u.kqueue.fd,
                     engine->u.kqueue.changes, engine->u.kqueue.nchanges,
                     engine->u.kqueue.events, engine->u.kqueue.mevents, tp);

    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "kevent(%d) failed %E",
                engine->u.kqueue.fd, err);

        nxt_kqueue_error(engine);
        return;
    }

    engine->u.kqueue.nchanges = 0;

    for (i = 0; i < nevents; i++) {

        kev = &engine->u.kqueue.events[i];

        nxt_debug(&engine->task,
                  (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
                      "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
                      "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
                  kev->ident, kev->filter, kev->flags, kev->fflags,
                  kev->data, kev->udata);

        error = (kev->flags & EV_ERROR);

        if (nxt_slow_path(error)) {
            nxt_alert(&engine->task,
                      "kevent(%d) error %E on ident:%d filter:%d",
                      engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
        }

        task = &engine->task;
        wq = &engine->fast_work_queue;
        handler = nxt_kqueue_fd_error_handler;
        obj = nxt_kevent_get_udata(kev->udata);

        switch (kev->filter) {

        case EVFILT_READ:
            ev = obj;
            ev->read_ready = 1;
            ev->kq_available = (int32_t) kev->data;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->read <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->read = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->read_handler;
                wq = ev->read_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_WRITE:
            ev = obj;
            ev->write_ready = 1;
            err = kev->fflags;
            eof = (kev->flags & EV_EOF) != 0;
            ev->kq_errno = err;
            ev->kq_eof = eof;

            if (ev->write <= NXT_EVENT_BLOCKED) {
                nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
                continue;
            }

            if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) {
                ev->write = NXT_EVENT_INACTIVE;
            }

            if (nxt_slow_path(eof && err != 0)) {
                error = 1;
            }

            if (nxt_fast_path(!error)) {
                handler = ev->write_handler;
                wq = ev->write_work_queue;
            }

            task = ev->task;
            data = ev->data;

            break;

        case EVFILT_VNODE:
            fev = obj;
            handler = fev->handler;
            task = fev->task;
            data = fev->data;
            break;

        case EVFILT_SIGNAL:
            sigev = obj;
            obj = (void *) kev->ident;
            handler = sigev->handler;
            data = (void *) sigev->name;
            break;

#if (NXT_HAVE_EVFILT_USER)

        case EVFILT_USER:
            handler = engine->u.kqueue.post_handler;
            data = NULL;
            break;

#endif

        default:

#if (NXT_DEBUG)
            nxt_alert(&engine->task,
                      "unexpected kevent(%d) filter %d on ident %d",
                      engine->u.kqueue.fd, kev->filter, kev->ident);
#endif

            continue;
        }

        nxt_work_queue_add(wq, handler, task, obj, data);
    }
}


/*
 * nxt_kqueue_conn_io_connect() eliminates the getsockopt() syscall
 * to test a pending connect() error.
 */

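/*
 * The avoided pattern (illustrative): after a writability event on a
 * connecting socket, a poll- or epoll-based engine must query the result:
 *
 *     int        err;
 *     socklen_t  len = sizeof(err);
 *
 *     getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
 *
 * kqueue instead reports a pending error in kev->fflags together with
 * EV_EOF, which reaches this file as ev->kq_errno and ev->kq_eof.
 */
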
static void
nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        c->socket.write_handler = nxt_kqueue_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_kqueue_enable_write(engine, &c->socket);
        return;

    case NXT_DECLINED:
        handler = state->close_handler;
        break;

    default:  /* NXT_ERROR */
        handler = state->error_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}


static void
nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);

    c->socket.write = NXT_EVENT_BLOCKED;

    if (c->write_state->timer_autoreset) {
        nxt_timer_disable(task->thread->engine, &c->write_timer);
    }

    nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                       task, c, data);
}

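
/*
 * For a listening socket, EVFILT_READ reports the number of completed
 * connections in the listen queue in kev->data; it arrives here as
 * lev->socket.kq_available, e.g. avail:3 means three connections can be
 * accept()ed without blocking.
 */
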
static void
nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_listen_event_t  *lev;

    lev = obj;

    nxt_debug(task, "kevent fd:%d avail:%D",
              lev->socket.fd, lev->socket.kq_available);

    lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);

    nxt_kqueue_conn_io_accept(task, lev, data);
}


static void
nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           socklen;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    lev->socket.kq_available--;
    lev->socket.read_ready = (lev->socket.kq_available != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept(lev->socket.fd, sa, &socklen);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept", nxt_errno);
}


/*
 * nxt_kqueue_conn_io_read() is just a wrapper that eliminates the
 * readv() or recv() syscall if the remote side has already closed
 * the connection.
 */

static void
nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);

    if (c->socket.kq_available == 0 && c->socket.kq_eof) {
        nxt_debug(task, "kevent fd:%d eof", c->socket.fd);

        c->socket.closed = 1;
        nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
                           task, c, data);
        return;
    }

    nxt_conn_io_read(task, c, data);
}


/*
 * nxt_kqueue_conn_io_recvbuf() is just a wrapper around the standard
 * nxt_conn_io_recvbuf() that eliminates the readv() or recv() syscalls
 * if there is no pending data or the remote side closed the connection.
 */

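/*
 * Worked example (illustrative): kevent() reports data:4096 for a socket,
 * so kq_available starts at 4096.  A recv() of 1500 bytes drops it to
 * 2596 and read_ready stays set, so the next read needs no kevent()
 * round trip; at 0 with kq_eof set, further reads are skipped entirely.
 */
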
static ssize_t
nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    if (c->socket.kq_available == 0 && c->socket.kq_eof) {
        c->socket.closed = 1;
        return 0;
    }

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0) {
        c->socket.kq_available -= n;

        if (c->socket.kq_available < 0) {
            c->socket.kq_available = 0;
        }

        nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
                  c->socket.fd, c->socket.kq_available, c->socket.kq_eof);

        c->socket.read_ready = (c->socket.kq_available != 0
                                || c->socket.kq_eof);
    }

    return n;
}