1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 /*
11 * The first epoll version has been introduced in Linux 2.5.44. The
12 * interface was changed several times since then and the final version
13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15 *
 * EPOLLET mode did not work reliably in early implementations and in
 * the Linux 2.4 backport.
18 *
19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3.
20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8.
21 * epoll_pwait() Linux 2.6.19, glibc 2.6.
22 * signalfd() Linux 2.6.22, glibc 2.7.
23 * eventfd() Linux 2.6.22, glibc 2.7.
24 * timerfd_create() Linux 2.6.25, glibc 2.8.
25 * epoll_create1() Linux 2.6.27, glibc 2.9.
26 * signalfd4() Linux 2.6.27, glibc 2.9.
27 * eventfd2() Linux 2.6.27, glibc 2.9.
28 * accept4() Linux 2.6.28, glibc 2.10.
29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10.
30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24.
31 */
32
33
34 #if (NXT_HAVE_EPOLL_EDGE)
35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36 nxt_uint_t mchanges, nxt_uint_t mevents);
37 #endif
38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39 nxt_uint_t mchanges, nxt_uint_t mevents);
40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43 nxt_conn_io_t *io);
44 static void nxt_epoll_free(nxt_event_engine_t *engine);
45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49 nxt_fd_event_t *ev);
50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51 nxt_fd_event_t *ev);
52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53 nxt_fd_event_t *ev);
54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55 nxt_fd_event_t *ev);
56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57 nxt_fd_event_t *ev);
58 static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59 nxt_fd_event_t *ev);
60 static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61 nxt_fd_event_t *ev);
62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63 nxt_fd_event_t *ev);
64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65 nxt_fd_event_t *ev);
66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67 nxt_fd_event_t *ev);
68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69 int op, uint32_t events);
70 static void nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
72 #if (NXT_HAVE_SIGNALFD)
73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75 #endif
76 #if (NXT_HAVE_EVENTFD)
77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78 nxt_work_handler_t handler);
79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81 #endif
82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83
84 #if (NXT_HAVE_ACCEPT4)
85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86 void *data);
87 #endif
88
89
90 #if (NXT_HAVE_EPOLL_EDGE)
91
92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
93 void *data);
94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
95 void *data);
96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
97
98
/*
 * Connection I/O operations used in edge-triggered mode: .connect and
 * .recvbuf are epoll-edge-specific (they rely on EPOLLET/EPOLLRDHUP
 * semantics, see below), the remaining operations are the generic ones.
 */
static nxt_conn_io_t nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};
119
120
/*
 * Event interface for edge-triggered epoll mode (EPOLLET | EPOLLRDHUP);
 * available only when NXT_HAVE_EPOLL_EDGE is set.  The entries follow
 * the nxt_event_interface_t member order (declared elsewhere), so the
 * NULL and conditional slots must stay positionally aligned with the
 * level-triggered variant below.
 */
const nxt_event_interface_t nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};
163
164 #endif
165
166
/*
 * Event interface for level-triggered epoll mode (no EPOLLET); it uses
 * the generic nxt_unix_conn_io operations and shares all per-event
 * handlers with the edge-triggered variant.
 */
const nxt_event_interface_t nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};
209
210
211 #if (NXT_HAVE_EPOLL_EDGE)
212
213 static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)214 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
215 nxt_uint_t mevents)
216 {
217 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
218 EPOLLET | EPOLLRDHUP);
219 }
220
221 #endif
222
223
224 static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)225 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
226 nxt_uint_t mevents)
227 {
228 return nxt_epoll_create(engine, mchanges, mevents,
229 &nxt_unix_conn_io, 0);
230 }
231
232
/*
 * Common engine constructor: allocates the batched change and event
 * arrays, creates the epoll descriptor, and, when the engine handles
 * signals, registers signalfd and probes accept4() availability.
 * On any failure nxt_epoll_free() releases whatever was created;
 * the fd fields are pre-set to -1 so that partial cleanup is safe.
 */
static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
    engine->u.epoll.fd = -1;
    engine->u.epoll.mode = mode;
    engine->u.epoll.mchanges = mchanges;
    engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
    engine->u.epoll.signalfd.fd = -1;
#endif

    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
    if (engine->u.epoll.changes == NULL) {
        goto fail;
    }

    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
    if (engine->u.epoll.events == NULL) {
        goto fail;
    }

    /* The size argument is ignored since Linux 2.6.8; 1 is conventional. */
    engine->u.epoll.fd = epoll_create(1);
    if (engine->u.epoll.fd == -1) {
        nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);

    if (engine->signals != NULL) {

#if (NXT_HAVE_SIGNALFD)

        if (nxt_epoll_add_signal(engine) != NXT_OK) {
            goto fail;
        }

#endif

        nxt_epoll_test_accept4(engine, io);
    }

    return NXT_OK;

fail:

    nxt_epoll_free(engine);

    return NXT_ERROR;
}
284
285
/*
 * Run-time probe for accept4(): if the syscall exists, the io->accept
 * handler is switched to the accept4()-based implementation, otherwise
 * the generic accept() handler is kept.  The probe result is cached in
 * a function-local static, so it runs only once per process.
 * NOTE(review): the cache is not synchronized; this appears to assume
 * engines are created from a single thread — confirm.
 */
static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t handler;

    if (handler == NULL) {

        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        /* Probe with an invalid fd; only ENOSYS matters here. */
        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}
312
313
/*
 * Release all epoll engine resources: the optional signalfd and eventfd
 * descriptors, the epoll descriptor itself, and the change/event arrays.
 * Safe on partially initialized engines: unopened fds are -1 and
 * nxt_free()/free(NULL) of never-allocated arrays is a no-op.  The whole
 * state is zeroed at the end.
 */
static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}
352
353
/*
 * Enable both read and write events for the descriptor: adds it to the
 * epoll set with EPOLLIN | EPOLLOUT plus the engine mode flags
 * (EPOLLET | EPOLLRDHUP in edge-triggered mode, 0 otherwise).
 */
static void
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->write = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}
363
364
/*
 * Disable all events for the descriptor.  The EPOLL_CTL_DEL is issued
 * only if at least one direction is above NXT_EVENT_DISABLED, i.e. the
 * descriptor is actually registered in the epoll set.
 */
static void
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}
376
377
/*
 * Remove the descriptor from the epoll set if either direction is in
 * any non-inactive state (including NXT_EVENT_DISABLED, unlike
 * nxt_epoll_disable() above).
 */
static void
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}
389
390
391 /*
392 * Although calling close() on a file descriptor will remove any epoll
393 * events that reference the descriptor, in this case the close() acquires
394 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
395 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
396 * only in one epoll set. Thus removing events explicitly before closing
397 * eliminates possible lock contention.
398 */
399
/*
 * Prepare a descriptor for close: remove it from the epoll set (see the
 * "epmutex" rationale above) and report whether a batched change for it
 * is still pending, so the caller knows the change buffer must be
 * flushed before the fd is actually closed.
 */
static nxt_bool_t
nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_epoll_delete(engine, ev);

    return ev->changing;
}
407
408
/*
 * Enable read events.  If the event was merely NXT_EVENT_BLOCKED the fd
 * is already registered for EPOLLIN and no epoll_ctl() is needed.
 * Otherwise: ADD when the fd is not in the set at all, MOD when it is;
 * EPOLLOUT is preserved in the mask if the write side is still
 * registered (BLOCKED or above).
 */
static void
nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int op;
    uint32_t events;

    if (ev->read != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
            op = EPOLL_CTL_ADD;

        } else if (ev->write >= NXT_EVENT_BLOCKED) {
            events |= EPOLLOUT;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->read = NXT_EVENT_ACTIVE;
}
432
433
/*
 * Enable write events; mirror image of nxt_epoll_enable_read():
 * skip the epoll_ctl() when only unblocking, choose ADD vs MOD by
 * registration state, and keep EPOLLIN if the read side is registered.
 */
static void
nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int op;
    uint32_t events;

    if (ev->write != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
            op = EPOLL_CTL_ADD;

        } else if (ev->read >= NXT_EVENT_BLOCKED) {
            events |= EPOLLIN;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->write = NXT_EVENT_ACTIVE;
}
457
458
/*
 * Disable read events.  If the write side is also inactive/disabled the
 * fd is removed from the epoll set entirely; otherwise the mask is
 * modified to write-only.
 */
static void
nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int op;
    uint32_t events;

    ev->read = NXT_EVENT_INACTIVE;

    if (ev->write <= NXT_EVENT_DISABLED) {
        ev->write = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}
479
480
/*
 * Disable write events; mirror image of nxt_epoll_disable_read():
 * DEL when the read side is also inactive/disabled, otherwise MOD to a
 * read-only mask.
 */
static void
nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int op;
    uint32_t events;

    ev->write = NXT_EVENT_INACTIVE;

    if (ev->read <= NXT_EVENT_DISABLED) {
        ev->read = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}
501
502
503 static void
nxt_epoll_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)504 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
505 {
506 if (ev->read != NXT_EVENT_INACTIVE) {
507 ev->read = NXT_EVENT_BLOCKED;
508 }
509 }
510
511
512 static void
nxt_epoll_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)513 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
514 {
515 if (ev->write != NXT_EVENT_INACTIVE) {
516 ev->write = NXT_EVENT_BLOCKED;
517 }
518 }
519
520
521 /*
522 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
523 * event should be added or modified, epoll_ctl(2):
524 *
525 * EPOLLONESHOT (since Linux 2.6.2)
526 * Sets the one-shot behavior for the associated file descriptor.
527 * This means that after an event is pulled out with epoll_wait(2)
528 * the associated file descriptor is internally disabled and no
529 * other events will be reported by the epoll interface. The user
530 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
531 * descriptor with a new event mask.
532 */
533
/*
 * Arm a one-shot read event (EPOLLONESHOT, see the comment above):
 * ADD when the fd is not yet in the epoll set, MOD to rearm otherwise.
 * The write direction is dropped from the mask.
 */
static void
nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_ONESHOT;
    ev->write = NXT_EVENT_INACTIVE;

    nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
}
547
548
/*
 * Arm a one-shot write event; counterpart of nxt_epoll_oneshot_read().
 * The read direction is dropped from the mask.
 */
static void
nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_ONESHOT;

    nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
}
562
563
/*
 * Register a listening socket for read events.  EPOLLEXCLUSIVE (when
 * available, Linux 4.5+) avoids waking every worker on each incoming
 * connection.  Note the mask intentionally omits the engine's EPOLLET
 * mode flags: accept is level-triggered even in the edge engine.
 */
static void
nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    uint32_t events;

    ev->read = NXT_EVENT_ACTIVE;

    events = EPOLLIN;

#ifdef EPOLLEXCLUSIVE
    events |= EPOLLEXCLUSIVE;
#endif

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
}
579
580
581 /*
582 * epoll changes are batched to improve instruction and data cache
583 * locality of several epoll_ctl() calls followed by epoll_wait() call.
584 */
585
/*
 * Queue one epoll_ctl() operation into the batched change array (see
 * the locality rationale above).  When the array is full it is flushed
 * first.  ev->changing marks the fd as having a pending change; it is
 * cleared in nxt_epoll_commit_changes().
 */
static void
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
    uint32_t events)
{
    nxt_epoll_change_t *change;

    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
              engine->u.epoll.fd, ev->fd, op, events);

    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
        nxt_epoll_commit_changes(engine);
    }

    ev->changing = 1;

    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
    change->op = op;
    change->event.events = events;
    change->event.data.ptr = ev;
}
606
607
/*
 * Flush the batched changes with one epoll_ctl() per entry.  On failure
 * the event's error handler is enqueued on the fast work queue and the
 * engine error flag is raised so nxt_epoll_poll() will not block.
 * Callers guarantee nchanges > 0 (the do/while relies on it).
 */
static void
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
    int ret;
    nxt_fd_event_t *ev;
    nxt_epoll_change_t *change, *end;

    nxt_debug(&engine->task, "epoll %d changes:%ui",
              engine->u.epoll.fd, engine->u.epoll.nchanges);

    change = engine->u.epoll.changes;
    end = change + engine->u.epoll.nchanges;

    do {
        ev = change->event.data.ptr;
        ev->changing = 0;

        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
                  engine->u.epoll.fd, ev->fd, change->op,
                  change->event.events);

        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
                      engine->u.epoll.fd, change->op, ev->fd, nxt_errno);

            nxt_work_queue_add(&engine->fast_work_queue,
                               nxt_epoll_error_handler, ev->task, ev, ev->data);

            engine->u.epoll.error = 1;
        }

        change++;

    } while (change < end);

    engine->u.epoll.nchanges = 0;
}
647
648
/*
 * Work-queue callback run after an epoll failure on a descriptor:
 * marks both directions inactive and delegates to the event's own
 * error handler.
 */
static void
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t *ev;

    ev = obj;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    ev->error_handler(ev->task, ev, data);
}
661
662
663 #if (NXT_HAVE_SIGNALFD)
664
/*
 * Route the engine's signal set through a signalfd descriptor that is
 * registered in the epoll set.  The signals are first blocked with
 * sigprocmask() so they are delivered only via the descriptor.
 * Returns NXT_OK or NXT_ERROR; on error the caller's fail path frees
 * any fd already stored in engine->u.epoll.signalfd.
 */
static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
    int fd;
    struct epoll_event ee;

    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
        nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
        return NXT_ERROR;
    }

    /*
     * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7
     * and 2.8 signalfd() wrappers call the original signalfd() syscall
     * without the flags argument. Glibc 2.9+ signalfd() wrapper at first
     * tries to call signalfd4() syscall and if it fails then calls the
     * original signalfd() syscall. For this reason the non-blocking mode
     * is set separately.
     */

    fd = signalfd(-1, &engine->signals->sigmask, 0);

    if (fd == -1) {
        nxt_alert(&engine->task, "signalfd(%d) failed %E",
                  engine->u.epoll.signalfd.fd, nxt_errno);
        return NXT_ERROR;
    }

    engine->u.epoll.signalfd.fd = fd;

    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "signalfd(): %d", fd);

    /* The signalfd event carries the user signal handler as its data. */
    engine->u.epoll.signalfd.data = engine->signals->handler;
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
    engine->u.epoll.signalfd.log = engine->task.log;
    engine->u.epoll.signalfd.task = &engine->task;

    ee.events = EPOLLIN;
    ee.data.ptr = &engine->u.epoll.signalfd;

    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
        nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
                  engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);

        return NXT_ERROR;
    }

    return NXT_OK;
}
719
720
/*
 * Read handler for the signalfd descriptor: reads one
 * signalfd_siginfo record and invokes the user signal handler (passed
 * via the event's data field) with the signal number as its object.
 */
static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int n;
    nxt_fd_event_t *ev;
    nxt_work_handler_t handler;
    struct signalfd_siginfo sfd;

    ev = obj;
    handler = data;

    nxt_debug(task, "signalfd handler");

    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));

    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);

    /* A short read (or -1) means no valid siginfo record. */
    if (n != sizeof(struct signalfd_siginfo)) {
        nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
        return;
    }

    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);

    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
}
747
748 #endif
749
750
751 #if (NXT_HAVE_EVENTFD)
752
/*
 * Set up an eventfd descriptor, registered in edge-triggered mode, so
 * other threads can wake this engine via nxt_epoll_signal().  The given
 * handler is stored as the post handler invoked on each wakeup.
 * Returns NXT_OK or NXT_ERROR; an fd created before a later failure is
 * stored in the engine and released by nxt_epoll_free().
 */
static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    int ret;
    struct epoll_event ee;

    engine->u.epoll.post_handler = handler;

    /*
     * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7
     * and 2.8 eventfd() wrappers call the original eventfd() syscall
     * without the flags argument. Glibc 2.9+ eventfd() wrapper at first
     * tries to call eventfd2() syscall and if it fails then calls the
     * original eventfd() syscall. For this reason the non-blocking mode
     * is set separately.
     */

    engine->u.epoll.eventfd.fd = eventfd(0, 0);

    if (engine->u.epoll.eventfd.fd == -1) {
        nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
        return NXT_ERROR;
    }

    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);

    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
    engine->u.epoll.eventfd.data = engine;
    engine->u.epoll.eventfd.log = engine->task.log;
    engine->u.epoll.eventfd.task = &engine->task;

    /* Edge-triggered: see the counter rationale in the handler below. */
    ee.events = EPOLLIN | EPOLLET;
    ee.data.ptr = &engine->u.epoll.eventfd;

    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
                    engine->u.epoll.eventfd.fd, &ee);

    if (nxt_fast_path(ret == 0)) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
              engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
              nxt_errno);

    return NXT_ERROR;
}
806
807
808 static void
nxt_epoll_eventfd_handler(nxt_task_t * task,void * obj,void * data)809 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
810 {
811 int n;
812 uint64_t events;
813 nxt_event_engine_t *engine;
814
815 engine = data;
816
817 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
818
819 /*
820 * The maximum value after write() to a eventfd() descriptor will
821 * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
822 * can be read once per many notifications, for example, once per
823 * 2^32-2 noticifcations. Since the eventfd() file descriptor is
824 * always registered in EPOLLET mode, epoll returns event about
825 * only the latest write() to the descriptor.
826 */
827
828 if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
829 engine->u.epoll.neventfd = 0;
830
831 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
832
833 nxt_debug(task, "read(%d): %d events:%uL",
834 engine->u.epoll.eventfd.fd, n, events);
835
836 if (n != sizeof(uint64_t)) {
837 nxt_alert(task, "read eventfd(%d) failed %E",
838 engine->u.epoll.eventfd.fd, nxt_errno);
839 }
840 }
841
842 engine->u.epoll.post_handler(task, NULL, NULL);
843 }
844
845
846 static void
nxt_epoll_signal(nxt_event_engine_t * engine,nxt_uint_t signo)847 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
848 {
849 size_t ret;
850 uint64_t event;
851
852 /*
853 * eventfd() presents along with signalfd(), so the function
854 * is used only to post events and the signo argument is ignored.
855 */
856
857 event = 1;
858
859 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
860
861 if (nxt_slow_path(ret != sizeof(uint64_t))) {
862 nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
863 engine->u.epoll.eventfd.fd, nxt_errno);
864 }
865 }
866
867 #endif
868
869
/*
 * The engine's poll loop body: flush pending epoll_ctl() changes, call
 * epoll_wait(), and dispatch read/write handlers for the returned
 * events.  EPOLLERR/EPOLLHUP without a dispatchable handler fall
 * through to the error handler at the bottom; blocked events in
 * level-triggered mode are disabled in the kernel to stop the event
 * from firing again immediately.
 */
static void
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int nevents;
    uint32_t events;
    nxt_int_t i;
    nxt_err_t err;
    nxt_bool_t error;
    nxt_uint_t level;
    nxt_fd_event_t *ev;
    struct epoll_event *event;

    if (engine->u.epoll.nchanges != 0) {
        nxt_epoll_commit_changes(engine);
    }

    if (engine->u.epoll.error) {
        engine->u.epoll.error = 0;
        /* Error handlers have been enqueued on failure. */
        timeout = 0;
    }

    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
              engine->u.epoll.fd, timeout);

    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
                         engine->u.epoll.mevents, timeout);

    /* Save errno before nxt_thread_time_update() can clobber it. */
    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);

    if (nevents == -1) {
        /* EINTR is routine (signals); anything else is an alert. */
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
                engine->u.epoll.fd, err);

        return;
    }

    for (i = 0; i < nevents; i++) {

        event = &engine->u.epoll.events[i];
        events = event->events;
        ev = event->data.ptr;

        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
                  ev->fd, events, ev, ev->read, ev->write);

        /*
         * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN
         * or EPOLLOUT, so the "error" variable enqueues only error handler.
         */
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
        ev->epoll_error = error;

        /* Both directions blocked or inactive: nobody to notify now. */
        if (error
            && ev->read <= NXT_EVENT_BLOCKED
            && ev->write <= NXT_EVENT_BLOCKED)
        {
            error = 0;
        }

#if (NXT_HAVE_EPOLL_EDGE)

        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

#endif

        if ((events & EPOLLIN) != 0) {
            ev->read_ready = 1;

            if (ev->read != NXT_EVENT_BLOCKED) {

                if (ev->read == NXT_EVENT_ONESHOT) {
                    ev->read = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                                   ev->task, ev, ev->data);

                /* A read handler is queued; it will see the error itself. */
                error = 0;

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_read(engine, ev);
            }
        }

        if ((events & EPOLLOUT) != 0) {
            ev->write_ready = 1;

            if (ev->write != NXT_EVENT_BLOCKED) {

                if (ev->write == NXT_EVENT_ONESHOT) {
                    ev->write = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                                   ev->task, ev, ev->data);

                /* A write handler is queued; it will see the error itself. */
                error = 0;

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_write(engine, ev);
            }
        }

        if (!error) {
            continue;
        }

        /* Error only: force readiness so handlers detect the failure. */
        ev->read_ready = 1;
        ev->write_ready = 1;

        if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) {

            if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable(engine, ev);
            }

            continue;
        }

        nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler,
                           ev->task, ev, ev->data);
    }
}
1003
1004
1005 #if (NXT_HAVE_ACCEPT4)
1006
/*
 * accept4()-based accept handler installed by nxt_epoll_test_accept4():
 * accepts one pending connection directly in non-blocking mode, saving
 * the separate fcntl() of the generic accept path.
 */
static void
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
    socklen_t socklen;
    nxt_conn_t *c;
    nxt_socket_t s;
    struct sockaddr *sa;
    nxt_listen_event_t *lev;

    lev = obj;
    c = lev->next;

    /* One connection per call; re-arm read readiness if more are queued. */
    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
}
1041
1042 #endif
1043
1044
1045 #if (NXT_HAVE_EPOLL_EDGE)
1046
1047 /*
1048 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1049 * syscall to test pending connect() error. Although this special
1050 * interface can work in both edge-triggered and level-triggered
1051 * modes it is enabled only for the former mode because this mode is
1052 * available in all modern Linux distributions. For the latter mode
1053 * it is required to create additional nxt_epoll_level_event_conn_io
1054 * with single non-generic connect() interface.
1055 */
1056
/*
 * Edge-triggered connect: on NXT_AGAIN the socket is enabled for both
 * directions in one epoll_ctl() and completion is detected via the
 * write event (nxt_epoll_edge_conn_connected), avoiding the
 * getsockopt(SO_ERROR) check described in the comment above.  All other
 * outcomes dispatch the matching state handler on the write work queue.
 */
static void
nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t *c;
    nxt_event_engine_t *engine;
    nxt_work_handler_t handler;
    const nxt_event_conn_state_t *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        /* Enable both directions at once; ignore reads until connected. */
        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;
        return;

#if 0
    case NXT_AGAIN:
        nxt_conn_timer(engine, c, state, &c->write_timer);

        /* Fall through. */

    case NXT_OK:
        /*
         * Mark both read and write directions as ready and try to perform
         * I/O operations before receiving readiness notifications.
         * On unconnected socket Linux send() and recv() return EAGAIN
         * instead of ENOTCONN.
         */
        c->socket.read_ready = 1;
        c->socket.write_ready = 1;
        /*
         * Enabling both read and write notifications on a getting
         * connected socket eliminates one epoll_ctl() syscall.
         */
        c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
        c->socket.error_handler = state->error_handler;

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;

        handler = state->ready_handler;
        break;
#endif

    case NXT_ERROR:
        handler = state->error_handler;
        break;

    default: /* NXT_DECLINED: connection refused. */
        handler = state->close_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
1127
1128
/*
 * Write event handler for a connecting socket in edge-triggered mode:
 * a write event without EPOLLERR/EPOLLHUP means the connect succeeded,
 * so the ready handler is queued; otherwise the connection is tested
 * via nxt_conn_connect_test().
 */
static void
nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t *c;

    c = obj;

    nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);

    if (!c->socket.epoll_error) {
        c->socket.write = NXT_EVENT_BLOCKED;

        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(task->thread->engine, &c->write_timer);
        }

        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    nxt_conn_connect_test(task, c, data);
}
1152
1153
1154 /*
1155 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1156 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1157 * in edge-triggered mode.
1158 */
1159
/*
 * Wrapper around nxt_conn_io_recvbuf() that keeps read_ready set after
 * a successful read when EPOLLRDHUP was seen, so the pending EOF is
 * read in edge-triggered mode (see the comment above).
 */
static ssize_t
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t n;

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0 && c->socket.epoll_eof) {
        c->socket.read_ready = 1;
    }

    return n;
}
1173
1174 #endif
1175