xref: /unit/src/nxt_epoll_engine.c (revision 545:9aee39926d7d)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * The first epoll version has been introduced in Linux 2.5.44.  The
12  * interface was changed several times since then and the final version
13  * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14  * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15  *
16  * EPOLLET mode did not work reliably in early implementations and in
17  * Linux 2.4 backport.
18  *
19  * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
20  * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
21  * epoll_pwait()            Linux 2.6.19, glibc 2.6.
22  * signalfd()               Linux 2.6.22, glibc 2.7.
23  * eventfd()                Linux 2.6.22, glibc 2.7.
24  * timerfd_create()         Linux 2.6.25, glibc 2.8.
25  * epoll_create1()          Linux 2.6.27, glibc 2.9.
26  * signalfd4()              Linux 2.6.27, glibc 2.9.
27  * eventfd2()               Linux 2.6.27, glibc 2.9.
28  * accept4()                Linux 2.6.28, glibc 2.10.
29  * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
30  * EPOLLEXCLUSIVE           Linux 4.5, glibc 2.24.
31  */
32 
33 
34 #if (NXT_HAVE_EPOLL_EDGE)
35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36     nxt_uint_t mchanges, nxt_uint_t mevents);
37 #endif
38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39     nxt_uint_t mchanges, nxt_uint_t mevents);
40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41     nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43     nxt_conn_io_t *io);
44 static void nxt_epoll_free(nxt_event_engine_t *engine);
45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49     nxt_fd_event_t *ev);
50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51     nxt_fd_event_t *ev);
52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53     nxt_fd_event_t *ev);
54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55     nxt_fd_event_t *ev);
56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57     nxt_fd_event_t *ev);
58 static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59     nxt_fd_event_t *ev);
60 static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61     nxt_fd_event_t *ev);
62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63     nxt_fd_event_t *ev);
64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65     nxt_fd_event_t *ev);
66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67     nxt_fd_event_t *ev);
68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69     int op, uint32_t events);
70 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
72 #if (NXT_HAVE_SIGNALFD)
73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75 #endif
76 #if (NXT_HAVE_EVENTFD)
77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78     nxt_work_handler_t handler);
79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81 #endif
82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83 
84 #if (NXT_HAVE_ACCEPT4)
85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86     void *data);
87 #endif
88 
89 
90 #if (NXT_HAVE_EPOLL_EDGE)
91 
92 static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
93     void *data);
94 static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
95     void *data);
96 static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
97 
98 
99 static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
100     nxt_epoll_edge_conn_io_connect,
101     nxt_conn_io_accept,
102 
103     nxt_conn_io_read,
104     nxt_epoll_edge_conn_io_recvbuf,
105     nxt_conn_io_recv,
106 
107     nxt_conn_io_write,
108     nxt_event_conn_io_write_chunk,
109 
110 #if (NXT_HAVE_LINUX_SENDFILE)
111     nxt_linux_event_conn_io_sendfile,
112 #else
113     nxt_event_conn_io_sendbuf,
114 #endif
115 
116     nxt_event_conn_io_writev,
117     nxt_event_conn_io_send,
118 
119     nxt_conn_io_shutdown,
120 };
121 
122 
123 const nxt_event_interface_t  nxt_epoll_edge_engine = {
124     "epoll_edge",
125     nxt_epoll_edge_create,
126     nxt_epoll_free,
127     nxt_epoll_enable,
128     nxt_epoll_disable,
129     nxt_epoll_delete,
130     nxt_epoll_close,
131     nxt_epoll_enable_read,
132     nxt_epoll_enable_write,
133     nxt_epoll_disable_read,
134     nxt_epoll_disable_write,
135     nxt_epoll_block_read,
136     nxt_epoll_block_write,
137     nxt_epoll_oneshot_read,
138     nxt_epoll_oneshot_write,
139     nxt_epoll_enable_accept,
140     NULL,
141     NULL,
142 #if (NXT_HAVE_EVENTFD)
143     nxt_epoll_enable_post,
144     nxt_epoll_signal,
145 #else
146     NULL,
147     NULL,
148 #endif
149     nxt_epoll_poll,
150 
151     &nxt_epoll_edge_conn_io,
152 
153 #if (NXT_HAVE_INOTIFY)
154     NXT_FILE_EVENTS,
155 #else
156     NXT_NO_FILE_EVENTS,
157 #endif
158 
159 #if (NXT_HAVE_SIGNALFD)
160     NXT_SIGNAL_EVENTS,
161 #else
162     NXT_NO_SIGNAL_EVENTS,
163 #endif
164 };
165 
166 #endif
167 
168 
169 const nxt_event_interface_t  nxt_epoll_level_engine = {
170     "epoll_level",
171     nxt_epoll_level_create,
172     nxt_epoll_free,
173     nxt_epoll_enable,
174     nxt_epoll_disable,
175     nxt_epoll_delete,
176     nxt_epoll_close,
177     nxt_epoll_enable_read,
178     nxt_epoll_enable_write,
179     nxt_epoll_disable_read,
180     nxt_epoll_disable_write,
181     nxt_epoll_block_read,
182     nxt_epoll_block_write,
183     nxt_epoll_oneshot_read,
184     nxt_epoll_oneshot_write,
185     nxt_epoll_enable_accept,
186     NULL,
187     NULL,
188 #if (NXT_HAVE_EVENTFD)
189     nxt_epoll_enable_post,
190     nxt_epoll_signal,
191 #else
192     NULL,
193     NULL,
194 #endif
195     nxt_epoll_poll,
196 
197     &nxt_unix_conn_io,
198 
199 #if (NXT_HAVE_INOTIFY)
200     NXT_FILE_EVENTS,
201 #else
202     NXT_NO_FILE_EVENTS,
203 #endif
204 
205 #if (NXT_HAVE_SIGNALFD)
206     NXT_SIGNAL_EVENTS,
207 #else
208     NXT_NO_SIGNAL_EVENTS,
209 #endif
210 };
211 
212 
213 #if (NXT_HAVE_EPOLL_EDGE)
214 
215 static nxt_int_t
216 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
217     nxt_uint_t mevents)
218 {
219     return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
220                             EPOLLET | EPOLLRDHUP);
221 }
222 
223 #endif
224 
225 
226 static nxt_int_t
227 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
228     nxt_uint_t mevents)
229 {
230     return nxt_epoll_create(engine, mchanges, mevents,
231                             &nxt_unix_conn_io, 0);
232 }
233 
234 
235 static nxt_int_t
236 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
237     nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
238 {
239     engine->u.epoll.fd = -1;
240     engine->u.epoll.mode = mode;
241     engine->u.epoll.mchanges = mchanges;
242     engine->u.epoll.mevents = mevents;
243 #if (NXT_HAVE_SIGNALFD)
244     engine->u.epoll.signalfd.fd = -1;
245 #endif
246 
247     engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
248     if (engine->u.epoll.changes == NULL) {
249         goto fail;
250     }
251 
252     engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
253     if (engine->u.epoll.events == NULL) {
254         goto fail;
255     }
256 
257     engine->u.epoll.fd = epoll_create(1);
258     if (engine->u.epoll.fd == -1) {
259         nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E",
260                 nxt_errno);
261         goto fail;
262     }
263 
264     nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
265 
266     if (engine->signals != NULL) {
267 
268 #if (NXT_HAVE_SIGNALFD)
269 
270         if (nxt_epoll_add_signal(engine) != NXT_OK) {
271             goto fail;
272         }
273 
274 #endif
275 
276         nxt_epoll_test_accept4(engine, io);
277     }
278 
279     return NXT_OK;
280 
281 fail:
282 
283     nxt_epoll_free(engine);
284 
285     return NXT_ERROR;
286 }
287 
288 
289 static void
290 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
291 {
292     static nxt_work_handler_t  handler;
293 
294     if (handler == NULL) {
295 
296         handler = io->accept;
297 
298 #if (NXT_HAVE_ACCEPT4)
299 
300         (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
301 
302         if (nxt_errno != NXT_ENOSYS) {
303             handler = nxt_epoll_conn_io_accept4;
304 
305         } else {
306             nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
307                     NXT_ENOSYS);
308         }
309 
310 #endif
311     }
312 
313     io->accept = handler;
314 }
315 
316 
317 static void
318 nxt_epoll_free(nxt_event_engine_t *engine)
319 {
320     int  fd;
321 
322     nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
323 
324 #if (NXT_HAVE_SIGNALFD)
325 
326     fd = engine->u.epoll.signalfd.fd;
327 
328     if (fd != -1 && close(fd) != 0) {
329         nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E",
330                 fd, nxt_errno);
331     }
332 
333 #endif
334 
335 #if (NXT_HAVE_EVENTFD)
336 
337     fd = engine->u.epoll.eventfd.fd;
338 
339     if (fd != -1 && close(fd) != 0) {
340         nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E",
341                 fd, nxt_errno);
342     }
343 
344 #endif
345 
346     fd = engine->u.epoll.fd;
347 
348     if (fd != -1 && close(fd) != 0) {
349         nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E",
350                 fd, nxt_errno);
351     }
352 
353     nxt_free(engine->u.epoll.events);
354     nxt_free(engine->u.epoll.changes);
355 
356     nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
357 }
358 
359 
360 static void
361 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
362 {
363     ev->read = NXT_EVENT_ACTIVE;
364     ev->write = NXT_EVENT_ACTIVE;
365 
366     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
367                      EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
368 }
369 
370 
371 static void
372 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
373 {
374     if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
375 
376         ev->read = NXT_EVENT_INACTIVE;
377         ev->write = NXT_EVENT_INACTIVE;
378 
379         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
380     }
381 }
382 
383 
384 static void
385 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
386 {
387     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
388 
389         ev->read = NXT_EVENT_INACTIVE;
390         ev->write = NXT_EVENT_INACTIVE;
391 
392         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
393     }
394 }
395 
396 
397 /*
398  * Although calling close() on a file descriptor will remove any epoll
399  * events that reference the descriptor, in this case the close() acquires
400  * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
401  * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
402  * only in one epoll set.  Thus removing events explicitly before closing
403  * eliminates possible lock contention.
404  */
405 
406 static nxt_bool_t
407 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
408 {
409     nxt_epoll_delete(engine, ev);
410 
411     return ev->changing;
412 }
413 
414 
415 static void
416 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
417 {
418     int       op;
419     uint32_t  events;
420 
421     if (ev->read != NXT_EVENT_BLOCKED) {
422 
423         op = EPOLL_CTL_MOD;
424         events = EPOLLIN | engine->u.epoll.mode;
425 
426         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
427             op = EPOLL_CTL_ADD;
428 
429         } else if (ev->write >= NXT_EVENT_BLOCKED) {
430             events |= EPOLLOUT;
431         }
432 
433         nxt_epoll_change(engine, ev, op, events);
434     }
435 
436     ev->read = NXT_EVENT_ACTIVE;
437 }
438 
439 
440 static void
441 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
442 {
443     int       op;
444     uint32_t  events;
445 
446     if (ev->write != NXT_EVENT_BLOCKED) {
447 
448         op = EPOLL_CTL_MOD;
449         events = EPOLLOUT | engine->u.epoll.mode;
450 
451         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
452             op = EPOLL_CTL_ADD;
453 
454         } else if (ev->read >= NXT_EVENT_BLOCKED) {
455             events |= EPOLLIN;
456         }
457 
458         nxt_epoll_change(engine, ev, op, events);
459     }
460 
461     ev->write = NXT_EVENT_ACTIVE;
462 }
463 
464 
465 static void
466 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
467 {
468     int       op;
469     uint32_t  events;
470 
471     ev->read = NXT_EVENT_INACTIVE;
472 
473     if (ev->write <= NXT_EVENT_DISABLED) {
474         ev->write = NXT_EVENT_INACTIVE;
475         op = EPOLL_CTL_DEL;
476         events = 0;
477 
478     } else {
479         op = EPOLL_CTL_MOD;
480         events = EPOLLOUT | engine->u.epoll.mode;
481     }
482 
483     nxt_epoll_change(engine, ev, op, events);
484 }
485 
486 
487 static void
488 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
489 {
490     int       op;
491     uint32_t  events;
492 
493     ev->write = NXT_EVENT_INACTIVE;
494 
495     if (ev->read <= NXT_EVENT_DISABLED) {
496         ev->write = NXT_EVENT_INACTIVE;
497         op = EPOLL_CTL_DEL;
498         events = 0;
499 
500     } else {
501         op = EPOLL_CTL_MOD;
502         events = EPOLLIN | engine->u.epoll.mode;
503     }
504 
505     nxt_epoll_change(engine, ev, op, events);
506 }
507 
508 
509 static void
510 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
511 {
512     if (ev->read != NXT_EVENT_INACTIVE) {
513         ev->read = NXT_EVENT_BLOCKED;
514     }
515 }
516 
517 
518 static void
519 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
520 {
521     if (ev->write != NXT_EVENT_INACTIVE) {
522         ev->write = NXT_EVENT_BLOCKED;
523     }
524 }
525 
526 
527 /*
528  * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
529  * event should be added or modified, epoll_ctl(2):
530  *
531  * EPOLLONESHOT (since Linux 2.6.2)
532  *     Sets the one-shot behavior for the associated file descriptor.
533  *     This means that after an event is pulled out with epoll_wait(2)
534  *     the associated file descriptor is internally disabled and no
535  *     other events will be reported by the epoll interface.  The user
536  *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
537  *     descriptor with a new event mask.
538  */
539 
540 static void
541 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
542 {
543     int  op;
544 
545     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
546              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
547 
548     ev->read = NXT_EVENT_ONESHOT;
549     ev->write = NXT_EVENT_INACTIVE;
550 
551     nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
552 }
553 
554 
555 static void
556 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
557 {
558     int  op;
559 
560     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
561              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
562 
563     ev->read = NXT_EVENT_INACTIVE;
564     ev->write = NXT_EVENT_ONESHOT;
565 
566     nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
567 }
568 
569 
570 static void
571 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
572 {
573     uint32_t  events;
574 
575     ev->read = NXT_EVENT_ACTIVE;
576 
577     events = EPOLLIN;
578 
579 #ifdef EPOLLEXCLUSIVE
580     events |= EPOLLEXCLUSIVE;
581 #endif
582 
583     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
584 }
585 
586 
587 /*
588  * epoll changes are batched to improve instruction and data cache
589  * locality of several epoll_ctl() calls followed by epoll_wait() call.
590  */
591 
592 static void
593 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
594     uint32_t events)
595 {
596     nxt_epoll_change_t  *change;
597 
598     nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
599               engine->u.epoll.fd, ev->fd, op, events);
600 
601     if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
602         (void) nxt_epoll_commit_changes(engine);
603     }
604 
605     ev->changing = 1;
606 
607     change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
608     change->op = op;
609     change->event.events = events;
610     change->event.data.ptr = ev;
611 }
612 
613 
614 static nxt_int_t
615 nxt_epoll_commit_changes(nxt_event_engine_t *engine)
616 {
617     int                 ret;
618     nxt_int_t           retval;
619     nxt_fd_event_t      *ev;
620     nxt_epoll_change_t  *change, *end;
621 
622     nxt_debug(&engine->task, "epoll %d changes:%ui",
623               engine->u.epoll.fd, engine->u.epoll.nchanges);
624 
625     retval = NXT_OK;
626     change = engine->u.epoll.changes;
627     end = change + engine->u.epoll.nchanges;
628 
629     do {
630         ev = change->event.data.ptr;
631         ev->changing = 0;
632 
633         nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
634                   engine->u.epoll.fd, ev->fd, change->op,
635                   change->event.events);
636 
637         ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
638 
639         if (nxt_slow_path(ret != 0)) {
640             nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
641                     engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
642 
643             nxt_work_queue_add(&engine->fast_work_queue,
644                                nxt_epoll_error_handler, ev->task, ev, ev->data);
645 
646             retval = NXT_ERROR;
647         }
648 
649         change++;
650 
651     } while (change < end);
652 
653     engine->u.epoll.nchanges = 0;
654 
655     return retval;
656 }
657 
658 
659 static void
660 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
661 {
662     nxt_fd_event_t  *ev;
663 
664     ev = obj;
665 
666     ev->read = NXT_EVENT_INACTIVE;
667     ev->write = NXT_EVENT_INACTIVE;
668 
669     ev->error_handler(ev->task, ev, data);
670 }
671 
672 
673 #if (NXT_HAVE_SIGNALFD)
674 
675 static nxt_int_t
676 nxt_epoll_add_signal(nxt_event_engine_t *engine)
677 {
678     int                 fd;
679     struct epoll_event  ee;
680 
681     if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
682         nxt_log(&engine->task, NXT_LOG_CRIT,
683                 "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
684         return NXT_ERROR;
685     }
686 
687     /*
688      * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
689      * and 2.8 signalfd() wrappers call the original signalfd() syscall
690      * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
691      * tries to call signalfd4() syscall and if it fails then calls the
692      * original signalfd() syscall.  For this reason the non-blocking mode
693      * is set separately.
694      */
695 
696     fd = signalfd(-1, &engine->signals->sigmask, 0);
697 
698     if (fd == -1) {
699         nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E",
700                 engine->u.epoll.signalfd.fd, nxt_errno);
701         return NXT_ERROR;
702     }
703 
704     engine->u.epoll.signalfd.fd = fd;
705 
706     if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
707         return NXT_ERROR;
708     }
709 
710     nxt_debug(&engine->task, "signalfd(): %d", fd);
711 
712     engine->u.epoll.signalfd.data = engine->signals->handler;
713     engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
714     engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
715     engine->u.epoll.signalfd.log = engine->task.log;
716     engine->u.epoll.signalfd.task = &engine->task;
717 
718     ee.events = EPOLLIN;
719     ee.data.ptr = &engine->u.epoll.signalfd;
720 
721     if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
722         nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
723                 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
724 
725         return NXT_ERROR;
726     }
727 
728     return NXT_OK;
729 }
730 
731 
732 static void
733 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
734 {
735     int                      n;
736     nxt_fd_event_t           *ev;
737     nxt_work_handler_t       handler;
738     struct signalfd_siginfo  sfd;
739 
740     ev = obj;
741     handler = data;
742 
743     nxt_debug(task, "signalfd handler");
744 
745     n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
746 
747     nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
748 
749     if (n != sizeof(struct signalfd_siginfo)) {
750         nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E",
751                 ev->fd, nxt_errno);
752         return;
753     }
754 
755     nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
756 
757     handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
758 }
759 
760 #endif
761 
762 
763 #if (NXT_HAVE_EVENTFD)
764 
765 static nxt_int_t
766 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
767 {
768     int                 ret;
769     struct epoll_event  ee;
770 
771     engine->u.epoll.post_handler = handler;
772 
773     /*
774      * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
775      * and 2.8 eventfd() wrappers call the original eventfd() syscall
776      * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
777      * tries to call eventfd2() syscall and if it fails then calls the
778      * original eventfd() syscall.  For this reason the non-blocking mode
779      * is set separately.
780      */
781 
782     engine->u.epoll.eventfd.fd = eventfd(0, 0);
783 
784     if (engine->u.epoll.eventfd.fd == -1) {
785         nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno);
786         return NXT_ERROR;
787     }
788 
789     ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
790     if (nxt_slow_path(ret != NXT_OK)) {
791         return NXT_ERROR;
792     }
793 
794     nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
795 
796     engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
797     engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
798     engine->u.epoll.eventfd.data = engine;
799     engine->u.epoll.eventfd.log = engine->task.log;
800     engine->u.epoll.eventfd.task = &engine->task;
801 
802     ee.events = EPOLLIN | EPOLLET;
803     ee.data.ptr = &engine->u.epoll.eventfd;
804 
805     ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
806                     engine->u.epoll.eventfd.fd, &ee);
807 
808     if (nxt_fast_path(ret == 0)) {
809         return NXT_OK;
810     }
811 
812     nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
813             engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
814             nxt_errno);
815 
816     return NXT_ERROR;
817 }
818 
819 
820 static void
821 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
822 {
823     int                 n;
824     uint64_t            events;
825     nxt_event_engine_t  *engine;
826 
827     engine = data;
828 
829     nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
830 
831     /*
832      * The maximum value after write() to a eventfd() descriptor will
833      * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor
834      * can be read once per many notifications, for example, once per
835      * 2^32-2 noticifcations.  Since the eventfd() file descriptor is
836      * always registered in EPOLLET mode, epoll returns event about
837      * only the latest write() to the descriptor.
838      */
839 
840     if (engine->u.epoll.neventfd++ >= 0xfffffffe) {
841         engine->u.epoll.neventfd = 0;
842 
843         n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
844 
845         nxt_debug(task, "read(%d): %d events:%uL",
846                   engine->u.epoll.eventfd.fd, n, events);
847 
848         if (n != sizeof(uint64_t)) {
849             nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E",
850                     engine->u.epoll.eventfd.fd, nxt_errno);
851         }
852     }
853 
854     engine->u.epoll.post_handler(task, NULL, NULL);
855 }
856 
857 
858 static void
859 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
860 {
861     size_t    ret;
862     uint64_t  event;
863 
864     /*
865      * eventfd() presents along with signalfd(), so the function
866      * is used only to post events and the signo argument is ignored.
867      */
868 
869     event = 1;
870 
871     ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
872 
873     if (nxt_slow_path(ret != sizeof(uint64_t))) {
874         nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E",
875                 engine->u.epoll.eventfd.fd, nxt_errno);
876     }
877 }
878 
879 #endif
880 
881 
882 static void
883 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
884 {
885     int                 nevents;
886     uint32_t            events;
887     nxt_int_t           i;
888     nxt_err_t           err;
889     nxt_bool_t          error;
890     nxt_uint_t          level;
891     nxt_fd_event_t      *ev;
892     struct epoll_event  *event;
893 
894     if (engine->u.epoll.nchanges != 0) {
895         if (nxt_epoll_commit_changes(engine) != NXT_OK) {
896             /* Error handlers have been enqueued on failure. */
897             timeout = 0;
898         }
899     }
900 
901     nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
902               engine->u.epoll.fd, timeout);
903 
904     nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
905                          engine->u.epoll.mevents, timeout);
906 
907     err = (nevents == -1) ? nxt_errno : 0;
908 
909     nxt_thread_time_update(engine->task.thread);
910 
911     nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
912 
913     if (nevents == -1) {
914         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
915 
916         nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
917                 engine->u.epoll.fd, err);
918 
919         return;
920     }
921 
922     for (i = 0; i < nevents; i++) {
923 
924         event = &engine->u.epoll.events[i];
925         events = event->events;
926         ev = event->data.ptr;
927 
928         nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
929                   ev->fd, events, ev, ev->read, ev->write);
930 
931         /*
932          * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or
933          * EPOLLOUT, so the "error" variable enqueues only one active handler.
934          */
935         error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
936         ev->epoll_error = error;
937 
938 #if (NXT_HAVE_EPOLL_EDGE)
939 
940         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
941 
942 #endif
943 
944         if ((events & EPOLLIN) || error) {
945             ev->read_ready = 1;
946 
947             if (ev->read != NXT_EVENT_BLOCKED) {
948 
949                 if (ev->read == NXT_EVENT_ONESHOT) {
950                     ev->read = NXT_EVENT_DISABLED;
951                 }
952 
953                 error = 0;
954 
955                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
956                                    ev->task, ev, ev->data);
957 
958             } else if (engine->u.epoll.mode == 0) {
959                 /* Level-triggered mode. */
960                 nxt_epoll_disable_read(engine, ev);
961             }
962         }
963 
964         if ((events & EPOLLOUT) || error) {
965             ev->write_ready = 1;
966 
967             if (ev->write != NXT_EVENT_BLOCKED) {
968 
969                 if (ev->write == NXT_EVENT_ONESHOT) {
970                     ev->write = NXT_EVENT_DISABLED;
971                 }
972 
973                 error = 0;
974 
975                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
976                                    ev->task, ev, ev->data);
977 
978             } else if (engine->u.epoll.mode == 0) {
979                 /* Level-triggered mode. */
980                 nxt_epoll_disable_write(engine, ev);
981             }
982         }
983 
984         if (error) {
985             ev->read_ready = 1;
986             ev->write_ready = 1;
987         }
988     }
989 }
990 
991 
992 #if (NXT_HAVE_ACCEPT4)
993 
994 static void
995 nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
996 {
997     socklen_t           socklen;
998     nxt_conn_t          *c;
999     nxt_socket_t        s;
1000     struct sockaddr     *sa;
1001     nxt_listen_event_t  *lev;
1002 
1003     lev = obj;
1004     c = lev->next;
1005 
1006     lev->ready--;
1007     lev->socket.read_ready = (lev->ready != 0);
1008 
1009     sa = &c->remote->u.sockaddr;
1010     socklen = c->remote->socklen;
1011     /*
1012      * The returned socklen is ignored here,
1013      * see comment in nxt_conn_io_accept().
1014      */
1015     s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);
1016 
1017     if (s != -1) {
1018         c->socket.fd = s;
1019 
1020         nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
1021 
1022         nxt_conn_accept(task, lev, c);
1023         return;
1024     }
1025 
1026     nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
1027 }
1028 
1029 #endif
1030 
1031 
1032 #if (NXT_HAVE_EPOLL_EDGE)
1033 
1034 /*
1035  * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1036  * syscall to test pending connect() error.  Although this special
1037  * interface can work in both edge-triggered and level-triggered
1038  * modes it is enabled only for the former mode because this mode is
1039  * available in all modern Linux distributions.  For the latter mode
1040  * it is required to create additional nxt_epoll_level_event_conn_io
1041  * with single non-generic connect() interface.
1042  */
1043 
1044 static void
1045 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1046 {
1047     nxt_conn_t                    *c;
1048     nxt_event_engine_t            *engine;
1049     nxt_work_handler_t            handler;
1050     const nxt_event_conn_state_t  *state;
1051 
1052     c = obj;
1053 
1054     state = c->write_state;
1055 
1056     switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1057 
1058     case NXT_OK:
1059         c->socket.write_ready = 1;
1060         handler = state->ready_handler;
1061         break;
1062 
1063     case NXT_AGAIN:
1064         c->socket.write_handler = nxt_epoll_edge_conn_connected;
1065         c->socket.error_handler = nxt_conn_connect_error;
1066 
1067         engine = task->thread->engine;
1068         nxt_conn_timer(engine, c, state, &c->write_timer);
1069 
1070         nxt_epoll_enable(engine, &c->socket);
1071         c->socket.read = NXT_EVENT_BLOCKED;
1072         return;
1073 
1074 #if 0
1075     case NXT_AGAIN:
1076         nxt_conn_timer(engine, c, state, &c->write_timer);
1077 
1078         /* Fall through. */
1079 
1080     case NXT_OK:
1081         /*
1082          * Mark both read and write directions as ready and try to perform
1083          * I/O operations before receiving readiness notifications.
1084          * On unconnected socket Linux send() and recv() return EAGAIN
1085          * instead of ENOTCONN.
1086          */
1087         c->socket.read_ready = 1;
1088         c->socket.write_ready = 1;
1089         /*
1090          * Enabling both read and write notifications on a getting
1091          * connected socket eliminates one epoll_ctl() syscall.
1092          */
1093         c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1094         c->socket.error_handler = state->error_handler;
1095 
1096         nxt_epoll_enable(engine, &c->socket);
1097         c->socket.read = NXT_EVENT_BLOCKED;
1098 
1099         handler = state->ready_handler;
1100         break;
1101 #endif
1102 
1103     case NXT_ERROR:
1104         handler = state->error_handler;
1105         break;
1106 
1107     default:  /* NXT_DECLINED: connection refused. */
1108         handler = state->close_handler;
1109         break;
1110     }
1111 
1112     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1113 }
1114 
1115 
1116 static void
1117 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1118 {
1119     nxt_conn_t  *c;
1120 
1121     c = obj;
1122 
1123     nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1124 
1125     if (!c->socket.epoll_error) {
1126         c->socket.write = NXT_EVENT_BLOCKED;
1127 
1128         if (c->write_state->timer_autoreset) {
1129             nxt_timer_disable(task->thread->engine, &c->write_timer);
1130         }
1131 
1132         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1133                            task, c, data);
1134         return;
1135     }
1136 
1137     nxt_conn_connect_test(task, c, data);
1138 }
1139 
1140 
1141 /*
1142  * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1143  * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1144  * in edge-triggered mode.
1145  */
1146 
1147 static ssize_t
1148 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1149 {
1150     ssize_t  n;
1151 
1152     n = nxt_conn_io_recvbuf(c, b);
1153 
1154     if (n > 0 && c->socket.epoll_eof) {
1155         c->socket.read_ready = 1;
1156     }
1157 
1158     return n;
1159 }
1160 
1161 #endif
1162