xref: /unit/src/nxt_epoll_engine.c (revision 611:323e11065f83)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * The first epoll version has been introduced in Linux 2.5.44.  The
12  * interface was changed several times since then and the final version
13  * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14  * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15  *
16  * EPOLLET mode did not work reliably in early implementations and in
17  * the Linux 2.4 backport.
18  *
19  * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
20  * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
21  * epoll_pwait()            Linux 2.6.19, glibc 2.6.
22  * signalfd()               Linux 2.6.22, glibc 2.7.
23  * eventfd()                Linux 2.6.22, glibc 2.7.
24  * timerfd_create()         Linux 2.6.25, glibc 2.8.
25  * epoll_create1()          Linux 2.6.27, glibc 2.9.
26  * signalfd4()              Linux 2.6.27, glibc 2.9.
27  * eventfd2()               Linux 2.6.27, glibc 2.9.
28  * accept4()                Linux 2.6.28, glibc 2.10.
29  * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
30  * EPOLLEXCLUSIVE           Linux 4.5, glibc 2.24.
31  */
32 
33 
/*
 * Forward declarations of the file-local functions.  Most of them are
 * referenced by the engine interface tables defined further below, so
 * they have to be declared before those tables.
 */

/* Engine construction and teardown. */
34 #if (NXT_HAVE_EPOLL_EDGE)
35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36     nxt_uint_t mchanges, nxt_uint_t mevents);
37 #endif
38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39     nxt_uint_t mchanges, nxt_uint_t mevents);
40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41     nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43     nxt_conn_io_t *io);
44 static void nxt_epoll_free(nxt_event_engine_t *engine);
/* Per-descriptor event state transitions. */
45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49     nxt_fd_event_t *ev);
50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51     nxt_fd_event_t *ev);
52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53     nxt_fd_event_t *ev);
54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55     nxt_fd_event_t *ev);
56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57     nxt_fd_event_t *ev);
58 static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59     nxt_fd_event_t *ev);
60 static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61     nxt_fd_event_t *ev);
62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63     nxt_fd_event_t *ev);
64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65     nxt_fd_event_t *ev);
66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67     nxt_fd_event_t *ev);
/* Batching of epoll_ctl() calls and handling of their failures. */
68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69     int op, uint32_t events);
70 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
/* Signal delivery through signalfd(), when available. */
72 #if (NXT_HAVE_SIGNALFD)
73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75 #endif
/* Posting events to the engine through eventfd(), when available. */
76 #if (NXT_HAVE_EVENTFD)
77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78     nxt_work_handler_t handler);
79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81 #endif
/* The wait-and-dispatch step. */
82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83 
/* accept4()-based accept handler, installed by nxt_epoll_test_accept4(). */
84 #if (NXT_HAVE_ACCEPT4)
85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86     void *data);
87 #endif
88 
89 
90 #if (NXT_HAVE_EPOLL_EDGE)
91 
/*
 * Edge-triggered specific connection I/O functions; the first and the
 * last are entries of the nxt_epoll_edge_conn_io table below.
 */
92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
93     void *data);
94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
95     void *data);
96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
97 
98 
/*
 * Connection I/O method table used by the edge-triggered engine.
 *
 * The initializer is positional, so the order of the entries must match
 * the field order of nxt_conn_io_t, which is declared outside this file.
 * Only the connect and recvbuf entries are functions defined in this
 * file; the rest are handlers provided elsewhere.  The table is not
 * const because nxt_epoll_test_accept4() assigns its "accept" field.
 */
99 static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
100     nxt_epoll_edge_conn_io_connect,
101     nxt_conn_io_accept,     /* presumably the "accept" field - confirm */
102 
103     nxt_conn_io_read,
104     nxt_epoll_edge_conn_io_recvbuf,
105     nxt_conn_io_recv,
106 
107     nxt_conn_io_write,
108     nxt_event_conn_io_write_chunk,
109 
    /* Linux sendfile() based sender when available, generic one otherwise. */
110 #if (NXT_HAVE_LINUX_SENDFILE)
111     nxt_linux_event_conn_io_sendfile,
112 #else
113     nxt_event_conn_io_sendbuf,
114 #endif
115 
116     nxt_event_conn_io_writev,
117     nxt_event_conn_io_send,
118 
119     nxt_conn_io_shutdown,
120 };
121 
122 
/*
 * Interface table of the edge-triggered epoll engine.
 *
 * The initializer is positional: entry order must match the field order
 * of nxt_event_interface_t, declared outside this file.  It differs from
 * nxt_epoll_level_engine only in the name, the create function and the
 * connection I/O table.
 */
123 const nxt_event_interface_t  nxt_epoll_edge_engine = {
124     "epoll_edge",
125     nxt_epoll_edge_create,
126     nxt_epoll_free,
127     nxt_epoll_enable,
128     nxt_epoll_disable,
129     nxt_epoll_delete,
130     nxt_epoll_close,
131     nxt_epoll_enable_read,
132     nxt_epoll_enable_write,
133     nxt_epoll_disable_read,
134     nxt_epoll_disable_write,
135     nxt_epoll_block_read,
136     nxt_epoll_block_write,
137     nxt_epoll_oneshot_read,
138     nxt_epoll_oneshot_write,
139     nxt_epoll_enable_accept,
    /*
     * Two hooks this engine does not implement.  NOTE(review): which
     * fields these are is defined by nxt_event_interface_t - confirm.
     */
140     NULL,
141     NULL,
    /* The post/signal hooks exist only when eventfd() is available. */
142 #if (NXT_HAVE_EVENTFD)
143     nxt_epoll_enable_post,
144     nxt_epoll_signal,
145 #else
146     NULL,
147     NULL,
148 #endif
149     nxt_epoll_poll,
150 
151     &nxt_epoll_edge_conn_io,
152 
    /* Capability flags selected by the inotify and signalfd() feature tests. */
153 #if (NXT_HAVE_INOTIFY)
154     NXT_FILE_EVENTS,
155 #else
156     NXT_NO_FILE_EVENTS,
157 #endif
158 
159 #if (NXT_HAVE_SIGNALFD)
160     NXT_SIGNAL_EVENTS,
161 #else
162     NXT_NO_SIGNAL_EVENTS,
163 #endif
164 };
165 
166 #endif
167 
168 
/*
 * Interface table of the level-triggered epoll engine.
 *
 * The initializer is positional: entry order must match the field order
 * of nxt_event_interface_t, declared outside this file.  Unlike the edge
 * variant it uses the generic nxt_unix_conn_io connection I/O table.
 */
169 const nxt_event_interface_t  nxt_epoll_level_engine = {
170     "epoll_level",
171     nxt_epoll_level_create,
172     nxt_epoll_free,
173     nxt_epoll_enable,
174     nxt_epoll_disable,
175     nxt_epoll_delete,
176     nxt_epoll_close,
177     nxt_epoll_enable_read,
178     nxt_epoll_enable_write,
179     nxt_epoll_disable_read,
180     nxt_epoll_disable_write,
181     nxt_epoll_block_read,
182     nxt_epoll_block_write,
183     nxt_epoll_oneshot_read,
184     nxt_epoll_oneshot_write,
185     nxt_epoll_enable_accept,
    /*
     * Two hooks this engine does not implement.  NOTE(review): which
     * fields these are is defined by nxt_event_interface_t - confirm.
     */
186     NULL,
187     NULL,
    /* The post/signal hooks exist only when eventfd() is available. */
188 #if (NXT_HAVE_EVENTFD)
189     nxt_epoll_enable_post,
190     nxt_epoll_signal,
191 #else
192     NULL,
193     NULL,
194 #endif
195     nxt_epoll_poll,
196 
197     &nxt_unix_conn_io,
198 
    /* Capability flags selected by the inotify and signalfd() feature tests. */
199 #if (NXT_HAVE_INOTIFY)
200     NXT_FILE_EVENTS,
201 #else
202     NXT_NO_FILE_EVENTS,
203 #endif
204 
205 #if (NXT_HAVE_SIGNALFD)
206     NXT_SIGNAL_EVENTS,
207 #else
208     NXT_NO_SIGNAL_EVENTS,
209 #endif
210 };
211 
212 
213 #if (NXT_HAVE_EPOLL_EDGE)
214 
215 static nxt_int_t
216 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
217     nxt_uint_t mevents)
218 {
219     return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
220                             EPOLLET | EPOLLRDHUP);
221 }
222 
223 #endif
224 
225 
226 static nxt_int_t
227 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
228     nxt_uint_t mevents)
229 {
230     return nxt_epoll_create(engine, mchanges, mevents,
231                             &nxt_unix_conn_io, 0);
232 }
233 
234 
235 static nxt_int_t
236 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
237     nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
238 {
239     engine->u.epoll.fd = -1;
240     engine->u.epoll.mode = mode;
241     engine->u.epoll.mchanges = mchanges;
242     engine->u.epoll.mevents = mevents;
243 #if (NXT_HAVE_SIGNALFD)
244     engine->u.epoll.signalfd.fd = -1;
245 #endif
246 
247     engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
248     if (engine->u.epoll.changes == NULL) {
249         goto fail;
250     }
251 
252     engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
253     if (engine->u.epoll.events == NULL) {
254         goto fail;
255     }
256 
257     engine->u.epoll.fd = epoll_create(1);
258     if (engine->u.epoll.fd == -1) {
259         nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
260         goto fail;
261     }
262 
263     nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
264 
265     if (engine->signals != NULL) {
266 
267 #if (NXT_HAVE_SIGNALFD)
268 
269         if (nxt_epoll_add_signal(engine) != NXT_OK) {
270             goto fail;
271         }
272 
273 #endif
274 
275         nxt_epoll_test_accept4(engine, io);
276     }
277 
278     return NXT_OK;
279 
280 fail:
281 
282     nxt_epoll_free(engine);
283 
284     return NXT_ERROR;
285 }
286 
287 
288 static void
289 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
290 {
291     static nxt_work_handler_t  handler;
292 
293     if (handler == NULL) {
294 
295         handler = io->accept;
296 
297 #if (NXT_HAVE_ACCEPT4)
298 
299         (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
300 
301         if (nxt_errno != NXT_ENOSYS) {
302             handler = nxt_epoll_conn_io_accept4;
303 
304         } else {
305             nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
306                     NXT_ENOSYS);
307         }
308 
309 #endif
310     }
311 
312     io->accept = handler;
313 }
314 
315 
316 static void
317 nxt_epoll_free(nxt_event_engine_t *engine)
318 {
319     int  fd;
320 
321     nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
322 
323 #if (NXT_HAVE_SIGNALFD)
324 
325     fd = engine->u.epoll.signalfd.fd;
326 
327     if (fd != -1 && close(fd) != 0) {
328         nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
329     }
330 
331 #endif
332 
333 #if (NXT_HAVE_EVENTFD)
334 
335     fd = engine->u.epoll.eventfd.fd;
336 
337     if (fd != -1 && close(fd) != 0) {
338         nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
339     }
340 
341 #endif
342 
343     fd = engine->u.epoll.fd;
344 
345     if (fd != -1 && close(fd) != 0) {
346         nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
347     }
348 
349     nxt_free(engine->u.epoll.events);
350     nxt_free(engine->u.epoll.changes);
351 
352     nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
353 }
354 
355 
356 static void
357 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
358 {
359     ev->read = NXT_EVENT_ACTIVE;
360     ev->write = NXT_EVENT_ACTIVE;
361 
362     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
363                      EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
364 }
365 
366 
367 static void
368 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
369 {
370     if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
371 
372         ev->read = NXT_EVENT_INACTIVE;
373         ev->write = NXT_EVENT_INACTIVE;
374 
375         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
376     }
377 }
378 
379 
380 static void
381 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
382 {
383     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
384 
385         ev->read = NXT_EVENT_INACTIVE;
386         ev->write = NXT_EVENT_INACTIVE;
387 
388         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
389     }
390 }
391 
392 
393 /*
394  * Although calling close() on a file descriptor will remove any epoll
395  * events that reference the descriptor, in this case the close() acquires
396  * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
397  * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
398  * only in one epoll set.  Thus removing events explicitly before closing
399  * eliminates possible lock contention.
400  */
401 
402 static nxt_bool_t
403 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
404 {
405     nxt_epoll_delete(engine, ev);
406 
407     return ev->changing;
408 }
409 
410 
411 static void
412 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
413 {
414     int       op;
415     uint32_t  events;
416 
417     if (ev->read != NXT_EVENT_BLOCKED) {
418 
419         op = EPOLL_CTL_MOD;
420         events = EPOLLIN | engine->u.epoll.mode;
421 
422         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
423             op = EPOLL_CTL_ADD;
424 
425         } else if (ev->write >= NXT_EVENT_BLOCKED) {
426             events |= EPOLLOUT;
427         }
428 
429         nxt_epoll_change(engine, ev, op, events);
430     }
431 
432     ev->read = NXT_EVENT_ACTIVE;
433 }
434 
435 
436 static void
437 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
438 {
439     int       op;
440     uint32_t  events;
441 
442     if (ev->write != NXT_EVENT_BLOCKED) {
443 
444         op = EPOLL_CTL_MOD;
445         events = EPOLLOUT | engine->u.epoll.mode;
446 
447         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
448             op = EPOLL_CTL_ADD;
449 
450         } else if (ev->read >= NXT_EVENT_BLOCKED) {
451             events |= EPOLLIN;
452         }
453 
454         nxt_epoll_change(engine, ev, op, events);
455     }
456 
457     ev->write = NXT_EVENT_ACTIVE;
458 }
459 
460 
461 static void
462 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
463 {
464     int       op;
465     uint32_t  events;
466 
467     ev->read = NXT_EVENT_INACTIVE;
468 
469     if (ev->write <= NXT_EVENT_DISABLED) {
470         ev->write = NXT_EVENT_INACTIVE;
471         op = EPOLL_CTL_DEL;
472         events = 0;
473 
474     } else {
475         op = EPOLL_CTL_MOD;
476         events = EPOLLOUT | engine->u.epoll.mode;
477     }
478 
479     nxt_epoll_change(engine, ev, op, events);
480 }
481 
482 
483 static void
484 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
485 {
486     int       op;
487     uint32_t  events;
488 
489     ev->write = NXT_EVENT_INACTIVE;
490 
491     if (ev->read <= NXT_EVENT_DISABLED) {
492         ev->write = NXT_EVENT_INACTIVE;
493         op = EPOLL_CTL_DEL;
494         events = 0;
495 
496     } else {
497         op = EPOLL_CTL_MOD;
498         events = EPOLLIN | engine->u.epoll.mode;
499     }
500 
501     nxt_epoll_change(engine, ev, op, events);
502 }
503 
504 
505 static void
506 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
507 {
508     if (ev->read != NXT_EVENT_INACTIVE) {
509         ev->read = NXT_EVENT_BLOCKED;
510     }
511 }
512 
513 
514 static void
515 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
516 {
517     if (ev->write != NXT_EVENT_INACTIVE) {
518         ev->write = NXT_EVENT_BLOCKED;
519     }
520 }
521 
522 
523 /*
524  * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
525  * event should be added or modified, epoll_ctl(2):
526  *
527  * EPOLLONESHOT (since Linux 2.6.2)
528  *     Sets the one-shot behavior for the associated file descriptor.
529  *     This means that after an event is pulled out with epoll_wait(2)
530  *     the associated file descriptor is internally disabled and no
531  *     other events will be reported by the epoll interface.  The user
532  *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
533  *     descriptor with a new event mask.
534  */
535 
536 static void
537 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
538 {
539     int  op;
540 
541     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
542              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
543 
544     ev->read = NXT_EVENT_ONESHOT;
545     ev->write = NXT_EVENT_INACTIVE;
546 
547     nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
548 }
549 
550 
551 static void
552 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
553 {
554     int  op;
555 
556     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
557              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
558 
559     ev->read = NXT_EVENT_INACTIVE;
560     ev->write = NXT_EVENT_ONESHOT;
561 
562     nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
563 }
564 
565 
566 static void
567 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
568 {
569     uint32_t  events;
570 
571     ev->read = NXT_EVENT_ACTIVE;
572 
573     events = EPOLLIN;
574 
575 #ifdef EPOLLEXCLUSIVE
576     events |= EPOLLEXCLUSIVE;
577 #endif
578 
579     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
580 }
581 
582 
583 /*
584  * epoll changes are batched to improve instruction and data cache
585  * locality of several epoll_ctl() calls followed by epoll_wait() call.
586  */
587 
588 static void
589 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
590     uint32_t events)
591 {
592     nxt_epoll_change_t  *change;
593 
594     nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
595               engine->u.epoll.fd, ev->fd, op, events);
596 
597     if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
598         (void) nxt_epoll_commit_changes(engine);
599     }
600 
601     ev->changing = 1;
602 
603     change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
604     change->op = op;
605     change->event.events = events;
606     change->event.data.ptr = ev;
607 }
608 
609 
610 static nxt_int_t
611 nxt_epoll_commit_changes(nxt_event_engine_t *engine)
612 {
613     int                 ret;
614     nxt_int_t           retval;
615     nxt_fd_event_t      *ev;
616     nxt_epoll_change_t  *change, *end;
617 
618     nxt_debug(&engine->task, "epoll %d changes:%ui",
619               engine->u.epoll.fd, engine->u.epoll.nchanges);
620 
621     retval = NXT_OK;
622     change = engine->u.epoll.changes;
623     end = change + engine->u.epoll.nchanges;
624 
625     do {
626         ev = change->event.data.ptr;
627         ev->changing = 0;
628 
629         nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
630                   engine->u.epoll.fd, ev->fd, change->op,
631                   change->event.events);
632 
633         ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
634 
635         if (nxt_slow_path(ret != 0)) {
636             nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
637                       engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
638 
639             nxt_work_queue_add(&engine->fast_work_queue,
640                                nxt_epoll_error_handler, ev->task, ev, ev->data);
641 
642             retval = NXT_ERROR;
643         }
644 
645         change++;
646 
647     } while (change < end);
648 
649     engine->u.epoll.nchanges = 0;
650 
651     return retval;
652 }
653 
654 
655 static void
656 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
657 {
658     nxt_fd_event_t  *ev;
659 
660     ev = obj;
661 
662     ev->read = NXT_EVENT_INACTIVE;
663     ev->write = NXT_EVENT_INACTIVE;
664 
665     ev->error_handler(ev->task, ev, data);
666 }
667 
668 
669 #if (NXT_HAVE_SIGNALFD)
670 
671 static nxt_int_t
672 nxt_epoll_add_signal(nxt_event_engine_t *engine)
673 {
674     int                 fd;
675     struct epoll_event  ee;
676 
677     if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
678         nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
679         return NXT_ERROR;
680     }
681 
682     /*
683      * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
684      * and 2.8 signalfd() wrappers call the original signalfd() syscall
685      * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
686      * tries to call signalfd4() syscall and if it fails then calls the
687      * original signalfd() syscall.  For this reason the non-blocking mode
688      * is set separately.
689      */
690 
691     fd = signalfd(-1, &engine->signals->sigmask, 0);
692 
693     if (fd == -1) {
694         nxt_alert(&engine->task, "signalfd(%d) failed %E",
695                   engine->u.epoll.signalfd.fd, nxt_errno);
696         return NXT_ERROR;
697     }
698 
699     engine->u.epoll.signalfd.fd = fd;
700 
701     if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
702         return NXT_ERROR;
703     }
704 
705     nxt_debug(&engine->task, "signalfd(): %d", fd);
706 
707     engine->u.epoll.signalfd.data = engine->signals->handler;
708     engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
709     engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
710     engine->u.epoll.signalfd.log = engine->task.log;
711     engine->u.epoll.signalfd.task = &engine->task;
712 
713     ee.events = EPOLLIN;
714     ee.data.ptr = &engine->u.epoll.signalfd;
715 
716     if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
717         nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
718                   engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
719 
720         return NXT_ERROR;
721     }
722 
723     return NXT_OK;
724 }
725 
726 
727 static void
728 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
729 {
730     int                      n;
731     nxt_fd_event_t           *ev;
732     nxt_work_handler_t       handler;
733     struct signalfd_siginfo  sfd;
734 
735     ev = obj;
736     handler = data;
737 
738     nxt_debug(task, "signalfd handler");
739 
740     n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
741 
742     nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
743 
744     if (n != sizeof(struct signalfd_siginfo)) {
745         nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
746         return;
747     }
748 
749     nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
750 
751     handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
752 }
753 
754 #endif
755 
756 
757 #if (NXT_HAVE_EVENTFD)
758 
759 static nxt_int_t
760 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
761 {
762     int                 ret;
763     struct epoll_event  ee;
764 
765     engine->u.epoll.post_handler = handler;
766 
767     /*
768      * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
769      * and 2.8 eventfd() wrappers call the original eventfd() syscall
770      * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
771      * tries to call eventfd2() syscall and if it fails then calls the
772      * original eventfd() syscall.  For this reason the non-blocking mode
773      * is set separately.
774      */
775 
776     engine->u.epoll.eventfd.fd = eventfd(0, 0);
777 
778     if (engine->u.epoll.eventfd.fd == -1) {
779         nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
780         return NXT_ERROR;
781     }
782 
783     ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
784     if (nxt_slow_path(ret != NXT_OK)) {
785         return NXT_ERROR;
786     }
787 
788     nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
789 
790     engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
791     engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
792     engine->u.epoll.eventfd.data = engine;
793     engine->u.epoll.eventfd.log = engine->task.log;
794     engine->u.epoll.eventfd.task = &engine->task;
795 
796     ee.events = EPOLLIN | EPOLLET;
797     ee.data.ptr = &engine->u.epoll.eventfd;
798 
799     ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
800                     engine->u.epoll.eventfd.fd, &ee);
801 
802     if (nxt_fast_path(ret == 0)) {
803         return NXT_OK;
804     }
805 
806     nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
807               engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
808               nxt_errno);
809 
810     return NXT_ERROR;
811 }
812 
813 
814 static void
815 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
816 {
817     int                 n;
818     uint64_t            events;
819     nxt_event_engine_t  *engine;
820 
821     engine = data;
822 
823     nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
824 
825     /*
826      * The maximum value after write() to a eventfd() descriptor will
827      * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
828      * can be read once per many notifications, for example, once per
829      * 2^32-2 noticifcations.  Since the eventfd() file descriptor is
830      * always registered in EPOLLET mode, epoll returns event about
831      * only the latest write() to the descriptor.
832      */
833 
834     if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
835         engine->u.epoll.neventfd = 0;
836 
837         n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
838 
839         nxt_debug(task, "read(%d): %d events:%uL",
840                   engine->u.epoll.eventfd.fd, n, events);
841 
842         if (n != sizeof(uint64_t)) {
843             nxt_alert(task, "read eventfd(%d) failed %E",
844                       engine->u.epoll.eventfd.fd, nxt_errno);
845         }
846     }
847 
848     engine->u.epoll.post_handler(task, NULL, NULL);
849 }
850 
851 
852 static void
853 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
854 {
855     size_t    ret;
856     uint64_t  event;
857 
858     /*
859      * eventfd() presents along with signalfd(), so the function
860      * is used only to post events and the signo argument is ignored.
861      */
862 
863     event = 1;
864 
865     ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
866 
867     if (nxt_slow_path(ret != sizeof(uint64_t))) {
868         nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
869                   engine->u.epoll.eventfd.fd, nxt_errno);
870     }
871 }
872 
873 #endif
874 
875 
876 static void
877 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
878 {
879     int                 nevents;
880     uint32_t            events;
881     nxt_int_t           i;
882     nxt_err_t           err;
883     nxt_bool_t          error;
884     nxt_uint_t          level;
885     nxt_fd_event_t      *ev;
886     struct epoll_event  *event;
887 
888     if (engine->u.epoll.nchanges != 0) {
889         if (nxt_epoll_commit_changes(engine) != NXT_OK) {
890             /* Error handlers have been enqueued on failure. */
891             timeout = 0;
892         }
893     }
894 
895     nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
896               engine->u.epoll.fd, timeout);
897 
898     nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
899                          engine->u.epoll.mevents, timeout);
900 
901     err = (nevents == -1) ? nxt_errno : 0;
902 
903     nxt_thread_time_update(engine->task.thread);
904 
905     nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
906 
907     if (nevents == -1) {
908         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
909 
910         nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
911                 engine->u.epoll.fd, err);
912 
913         return;
914     }
915 
916     for (i = 0; i < nevents; i++) {
917 
918         event = &engine->u.epoll.events[i];
919         events = event->events;
920         ev = event->data.ptr;
921 
922         nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
923                   ev->fd, events, ev, ev->read, ev->write);
924 
925         /*
926          * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or
927          * EPOLLOUT, so the "error" variable enqueues only one active handler.
928          */
929         error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
930         ev->epoll_error = error;
931 
932 #if (NXT_HAVE_EPOLL_EDGE)
933 
934         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
935 
936 #endif
937 
938         if ((events & EPOLLIN) || error) {
939             ev->read_ready = 1;
940 
941             if (ev->read != NXT_EVENT_BLOCKED) {
942 
943                 if (ev->read == NXT_EVENT_ONESHOT) {
944                     ev->read = NXT_EVENT_DISABLED;
945                 }
946 
947                 error = 0;
948 
949                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
950                                    ev->task, ev, ev->data);
951 
952             } else if (engine->u.epoll.mode == 0) {
953                 /* Level-triggered mode. */
954                 nxt_epoll_disable_read(engine, ev);
955             }
956         }
957 
958         if ((events & EPOLLOUT) || error) {
959             ev->write_ready = 1;
960 
961             if (ev->write != NXT_EVENT_BLOCKED) {
962 
963                 if (ev->write == NXT_EVENT_ONESHOT) {
964                     ev->write = NXT_EVENT_DISABLED;
965                 }
966 
967                 error = 0;
968 
969                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
970                                    ev->task, ev, ev->data);
971 
972             } else if (engine->u.epoll.mode == 0) {
973                 /* Level-triggered mode. */
974                 nxt_epoll_disable_write(engine, ev);
975             }
976         }
977 
978         if (error) {
979             ev->read_ready = 1;
980             ev->write_ready = 1;
981         }
982     }
983 }
984 
985 
986 #if (NXT_HAVE_ACCEPT4)
987 
988 static void
989 nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
990 {
991     socklen_t           socklen;
992     nxt_conn_t          *c;
993     nxt_socket_t        s;
994     struct sockaddr     *sa;
995     nxt_listen_event_t  *lev;
996 
997     lev = obj;
998     c = lev->next;
999 
1000     lev->ready--;
1001     lev->socket.read_ready = (lev->ready != 0);
1002 
1003     sa = &c->remote->u.sockaddr;
1004     socklen = c->remote->socklen;
1005     /*
1006      * The returned socklen is ignored here,
1007      * see comment in nxt_conn_io_accept().
1008      */
1009     s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);
1010 
1011     if (s != -1) {
1012         c->socket.fd = s;
1013 
1014         nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
1015 
1016         nxt_conn_accept(task, lev, c);
1017         return;
1018     }
1019 
1020     nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
1021 }
1022 
1023 #endif
1024 
1025 
1026 #if (NXT_HAVE_EPOLL_EDGE)
1027 
1028 /*
1029  * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1030  * syscall to test pending connect() error.  Although this special
1031  * interface can work in both edge-triggered and level-triggered
1032  * modes it is enabled only for the former mode because this mode is
1033  * available in all modern Linux distributions.  For the latter mode
1034  * it is required to create additional nxt_epoll_level_event_conn_io
1035  * with single non-generic connect() interface.
1036  */
1037 
1038 static void
1039 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1040 {
1041     nxt_conn_t                    *c;
1042     nxt_event_engine_t            *engine;
1043     nxt_work_handler_t            handler;
1044     const nxt_event_conn_state_t  *state;
1045 
1046     c = obj;
1047 
1048     state = c->write_state;
1049 
1050     switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1051 
1052     case NXT_OK:
1053         c->socket.write_ready = 1;
1054         handler = state->ready_handler;
1055         break;
1056 
1057     case NXT_AGAIN:
1058         c->socket.write_handler = nxt_epoll_edge_conn_connected;
1059         c->socket.error_handler = nxt_conn_connect_error;
1060 
1061         engine = task->thread->engine;
1062         nxt_conn_timer(engine, c, state, &c->write_timer);
1063 
1064         nxt_epoll_enable(engine, &c->socket);
1065         c->socket.read = NXT_EVENT_BLOCKED;
1066         return;
1067 
1068 #if 0
1069     case NXT_AGAIN:
1070         nxt_conn_timer(engine, c, state, &c->write_timer);
1071 
1072         /* Fall through. */
1073 
1074     case NXT_OK:
1075         /*
1076          * Mark both read and write directions as ready and try to perform
1077          * I/O operations before receiving readiness notifications.
1078          * On unconnected socket Linux send() and recv() return EAGAIN
1079          * instead of ENOTCONN.
1080          */
1081         c->socket.read_ready = 1;
1082         c->socket.write_ready = 1;
1083         /*
1084          * Enabling both read and write notifications on a getting
1085          * connected socket eliminates one epoll_ctl() syscall.
1086          */
1087         c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1088         c->socket.error_handler = state->error_handler;
1089 
1090         nxt_epoll_enable(engine, &c->socket);
1091         c->socket.read = NXT_EVENT_BLOCKED;
1092 
1093         handler = state->ready_handler;
1094         break;
1095 #endif
1096 
1097     case NXT_ERROR:
1098         handler = state->error_handler;
1099         break;
1100 
1101     default:  /* NXT_DECLINED: connection refused. */
1102         handler = state->close_handler;
1103         break;
1104     }
1105 
1106     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1107 }
1108 
1109 
1110 static void
1111 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1112 {
1113     nxt_conn_t  *c;
1114 
1115     c = obj;
1116 
1117     nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1118 
1119     if (!c->socket.epoll_error) {
1120         c->socket.write = NXT_EVENT_BLOCKED;
1121 
1122         if (c->write_state->timer_autoreset) {
1123             nxt_timer_disable(task->thread->engine, &c->write_timer);
1124         }
1125 
1126         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1127                            task, c, data);
1128         return;
1129     }
1130 
1131     nxt_conn_connect_test(task, c, data);
1132 }
1133 
1134 
1135 /*
1136  * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1137  * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1138  * in edge-triggered mode.
1139  */
1140 
1141 static ssize_t
1142 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1143 {
1144     ssize_t  n;
1145 
1146     n = nxt_conn_io_recvbuf(c, b);
1147 
1148     if (n > 0 && c->socket.epoll_eof) {
1149         c->socket.read_ready = 1;
1150     }
1151 
1152     return n;
1153 }
1154 
1155 #endif
1156