1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10/* 11 * kqueue() has been introduced in FreeBSD 4.1 and then was ported 12 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. 13 * DragonFlyBSD inherited it with FreeBSD 4 code base. 14 * 15 * NOTE_REVOKE has been introduced in FreeBSD 4.3 and then was ported 16 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. 17 * DragonFlyBSD inherited it with FreeBSD 4 code base. 18 * 19 * EVFILT_TIMER has been introduced in FreeBSD 4.4-STABLE and then was 20 * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2. 21 * DragonFlyBSD inherited it with FreeBSD 4 code base. 22 * 23 * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow 24 * Leopard) as part of the Grand Central Dispatch framework 25 * and then were ported to FreeBSD 8.0-STABLE as part of the 26 * libdispatch support. 27 */ 28 29 30/* 31 * EV_DISPATCH is better because it just disables an event on delivery 32 * whilst EV_ONESHOT deletes the event. This eliminates in-kernel memory 33 * deallocation and probable subsequent allocation with a lock acquiring. 34 */ 35#ifdef EV_DISPATCH 36#define NXT_KEVENT_ONESHOT EV_DISPATCH 37#else 38#define NXT_KEVENT_ONESHOT EV_ONESHOT 39#endif 40 41 42#if (NXT_NETBSD) 43/* NetBSD defines the kevent.udata field as intptr_t. */ 44 45#define nxt_kevent_set_udata(udata) (intptr_t) (udata) 46#define nxt_kevent_get_udata(udata) (void *) (udata) 47 48#else 49#define nxt_kevent_set_udata(udata) (void *) (udata) 50#define nxt_kevent_get_udata(udata) (udata) 51#endif 52 53 54static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine, 55 nxt_uint_t mchanges, nxt_uint_t mevents); 56static void nxt_kqueue_free(nxt_event_engine_t *engine); 57static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 58static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 59static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 60static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62static void nxt_kqueue_enable_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64static void nxt_kqueue_enable_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66static void nxt_kqueue_disable_read(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68static void nxt_kqueue_disable_write(nxt_event_engine_t *engine, 69 nxt_fd_event_t *ev); 70static void nxt_kqueue_block_read(nxt_event_engine_t *engine, 71 nxt_fd_event_t *ev); 72static void nxt_kqueue_block_write(nxt_event_engine_t *engine, 73 nxt_fd_event_t *ev); 74static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, 75 nxt_fd_event_t *ev); 76static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, 77 nxt_fd_event_t *ev); 78static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine, 79 nxt_fd_event_t *ev); 80static void nxt_kqueue_enable_file(nxt_event_engine_t *engine, 81 nxt_file_event_t *ev); 82static void nxt_kqueue_close_file(nxt_event_engine_t *engine, 83 nxt_file_event_t *ev); 84static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 85 nxt_int_t filter, nxt_uint_t flags); 86static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine); 87static void nxt_kqueue_error(nxt_event_engine_t *engine); 88static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, 89 void *data); 90static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, 91 void *data); 92static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine, 93 const nxt_sig_event_t *sigev); 94#if (NXT_HAVE_EVFILT_USER) 95static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine, 96 nxt_work_handler_t handler); 97static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 98#endif 99static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 100 101static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, 102 void *data); 103static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, 104 void *data); 105static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data); 106static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, 107 void *data); 108static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, 109 void *data); 110static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 111 112 113static nxt_conn_io_t nxt_kqueue_conn_io = { 114 nxt_kqueue_conn_io_connect, 115 nxt_kqueue_conn_io_accept, 116 117 nxt_kqueue_conn_io_read, 118 nxt_kqueue_conn_io_recvbuf, 119 nxt_conn_io_recv, 120 121 nxt_conn_io_write, 122 nxt_event_conn_io_write_chunk, 123 124#if (NXT_HAVE_FREEBSD_SENDFILE) 125 nxt_freebsd_event_conn_io_sendfile, 126#elif (NXT_HAVE_MACOSX_SENDFILE) 127 nxt_macosx_event_conn_io_sendfile, 128#else 129 nxt_event_conn_io_sendbuf, 130#endif 131 132 nxt_event_conn_io_writev, 133 nxt_event_conn_io_send, 134 135 nxt_conn_io_shutdown, 136}; 137 138 139const nxt_event_interface_t nxt_kqueue_engine = { 140 "kqueue", 141 nxt_kqueue_create, 142 nxt_kqueue_free, 143 nxt_kqueue_enable, 144 nxt_kqueue_disable, 145 nxt_kqueue_delete, 146 nxt_kqueue_close, 147 nxt_kqueue_enable_read, 148 nxt_kqueue_enable_write, 149 nxt_kqueue_disable_read, 150 nxt_kqueue_disable_write, 151 nxt_kqueue_block_read, 152 nxt_kqueue_block_write, 153 nxt_kqueue_oneshot_read, 154 nxt_kqueue_oneshot_write, 155 nxt_kqueue_enable_accept, 156 nxt_kqueue_enable_file, 157 nxt_kqueue_close_file, 158#if (NXT_HAVE_EVFILT_USER) 159 nxt_kqueue_enable_post, 160 nxt_kqueue_signal, 161#else 162 NULL, 163 NULL, 164#endif 165 nxt_kqueue_poll, 166 167 &nxt_kqueue_conn_io, 168 169 NXT_FILE_EVENTS, 170 NXT_SIGNAL_EVENTS, 171}; 172 173 174static nxt_int_t 175nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 176 nxt_uint_t mevents) 177{ 178 const nxt_sig_event_t *sigev; 179 180 engine->u.kqueue.fd = -1; 181 engine->u.kqueue.mchanges = mchanges; 182 engine->u.kqueue.mevents = mevents; 183 engine->u.kqueue.pid = nxt_pid; 184 185 engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges); 186 if (engine->u.kqueue.changes == NULL) { 187 goto fail; 188 } 189 190 engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents); 191 if (engine->u.kqueue.events == NULL) { 192 goto fail; 193 } 194 195 engine->u.kqueue.fd = kqueue(); 196 if (engine->u.kqueue.fd == -1) {
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10/* 11 * kqueue() has been introduced in FreeBSD 4.1 and then was ported 12 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. 13 * DragonFlyBSD inherited it with FreeBSD 4 code base. 14 * 15 * NOTE_REVOKE has been introduced in FreeBSD 4.3 and then was ported 16 * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. 17 * DragonFlyBSD inherited it with FreeBSD 4 code base. 18 * 19 * EVFILT_TIMER has been introduced in FreeBSD 4.4-STABLE and then was 20 * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2. 21 * DragonFlyBSD inherited it with FreeBSD 4 code base. 22 * 23 * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow 24 * Leopard) as part of the Grand Central Dispatch framework 25 * and then were ported to FreeBSD 8.0-STABLE as part of the 26 * libdispatch support. 27 */ 28 29 30/* 31 * EV_DISPATCH is better because it just disables an event on delivery 32 * whilst EV_ONESHOT deletes the event. This eliminates in-kernel memory 33 * deallocation and probable subsequent allocation with a lock acquiring. 34 */ 35#ifdef EV_DISPATCH 36#define NXT_KEVENT_ONESHOT EV_DISPATCH 37#else 38#define NXT_KEVENT_ONESHOT EV_ONESHOT 39#endif 40 41 42#if (NXT_NETBSD) 43/* NetBSD defines the kevent.udata field as intptr_t. */ 44 45#define nxt_kevent_set_udata(udata) (intptr_t) (udata) 46#define nxt_kevent_get_udata(udata) (void *) (udata) 47 48#else 49#define nxt_kevent_set_udata(udata) (void *) (udata) 50#define nxt_kevent_get_udata(udata) (udata) 51#endif 52 53 54static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine, 55 nxt_uint_t mchanges, nxt_uint_t mevents); 56static void nxt_kqueue_free(nxt_event_engine_t *engine); 57static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 58static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 59static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 60static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine, 61 nxt_fd_event_t *ev); 62static void nxt_kqueue_enable_read(nxt_event_engine_t *engine, 63 nxt_fd_event_t *ev); 64static void nxt_kqueue_enable_write(nxt_event_engine_t *engine, 65 nxt_fd_event_t *ev); 66static void nxt_kqueue_disable_read(nxt_event_engine_t *engine, 67 nxt_fd_event_t *ev); 68static void nxt_kqueue_disable_write(nxt_event_engine_t *engine, 69 nxt_fd_event_t *ev); 70static void nxt_kqueue_block_read(nxt_event_engine_t *engine, 71 nxt_fd_event_t *ev); 72static void nxt_kqueue_block_write(nxt_event_engine_t *engine, 73 nxt_fd_event_t *ev); 74static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, 75 nxt_fd_event_t *ev); 76static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, 77 nxt_fd_event_t *ev); 78static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine, 79 nxt_fd_event_t *ev); 80static void nxt_kqueue_enable_file(nxt_event_engine_t *engine, 81 nxt_file_event_t *ev); 82static void nxt_kqueue_close_file(nxt_event_engine_t *engine, 83 nxt_file_event_t *ev); 84static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 85 nxt_int_t filter, nxt_uint_t flags); 86static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine); 87static void nxt_kqueue_error(nxt_event_engine_t *engine); 88static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, 89 void *data); 90static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, 91 void *data); 92static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine, 93 const nxt_sig_event_t *sigev); 94#if (NXT_HAVE_EVFILT_USER) 95static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine, 96 nxt_work_handler_t handler); 97static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 98#endif 99static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 100 101static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, 102 void *data); 103static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, 104 void *data); 105static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data); 106static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, 107 void *data); 108static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, 109 void *data); 110static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 111 112 113static nxt_conn_io_t nxt_kqueue_conn_io = { 114 nxt_kqueue_conn_io_connect, 115 nxt_kqueue_conn_io_accept, 116 117 nxt_kqueue_conn_io_read, 118 nxt_kqueue_conn_io_recvbuf, 119 nxt_conn_io_recv, 120 121 nxt_conn_io_write, 122 nxt_event_conn_io_write_chunk, 123 124#if (NXT_HAVE_FREEBSD_SENDFILE) 125 nxt_freebsd_event_conn_io_sendfile, 126#elif (NXT_HAVE_MACOSX_SENDFILE) 127 nxt_macosx_event_conn_io_sendfile, 128#else 129 nxt_event_conn_io_sendbuf, 130#endif 131 132 nxt_event_conn_io_writev, 133 nxt_event_conn_io_send, 134 135 nxt_conn_io_shutdown, 136}; 137 138 139const nxt_event_interface_t nxt_kqueue_engine = { 140 "kqueue", 141 nxt_kqueue_create, 142 nxt_kqueue_free, 143 nxt_kqueue_enable, 144 nxt_kqueue_disable, 145 nxt_kqueue_delete, 146 nxt_kqueue_close, 147 nxt_kqueue_enable_read, 148 nxt_kqueue_enable_write, 149 nxt_kqueue_disable_read, 150 nxt_kqueue_disable_write, 151 nxt_kqueue_block_read, 152 nxt_kqueue_block_write, 153 nxt_kqueue_oneshot_read, 154 nxt_kqueue_oneshot_write, 155 nxt_kqueue_enable_accept, 156 nxt_kqueue_enable_file, 157 nxt_kqueue_close_file, 158#if (NXT_HAVE_EVFILT_USER) 159 nxt_kqueue_enable_post, 160 nxt_kqueue_signal, 161#else 162 NULL, 163 NULL, 164#endif 165 nxt_kqueue_poll, 166 167 &nxt_kqueue_conn_io, 168 169 NXT_FILE_EVENTS, 170 NXT_SIGNAL_EVENTS, 171}; 172 173 174static nxt_int_t 175nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 176 nxt_uint_t mevents) 177{ 178 const nxt_sig_event_t *sigev; 179 180 engine->u.kqueue.fd = -1; 181 engine->u.kqueue.mchanges = mchanges; 182 engine->u.kqueue.mevents = mevents; 183 engine->u.kqueue.pid = nxt_pid; 184 185 engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges); 186 if (engine->u.kqueue.changes == NULL) { 187 goto fail; 188 } 189 190 engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents); 191 if (engine->u.kqueue.events == NULL) { 192 goto fail; 193 } 194 195 engine->u.kqueue.fd = kqueue(); 196 if (engine->u.kqueue.fd == -1) {
|
197 nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue() failed %E", nxt_errno);
| 197 nxt_alert(&engine->task, "kqueue() failed %E", nxt_errno);
|
198 goto fail; 199 } 200 201 nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd); 202 203 if (engine->signals != NULL) { 204 for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) { 205 if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) { 206 goto fail; 207 } 208 } 209 } 210 211 return NXT_OK; 212 213fail: 214 215 nxt_kqueue_free(engine); 216 217 return NXT_ERROR; 218} 219 220 221static void 222nxt_kqueue_free(nxt_event_engine_t *engine) 223{ 224 nxt_fd_t fd; 225 226 fd = engine->u.kqueue.fd; 227 228 nxt_debug(&engine->task, "kqueue %d free", fd); 229 230 if (fd != -1 && engine->u.kqueue.pid == nxt_pid) { 231 /* kqueue is not inherited by fork() */ 232 233 if (close(fd) != 0) {
| 198 goto fail; 199 } 200 201 nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd); 202 203 if (engine->signals != NULL) { 204 for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) { 205 if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) { 206 goto fail; 207 } 208 } 209 } 210 211 return NXT_OK; 212 213fail: 214 215 nxt_kqueue_free(engine); 216 217 return NXT_ERROR; 218} 219 220 221static void 222nxt_kqueue_free(nxt_event_engine_t *engine) 223{ 224 nxt_fd_t fd; 225 226 fd = engine->u.kqueue.fd; 227 228 nxt_debug(&engine->task, "kqueue %d free", fd); 229 230 if (fd != -1 && engine->u.kqueue.pid == nxt_pid) { 231 /* kqueue is not inherited by fork() */ 232 233 if (close(fd) != 0) {
|
234 nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue close(%d) failed %E", 235 fd, nxt_errno);
| 234 nxt_alert(&engine->task, "kqueue close(%d) failed %E", 235 fd, nxt_errno);
|
236 } 237 } 238 239 nxt_free(engine->u.kqueue.events); 240 nxt_free(engine->u.kqueue.changes); 241 242 nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t)); 243} 244 245 246static void 247nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 248{ 249 nxt_kqueue_enable_read(engine, ev); 250 nxt_kqueue_enable_write(engine, ev); 251} 252 253 254/* 255 * EV_DISABLE is better because it eliminates in-kernel memory 256 * deallocation and probable subsequent allocation with a lock acquiring. 257 */ 258 259static void 260nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 261{ 262 if (ev->read != NXT_EVENT_INACTIVE) { 263 ev->read = NXT_EVENT_INACTIVE; 264 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); 265 } 266 267 if (ev->write != NXT_EVENT_INACTIVE) { 268 ev->write = NXT_EVENT_INACTIVE; 269 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); 270 } 271} 272 273 274static void 275nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 276{ 277 if (ev->read != NXT_EVENT_INACTIVE) { 278 ev->read = NXT_EVENT_INACTIVE; 279 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE); 280 } 281 282 if (ev->write != NXT_EVENT_INACTIVE) { 283 ev->write = NXT_EVENT_INACTIVE; 284 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE); 285 } 286} 287 288 289/* 290 * kqueue(2): 291 * 292 * Calling close() on a file descriptor will remove any kevents that 293 * reference the descriptor. 294 * 295 * So nxt_kqueue_close() returns true only if there are pending events. 296 */ 297 298static nxt_bool_t 299nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 300{ 301 struct kevent *kev, *end; 302 303 ev->read = NXT_EVENT_INACTIVE; 304 ev->write = NXT_EVENT_INACTIVE; 305 306 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges]; 307 308 for (kev = engine->u.kqueue.changes; kev < end; kev++) { 309 if (kev->ident == (uintptr_t) ev->fd) { 310 return 1; 311 } 312 } 313 314 return 0; 315} 316 317 318/* 319 * The kqueue event engine uses only three states: inactive, blocked, and 320 * active. An active oneshot event is marked as it is in the default 321 * state. The event will be converted eventually to the default EV_CLEAR 322 * mode after it will become inactive after delivery. 323 */ 324 325static void 326nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 327{ 328 if (ev->read == NXT_EVENT_INACTIVE) { 329 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, 330 EV_ADD | EV_ENABLE | EV_CLEAR); 331 } 332 333 ev->read = NXT_EVENT_ACTIVE; 334} 335 336 337static void 338nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 339{ 340 if (ev->write == NXT_EVENT_INACTIVE) { 341 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 342 EV_ADD | EV_ENABLE | EV_CLEAR); 343 } 344 345 ev->write = NXT_EVENT_ACTIVE; 346} 347 348 349static void 350nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 351{ 352 ev->read = NXT_EVENT_INACTIVE; 353 354 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); 355} 356 357 358static void 359nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 360{ 361 ev->write = NXT_EVENT_INACTIVE; 362 363 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); 364} 365 366 367static void 368nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 369{ 370 if (ev->read != NXT_EVENT_INACTIVE) { 371 ev->read = NXT_EVENT_BLOCKED; 372 } 373} 374 375 376static void 377nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 378{ 379 if (ev->write != NXT_EVENT_INACTIVE) { 380 ev->write = NXT_EVENT_BLOCKED; 381 } 382} 383 384 385static void 386nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 387{ 388 ev->write = NXT_EVENT_ACTIVE; 389 390 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 391 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); 392} 393 394 395static void 396nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 397{ 398 ev->write = NXT_EVENT_ACTIVE; 399 400 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 401 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); 402} 403 404 405static void 406nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 407{ 408 ev->read = NXT_EVENT_ACTIVE; 409 ev->read_handler = nxt_kqueue_listen_handler; 410 411 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE); 412} 413 414 415static void 416nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) 417{ 418 struct kevent *kev; 419 420 const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT; 421 const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND 422 | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE; 423 424 nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD", 425 engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags); 426 427 kev = nxt_kqueue_get_kevent(engine); 428 429 kev->ident = ev->file->fd; 430 kev->filter = EVFILT_VNODE; 431 kev->flags = flags; 432 kev->fflags = fflags; 433 kev->data = 0; 434 kev->udata = nxt_kevent_set_udata(ev); 435} 436 437 438static void 439nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) 440{ 441 /* TODO: pending event. */ 442} 443 444 445static void 446nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 447 nxt_int_t filter, nxt_uint_t flags) 448{ 449 struct kevent *kev; 450 451 nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui", 452 engine->u.kqueue.fd, ev->fd, filter, flags); 453 454 kev = nxt_kqueue_get_kevent(engine); 455 456 kev->ident = ev->fd; 457 kev->filter = filter; 458 kev->flags = flags; 459 kev->fflags = 0; 460 kev->data = 0; 461 kev->udata = nxt_kevent_set_udata(ev); 462} 463 464 465static struct kevent * 466nxt_kqueue_get_kevent(nxt_event_engine_t *engine) 467{ 468 int ret, nchanges; 469 470 nchanges = engine->u.kqueue.nchanges; 471 472 if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) { 473 474 nxt_debug(&engine->task, "kevent(%d) changes:%d", 475 engine->u.kqueue.fd, nchanges); 476 477 ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges, 478 NULL, 0, NULL); 479 480 if (nxt_slow_path(ret != 0)) {
| 236 } 237 } 238 239 nxt_free(engine->u.kqueue.events); 240 nxt_free(engine->u.kqueue.changes); 241 242 nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t)); 243} 244 245 246static void 247nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 248{ 249 nxt_kqueue_enable_read(engine, ev); 250 nxt_kqueue_enable_write(engine, ev); 251} 252 253 254/* 255 * EV_DISABLE is better because it eliminates in-kernel memory 256 * deallocation and probable subsequent allocation with a lock acquiring. 257 */ 258 259static void 260nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 261{ 262 if (ev->read != NXT_EVENT_INACTIVE) { 263 ev->read = NXT_EVENT_INACTIVE; 264 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); 265 } 266 267 if (ev->write != NXT_EVENT_INACTIVE) { 268 ev->write = NXT_EVENT_INACTIVE; 269 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); 270 } 271} 272 273 274static void 275nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 276{ 277 if (ev->read != NXT_EVENT_INACTIVE) { 278 ev->read = NXT_EVENT_INACTIVE; 279 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE); 280 } 281 282 if (ev->write != NXT_EVENT_INACTIVE) { 283 ev->write = NXT_EVENT_INACTIVE; 284 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE); 285 } 286} 287 288 289/* 290 * kqueue(2): 291 * 292 * Calling close() on a file descriptor will remove any kevents that 293 * reference the descriptor. 294 * 295 * So nxt_kqueue_close() returns true only if there are pending events. 296 */ 297 298static nxt_bool_t 299nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 300{ 301 struct kevent *kev, *end; 302 303 ev->read = NXT_EVENT_INACTIVE; 304 ev->write = NXT_EVENT_INACTIVE; 305 306 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges]; 307 308 for (kev = engine->u.kqueue.changes; kev < end; kev++) { 309 if (kev->ident == (uintptr_t) ev->fd) { 310 return 1; 311 } 312 } 313 314 return 0; 315} 316 317 318/* 319 * The kqueue event engine uses only three states: inactive, blocked, and 320 * active. An active oneshot event is marked as it is in the default 321 * state. The event will be converted eventually to the default EV_CLEAR 322 * mode after it will become inactive after delivery. 323 */ 324 325static void 326nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 327{ 328 if (ev->read == NXT_EVENT_INACTIVE) { 329 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, 330 EV_ADD | EV_ENABLE | EV_CLEAR); 331 } 332 333 ev->read = NXT_EVENT_ACTIVE; 334} 335 336 337static void 338nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 339{ 340 if (ev->write == NXT_EVENT_INACTIVE) { 341 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 342 EV_ADD | EV_ENABLE | EV_CLEAR); 343 } 344 345 ev->write = NXT_EVENT_ACTIVE; 346} 347 348 349static void 350nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 351{ 352 ev->read = NXT_EVENT_INACTIVE; 353 354 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); 355} 356 357 358static void 359nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 360{ 361 ev->write = NXT_EVENT_INACTIVE; 362 363 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); 364} 365 366 367static void 368nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 369{ 370 if (ev->read != NXT_EVENT_INACTIVE) { 371 ev->read = NXT_EVENT_BLOCKED; 372 } 373} 374 375 376static void 377nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 378{ 379 if (ev->write != NXT_EVENT_INACTIVE) { 380 ev->write = NXT_EVENT_BLOCKED; 381 } 382} 383 384 385static void 386nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 387{ 388 ev->write = NXT_EVENT_ACTIVE; 389 390 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 391 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); 392} 393 394 395static void 396nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 397{ 398 ev->write = NXT_EVENT_ACTIVE; 399 400 nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, 401 EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); 402} 403 404 405static void 406nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 407{ 408 ev->read = NXT_EVENT_ACTIVE; 409 ev->read_handler = nxt_kqueue_listen_handler; 410 411 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE); 412} 413 414 415static void 416nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) 417{ 418 struct kevent *kev; 419 420 const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT; 421 const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND 422 | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE; 423 424 nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD", 425 engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags); 426 427 kev = nxt_kqueue_get_kevent(engine); 428 429 kev->ident = ev->file->fd; 430 kev->filter = EVFILT_VNODE; 431 kev->flags = flags; 432 kev->fflags = fflags; 433 kev->data = 0; 434 kev->udata = nxt_kevent_set_udata(ev); 435} 436 437 438static void 439nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) 440{ 441 /* TODO: pending event. */ 442} 443 444 445static void 446nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 447 nxt_int_t filter, nxt_uint_t flags) 448{ 449 struct kevent *kev; 450 451 nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui", 452 engine->u.kqueue.fd, ev->fd, filter, flags); 453 454 kev = nxt_kqueue_get_kevent(engine); 455 456 kev->ident = ev->fd; 457 kev->filter = filter; 458 kev->flags = flags; 459 kev->fflags = 0; 460 kev->data = 0; 461 kev->udata = nxt_kevent_set_udata(ev); 462} 463 464 465static struct kevent * 466nxt_kqueue_get_kevent(nxt_event_engine_t *engine) 467{ 468 int ret, nchanges; 469 470 nchanges = engine->u.kqueue.nchanges; 471 472 if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) { 473 474 nxt_debug(&engine->task, "kevent(%d) changes:%d", 475 engine->u.kqueue.fd, nchanges); 476 477 ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges, 478 NULL, 0, NULL); 479 480 if (nxt_slow_path(ret != 0)) {
|
481 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 482 engine->u.kqueue.fd, nxt_errno);
| 481 nxt_alert(&engine->task, "kevent(%d) failed %E", 482 engine->u.kqueue.fd, nxt_errno);
|
483 484 nxt_kqueue_error(engine); 485 } 486 487 engine->u.kqueue.nchanges = 0; 488 } 489 490 return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++]; 491} 492 493 494static void 495nxt_kqueue_error(nxt_event_engine_t *engine) 496{ 497 struct kevent *kev, *end; 498 nxt_fd_event_t *ev; 499 nxt_file_event_t *fev; 500 nxt_work_queue_t *wq; 501 502 wq = &engine->fast_work_queue; 503 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges]; 504 505 for (kev = engine->u.kqueue.changes; kev < end; kev++) { 506 507 switch (kev->filter) { 508 509 case EVFILT_READ: 510 case EVFILT_WRITE: 511 ev = nxt_kevent_get_udata(kev->udata); 512 nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler, 513 ev->task, ev, ev->data); 514 break; 515 516 case EVFILT_VNODE: 517 fev = nxt_kevent_get_udata(kev->udata); 518 nxt_work_queue_add(wq, nxt_kqueue_file_error_handler, 519 fev->task, fev, fev->data); 520 break; 521 } 522 } 523} 524 525 526static void 527nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) 528{ 529 nxt_fd_event_t *ev; 530 531 ev = obj; 532 533 nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd); 534 535 if (ev->kq_eof && ev->kq_errno != 0) { 536 ev->error = ev->kq_errno; 537 nxt_log(task, nxt_socket_error_level(ev->kq_errno), 538 "kevent() reported error on descriptor %d %E", 539 ev->fd, ev->kq_errno); 540 } 541 542 ev->read = NXT_EVENT_INACTIVE; 543 ev->write = NXT_EVENT_INACTIVE; 544 ev->error = ev->kq_errno; 545 546 ev->error_handler(task, ev, data); 547} 548 549 550static void 551nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) 552{ 553 nxt_file_event_t *ev; 554 555 ev = obj; 556 557 nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd); 558 559 ev->handler(task, ev, data); 560} 561 562 563static nxt_int_t 564nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev) 565{ 566 int signo; 567 struct kevent kev; 568 struct sigaction sa; 569 570 signo = sigev->signo; 571 572 nxt_memzero(&sa, sizeof(struct sigaction)); 573 sigemptyset(&sa.sa_mask); 574 575 /* 576 * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch 577 * this signal. It should be set to SIG_DFL instead. And although 578 * SIGCHLD default action is also ignoring, nevertheless SIG_DFL 579 * allows kqueue to catch the signal. 580 */ 581 sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN; 582 583 if (sigaction(signo, &sa, NULL) != 0) {
| 483 484 nxt_kqueue_error(engine); 485 } 486 487 engine->u.kqueue.nchanges = 0; 488 } 489 490 return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++]; 491} 492 493 494static void 495nxt_kqueue_error(nxt_event_engine_t *engine) 496{ 497 struct kevent *kev, *end; 498 nxt_fd_event_t *ev; 499 nxt_file_event_t *fev; 500 nxt_work_queue_t *wq; 501 502 wq = &engine->fast_work_queue; 503 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges]; 504 505 for (kev = engine->u.kqueue.changes; kev < end; kev++) { 506 507 switch (kev->filter) { 508 509 case EVFILT_READ: 510 case EVFILT_WRITE: 511 ev = nxt_kevent_get_udata(kev->udata); 512 nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler, 513 ev->task, ev, ev->data); 514 break; 515 516 case EVFILT_VNODE: 517 fev = nxt_kevent_get_udata(kev->udata); 518 nxt_work_queue_add(wq, nxt_kqueue_file_error_handler, 519 fev->task, fev, fev->data); 520 break; 521 } 522 } 523} 524 525 526static void 527nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) 528{ 529 nxt_fd_event_t *ev; 530 531 ev = obj; 532 533 nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd); 534 535 if (ev->kq_eof && ev->kq_errno != 0) { 536 ev->error = ev->kq_errno; 537 nxt_log(task, nxt_socket_error_level(ev->kq_errno), 538 "kevent() reported error on descriptor %d %E", 539 ev->fd, ev->kq_errno); 540 } 541 542 ev->read = NXT_EVENT_INACTIVE; 543 ev->write = NXT_EVENT_INACTIVE; 544 ev->error = ev->kq_errno; 545 546 ev->error_handler(task, ev, data); 547} 548 549 550static void 551nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) 552{ 553 nxt_file_event_t *ev; 554 555 ev = obj; 556 557 nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd); 558 559 ev->handler(task, ev, data); 560} 561 562 563static nxt_int_t 564nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev) 565{ 566 int signo; 567 struct kevent kev; 568 struct sigaction sa; 569 570 signo = sigev->signo; 571 572 nxt_memzero(&sa, sizeof(struct sigaction)); 573 sigemptyset(&sa.sa_mask); 574 575 /* 576 * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch 577 * this signal. It should be set to SIG_DFL instead. And although 578 * SIGCHLD default action is also ignoring, nevertheless SIG_DFL 579 * allows kqueue to catch the signal. 580 */ 581 sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN; 582 583 if (sigaction(signo, &sa, NULL) != 0) {
|
584 nxt_log(&engine->task, NXT_LOG_CRIT, "sigaction(%d) failed %E", 585 signo, nxt_errno);
| 584 nxt_alert(&engine->task, "sigaction(%d) failed %E", signo, nxt_errno);
|
586 587 return NXT_ERROR; 588 } 589 590 nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)", 591 engine->u.kqueue.fd, signo, sigev->name); 592 593 kev.ident = signo; 594 kev.filter = EVFILT_SIGNAL; 595 kev.flags = EV_ADD; 596 kev.fflags = 0; 597 kev.data = 0; 598 kev.udata = nxt_kevent_set_udata(sigev); 599 600 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { 601 return NXT_OK; 602 } 603
| 585 586 return NXT_ERROR; 587 } 588 589 nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)", 590 engine->u.kqueue.fd, signo, sigev->name); 591 592 kev.ident = signo; 593 kev.filter = EVFILT_SIGNAL; 594 kev.flags = EV_ADD; 595 kev.fflags = 0; 596 kev.data = 0; 597 kev.udata = nxt_kevent_set_udata(sigev); 598 599 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { 600 return NXT_OK; 601 } 602
|
604 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 605 kqueue, nxt_errno);
| 603 nxt_alert(&engine->task, "kevent(%d) failed %E", kqueue, nxt_errno);
|
606 607 return NXT_ERROR; 608} 609 610 611#if (NXT_HAVE_EVFILT_USER) 612 613static nxt_int_t 614nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 615{ 616 struct kevent kev; 617 618 /* EVFILT_USER must be added to a kqueue before it can be triggered. */ 619 620 kev.ident = 0; 621 kev.filter = EVFILT_USER; 622 kev.flags = EV_ADD | EV_CLEAR; 623 kev.fflags = 0; 624 kev.data = 0; 625 kev.udata = NULL; 626 627 engine->u.kqueue.post_handler = handler; 628 629 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { 630 return NXT_OK; 631 } 632
| 604 605 return NXT_ERROR; 606} 607 608 609#if (NXT_HAVE_EVFILT_USER) 610 611static nxt_int_t 612nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) 613{ 614 struct kevent kev; 615 616 /* EVFILT_USER must be added to a kqueue before it can be triggered. */ 617 618 kev.ident = 0; 619 kev.filter = EVFILT_USER; 620 kev.flags = EV_ADD | EV_CLEAR; 621 kev.fflags = 0; 622 kev.data = 0; 623 kev.udata = NULL; 624 625 engine->u.kqueue.post_handler = handler; 626 627 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { 628 return NXT_OK; 629 } 630
|
633 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 634 engine->u.kqueue.fd, nxt_errno);
| 631 nxt_alert(&engine->task, "kevent(%d) failed %E", 632 engine->u.kqueue.fd, nxt_errno);
|
635 636 return NXT_ERROR; 637} 638 639 640static void 641nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 642{ 643 struct kevent kev; 644 645 /* 646 * kqueue has a builtin signal processing support, so the function 647 * is used only to post events and the signo argument is ignored. 648 */ 649 650 kev.ident = 0; 651 kev.filter = EVFILT_USER; 652 kev.flags = 0; 653 kev.fflags = NOTE_TRIGGER; 654 kev.data = 0; 655 kev.udata = NULL; 656 657 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
| 633 634 return NXT_ERROR; 635} 636 637 638static void 639nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 640{ 641 struct kevent kev; 642 643 /* 644 * kqueue has a builtin signal processing support, so the function 645 * is used only to post events and the signo argument is ignored. 646 */ 647 648 kev.ident = 0; 649 kev.filter = EVFILT_USER; 650 kev.flags = 0; 651 kev.fflags = NOTE_TRIGGER; 652 kev.data = 0; 653 kev.udata = NULL; 654 655 if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
|
658 nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", 659 engine->u.kqueue.fd, nxt_errno);
| 656 nxt_alert(&engine->task, "kevent(%d) failed %E", 657 engine->u.kqueue.fd, nxt_errno);
|
660 } 661} 662 663#endif 664 665 666static void 667nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 668{ 669 int nevents; 670 void *obj, *data; 671 nxt_int_t i; 672 nxt_err_t err; 673 nxt_uint_t level; 674 nxt_bool_t error, eof; 675 nxt_task_t *task; 676 struct kevent *kev; 677 nxt_fd_event_t *ev; 678 nxt_sig_event_t *sigev; 679 struct timespec ts, *tp; 680 nxt_file_event_t *fev; 681 nxt_work_queue_t *wq; 682 nxt_work_handler_t handler; 683 684 if (timeout == NXT_INFINITE_MSEC) { 685 tp = NULL; 686 687 } else { 688 ts.tv_sec = timeout / 1000; 689 ts.tv_nsec = (timeout % 1000) * 1000000; 690 tp = &ts; 691 } 692 693 nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M", 694 engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout); 695 696 nevents = kevent(engine->u.kqueue.fd, 697 engine->u.kqueue.changes, engine->u.kqueue.nchanges, 698 engine->u.kqueue.events, engine->u.kqueue.mevents, tp); 699 700 err = (nevents == -1) ? nxt_errno : 0; 701 702 nxt_thread_time_update(engine->task.thread); 703 704 nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents); 705 706 if (nevents == -1) {
| 658 } 659} 660 661#endif 662 663 664static void 665nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 666{ 667 int nevents; 668 void *obj, *data; 669 nxt_int_t i; 670 nxt_err_t err; 671 nxt_uint_t level; 672 nxt_bool_t error, eof; 673 nxt_task_t *task; 674 struct kevent *kev; 675 nxt_fd_event_t *ev; 676 nxt_sig_event_t *sigev; 677 struct timespec ts, *tp; 678 nxt_file_event_t *fev; 679 nxt_work_queue_t *wq; 680 nxt_work_handler_t handler; 681 682 if (timeout == NXT_INFINITE_MSEC) { 683 tp = NULL; 684 685 } else { 686 ts.tv_sec = timeout / 1000; 687 ts.tv_nsec = (timeout % 1000) * 1000000; 688 tp = &ts; 689 } 690 691 nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M", 692 engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout); 693 694 nevents = kevent(engine->u.kqueue.fd, 695 engine->u.kqueue.changes, engine->u.kqueue.nchanges, 696 engine->u.kqueue.events, engine->u.kqueue.mevents, tp); 697 698 err = (nevents == -1) ? nxt_errno : 0; 699 700 nxt_thread_time_update(engine->task.thread); 701 702 nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents); 703 704 if (nevents == -1) {
|
707 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
| 705 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
|
708 709 nxt_log(&engine->task, level, "kevent(%d) failed %E", 710 engine->u.kqueue.fd, err); 711 712 nxt_kqueue_error(engine); 713 return; 714 } 715 716 engine->u.kqueue.nchanges = 0; 717 718 for (i = 0; i < nevents; i++) { 719 720 kev = &engine->u.kqueue.events[i]; 721 722 nxt_debug(&engine->task, 723 (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? 724 "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": 725 "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", 726 kev->ident, kev->filter, kev->flags, kev->fflags, 727 kev->data, kev->udata); 728 729 error = (kev->flags & EV_ERROR); 730 731 if (nxt_slow_path(error)) {
| 706 707 nxt_log(&engine->task, level, "kevent(%d) failed %E", 708 engine->u.kqueue.fd, err); 709 710 nxt_kqueue_error(engine); 711 return; 712 } 713 714 engine->u.kqueue.nchanges = 0; 715 716 for (i = 0; i < nevents; i++) { 717 718 kev = &engine->u.kqueue.events[i]; 719 720 nxt_debug(&engine->task, 721 (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? 722 "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": 723 "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", 724 kev->ident, kev->filter, kev->flags, kev->fflags, 725 kev->data, kev->udata); 726 727 error = (kev->flags & EV_ERROR); 728 729 if (nxt_slow_path(error)) {
|
732 nxt_log(&engine->task, NXT_LOG_CRIT, 733 "kevent(%d) error %E on ident:%d filter:%d", 734 engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
| 730 nxt_alert(&engine->task, 731 "kevent(%d) error %E on ident:%d filter:%d", 732 engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
|
735 } 736 737 task = &engine->task; 738 wq = &engine->fast_work_queue; 739 handler = nxt_kqueue_fd_error_handler; 740 obj = nxt_kevent_get_udata(kev->udata); 741 742 switch (kev->filter) { 743 744 case EVFILT_READ: 745 ev = obj; 746 ev->read_ready = 1; 747 ev->kq_available = (int32_t) kev->data; 748 err = kev->fflags; 749 eof = (kev->flags & EV_EOF) != 0; 750 ev->kq_errno = err; 751 ev->kq_eof = eof; 752 753 if (ev->read <= NXT_EVENT_BLOCKED) { 754 nxt_debug(ev->task, "blocked read event fd:%d", ev->fd); 755 continue; 756 } 757 758 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { 759 ev->read = NXT_EVENT_INACTIVE; 760 } 761 762 if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) { 763 error = 1; 764 } 765 766 if (nxt_fast_path(!error)) { 767 handler = ev->read_handler; 768 wq = ev->read_work_queue; 769 } 770 771 task = ev->task; 772 data = ev->data; 773 774 break; 775 776 case EVFILT_WRITE: 777 ev = obj; 778 ev->write_ready = 1; 779 err = kev->fflags; 780 eof = (kev->flags & EV_EOF) != 0; 781 ev->kq_errno = err; 782 ev->kq_eof = eof; 783 784 if (ev->write <= NXT_EVENT_BLOCKED) { 785 nxt_debug(ev->task, "blocked write event fd:%d", ev->fd); 786 continue; 787 } 788 789 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { 790 ev->write = NXT_EVENT_INACTIVE; 791 } 792 793 if (nxt_slow_path(eof && err != 0)) { 794 error = 1; 795 } 796 797 if (nxt_fast_path(!error)) { 798 handler = ev->write_handler; 799 wq = ev->write_work_queue; 800 } 801 802 task = ev->task; 803 data = ev->data; 804 805 break; 806 807 case EVFILT_VNODE: 808 fev = obj; 809 handler = fev->handler; 810 task = fev->task; 811 data = fev->data; 812 break; 813 814 case EVFILT_SIGNAL: 815 sigev = obj; 816 obj = (void *) kev->ident; 817 handler = sigev->handler; 818 data = (void *) sigev->name; 819 break; 820 821#if (NXT_HAVE_EVFILT_USER) 822 823 case EVFILT_USER: 824 handler = engine->u.kqueue.post_handler; 825 data = NULL; 826 break; 827 828#endif 829 830 default: 831 832#if (NXT_DEBUG)
| 733 } 734 735 task = &engine->task; 736 wq = &engine->fast_work_queue; 737 handler = nxt_kqueue_fd_error_handler; 738 obj = nxt_kevent_get_udata(kev->udata); 739 740 switch (kev->filter) { 741 742 case EVFILT_READ: 743 ev = obj; 744 ev->read_ready = 1; 745 ev->kq_available = (int32_t) kev->data; 746 err = kev->fflags; 747 eof = (kev->flags & EV_EOF) != 0; 748 ev->kq_errno = err; 749 ev->kq_eof = eof; 750 751 if (ev->read <= NXT_EVENT_BLOCKED) { 752 nxt_debug(ev->task, "blocked read event fd:%d", ev->fd); 753 continue; 754 } 755 756 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { 757 ev->read = NXT_EVENT_INACTIVE; 758 } 759 760 if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) { 761 error = 1; 762 } 763 764 if (nxt_fast_path(!error)) { 765 handler = ev->read_handler; 766 wq = ev->read_work_queue; 767 } 768 769 task = ev->task; 770 data = ev->data; 771 772 break; 773 774 case EVFILT_WRITE: 775 ev = obj; 776 ev->write_ready = 1; 777 err = kev->fflags; 778 eof = (kev->flags & EV_EOF) != 0; 779 ev->kq_errno = err; 780 ev->kq_eof = eof; 781 782 if (ev->write <= NXT_EVENT_BLOCKED) { 783 nxt_debug(ev->task, "blocked write event fd:%d", ev->fd); 784 continue; 785 } 786 787 if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { 788 ev->write = NXT_EVENT_INACTIVE; 789 } 790 791 if (nxt_slow_path(eof && err != 0)) { 792 error = 1; 793 } 794 795 if (nxt_fast_path(!error)) { 796 handler = ev->write_handler; 797 wq = ev->write_work_queue; 798 } 799 800 task = ev->task; 801 data = ev->data; 802 803 break; 804 805 case EVFILT_VNODE: 806 fev = obj; 807 handler = fev->handler; 808 task = fev->task; 809 data = fev->data; 810 break; 811 812 case EVFILT_SIGNAL: 813 sigev = obj; 814 obj = (void *) kev->ident; 815 handler = sigev->handler; 816 data = (void *) sigev->name; 817 break; 818 819#if (NXT_HAVE_EVFILT_USER) 820 821 case EVFILT_USER: 822 handler = engine->u.kqueue.post_handler; 823 data = NULL; 824 break; 825 826#endif 827 828 default: 829 830#if (NXT_DEBUG)
|
833 nxt_log(&engine->task, NXT_LOG_CRIT, 834 "unexpected kevent(%d) filter %d on ident %d", 835 engine->u.kqueue.fd, kev->filter, kev->ident);
| 831 nxt_alert(&engine->task, 832 "unexpected kevent(%d) filter %d on ident %d", 833 engine->u.kqueue.fd, kev->filter, kev->ident);
|
836#endif 837 838 continue; 839 } 840 841 nxt_work_queue_add(wq, handler, task, obj, data); 842 } 843} 844 845 846/* 847 * nxt_kqueue_event_conn_io_connect() eliminates the 848 * getsockopt() syscall to test pending connect() error. 849 */ 850 851static void 852nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data) 853{ 854 nxt_conn_t *c; 855 nxt_event_engine_t *engine; 856 nxt_work_handler_t handler; 857 const nxt_event_conn_state_t *state; 858 859 c = obj; 860 861 state = c->write_state; 862 863 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 864 865 case NXT_OK: 866 c->socket.write_ready = 1; 867 handler = state->ready_handler; 868 break; 869 870 case NXT_AGAIN: 871 c->socket.write_handler = nxt_kqueue_conn_connected; 872 c->socket.error_handler = nxt_conn_connect_error; 873 874 engine = task->thread->engine; 875 nxt_conn_timer(engine, c, state, &c->write_timer); 876 877 nxt_kqueue_enable_write(engine, &c->socket); 878 return; 879 880 case NXT_DECLINED: 881 handler = state->close_handler; 882 break; 883 884 default: /* NXT_ERROR */ 885 handler = state->error_handler; 886 break; 887 } 888 889 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 890} 891 892 893static void 894nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data) 895{ 896 nxt_conn_t *c; 897 898 c = obj; 899 900 nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd); 901 902 c->socket.write = NXT_EVENT_BLOCKED; 903 904 if (c->write_state->timer_autoreset) { 905 nxt_timer_disable(task->thread->engine, &c->write_timer); 906 } 907 908 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 909 task, c, data); 910} 911 912 913static void 914nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) 915{ 916 nxt_listen_event_t *lev; 917 918 lev = obj; 919 920 nxt_debug(task, "kevent fd:%d avail:%D", 921 lev->socket.fd, lev->socket.kq_available); 922 923 lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available); 924 925 nxt_kqueue_conn_io_accept(task, lev, data); 926} 927 928 929static void 930nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data) 931{ 932 socklen_t socklen; 933 nxt_conn_t *c; 934 nxt_socket_t s; 935 struct sockaddr *sa; 936 nxt_listen_event_t *lev; 937 938 lev = obj; 939 c = lev->next; 940 941 lev->ready--; 942 lev->socket.read_ready = (lev->ready != 0); 943 944 lev->socket.kq_available--; 945 lev->socket.read_ready = (lev->socket.kq_available != 0); 946 947 sa = &c->remote->u.sockaddr; 948 socklen = c->remote->socklen; 949 /* 950 * The returned socklen is ignored here, 951 * see comment in nxt_conn_io_accept(). 952 */ 953 s = accept(lev->socket.fd, sa, &socklen); 954 955 if (s != -1) { 956 c->socket.fd = s; 957 958 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 959 960 nxt_conn_accept(task, lev, c); 961 return; 962 } 963 964 nxt_conn_accept_error(task, lev, "accept", nxt_errno); 965} 966 967 968/* 969 * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the 970 * readv() or recv() syscall if a remote side just closed connection. 971 */ 972 973static void 974nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data) 975{ 976 nxt_conn_t *c; 977 978 c = obj; 979 980 nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd); 981 982 if (c->socket.kq_available == 0 && c->socket.kq_eof) { 983 nxt_debug(task, "kevent fd:%d eof", c->socket.fd); 984 985 c->socket.closed = 1; 986 nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler, 987 task, c, data); 988 return; 989 } 990 991 nxt_conn_io_read(task, c, data); 992} 993 994 995/* 996 * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard 997 * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls 998 * if there is no pending data or a remote side closed connection. 999 */ 1000 1001static ssize_t 1002nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1003{ 1004 ssize_t n; 1005 1006 if (c->socket.kq_available == 0 && c->socket.kq_eof) { 1007 c->socket.closed = 1; 1008 return 0; 1009 } 1010 1011 n = nxt_conn_io_recvbuf(c, b); 1012 1013 if (n > 0) { 1014 c->socket.kq_available -= n; 1015 1016 if (c->socket.kq_available < 0) { 1017 c->socket.kq_available = 0; 1018 } 1019 1020 nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d", 1021 c->socket.fd, c->socket.kq_available, c->socket.kq_eof); 1022 1023 c->socket.read_ready = (c->socket.kq_available != 0 1024 || c->socket.kq_eof); 1025 } 1026 1027 return n; 1028}
| 834#endif 835 836 continue; 837 } 838 839 nxt_work_queue_add(wq, handler, task, obj, data); 840 } 841} 842 843 844/* 845 * nxt_kqueue_event_conn_io_connect() eliminates the 846 * getsockopt() syscall to test pending connect() error. 847 */ 848 849static void 850nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data) 851{ 852 nxt_conn_t *c; 853 nxt_event_engine_t *engine; 854 nxt_work_handler_t handler; 855 const nxt_event_conn_state_t *state; 856 857 c = obj; 858 859 state = c->write_state; 860 861 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 862 863 case NXT_OK: 864 c->socket.write_ready = 1; 865 handler = state->ready_handler; 866 break; 867 868 case NXT_AGAIN: 869 c->socket.write_handler = nxt_kqueue_conn_connected; 870 c->socket.error_handler = nxt_conn_connect_error; 871 872 engine = task->thread->engine; 873 nxt_conn_timer(engine, c, state, &c->write_timer); 874 875 nxt_kqueue_enable_write(engine, &c->socket); 876 return; 877 878 case NXT_DECLINED: 879 handler = state->close_handler; 880 break; 881 882 default: /* NXT_ERROR */ 883 handler = state->error_handler; 884 break; 885 } 886 887 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 888} 889 890 891static void 892nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data) 893{ 894 nxt_conn_t *c; 895 896 c = obj; 897 898 nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd); 899 900 c->socket.write = NXT_EVENT_BLOCKED; 901 902 if (c->write_state->timer_autoreset) { 903 nxt_timer_disable(task->thread->engine, &c->write_timer); 904 } 905 906 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 907 task, c, data); 908} 909 910 911static void 912nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) 913{ 914 nxt_listen_event_t *lev; 915 916 lev = obj; 917 918 nxt_debug(task, "kevent fd:%d avail:%D", 919 lev->socket.fd, lev->socket.kq_available); 920 921 lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available); 922 923 nxt_kqueue_conn_io_accept(task, lev, data); 924} 925 926 927static void 928nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data) 929{ 930 socklen_t socklen; 931 nxt_conn_t *c; 932 nxt_socket_t s; 933 struct sockaddr *sa; 934 nxt_listen_event_t *lev; 935 936 lev = obj; 937 c = lev->next; 938 939 lev->ready--; 940 lev->socket.read_ready = (lev->ready != 0); 941 942 lev->socket.kq_available--; 943 lev->socket.read_ready = (lev->socket.kq_available != 0); 944 945 sa = &c->remote->u.sockaddr; 946 socklen = c->remote->socklen; 947 /* 948 * The returned socklen is ignored here, 949 * see comment in nxt_conn_io_accept(). 950 */ 951 s = accept(lev->socket.fd, sa, &socklen); 952 953 if (s != -1) { 954 c->socket.fd = s; 955 956 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 957 958 nxt_conn_accept(task, lev, c); 959 return; 960 } 961 962 nxt_conn_accept_error(task, lev, "accept", nxt_errno); 963} 964 965 966/* 967 * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the 968 * readv() or recv() syscall if a remote side just closed connection. 969 */ 970 971static void 972nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data) 973{ 974 nxt_conn_t *c; 975 976 c = obj; 977 978 nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd); 979 980 if (c->socket.kq_available == 0 && c->socket.kq_eof) { 981 nxt_debug(task, "kevent fd:%d eof", c->socket.fd); 982 983 c->socket.closed = 1; 984 nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler, 985 task, c, data); 986 return; 987 } 988 989 nxt_conn_io_read(task, c, data); 990} 991 992 993/* 994 * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard 995 * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls 996 * if there is no pending data or a remote side closed connection. 997 */ 998 999static ssize_t 1000nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 1001{ 1002 ssize_t n; 1003 1004 if (c->socket.kq_available == 0 && c->socket.kq_eof) { 1005 c->socket.closed = 1; 1006 return 0; 1007 } 1008 1009 n = nxt_conn_io_recvbuf(c, b); 1010 1011 if (n > 0) { 1012 c->socket.kq_available -= n; 1013 1014 if (c->socket.kq_available < 0) { 1015 c->socket.kq_available = 0; 1016 } 1017 1018 nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d", 1019 c->socket.fd, c->socket.kq_available, c->socket.kq_eof); 1020 1021 c->socket.read_ready = (c->socket.kq_available != 0 1022 || c->socket.kq_eof); 1023 } 1024 1025 return n; 1026}
|