xref: /unit/src/nxt_epoll_engine.c (revision 13:3a52b2c3d3f1)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * The first epoll version has been introduced in Linux 2.5.44.  The
12  * interface was changed several times since then and the final version
13  * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14  * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15  *
16  * EPOLLET mode did not work reliable in early implementaions and in
17  * Linux 2.4 backport.
18  *
19  * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
20  * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
21  * epoll_pwait()            Linux 2.6.19, glibc 2.6.
22  * signalfd()               Linux 2.6.22, glibc 2.7.
23  * eventfd()                Linux 2.6.22, glibc 2.7.
24  * timerfd_create()         Linux 2.6.25, glibc 2.8.
25  * epoll_create1()          Linux 2.6.27, glibc 2.9.
26  * signalfd4()              Linux 2.6.27, glibc 2.9.
27  * eventfd2()               Linux 2.6.27, glibc 2.9.
28  * accept4()                Linux 2.6.28, glibc 2.10.
29  * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
30  * EPOLLEXCLUSIVE           Linux 4.5.
31  */
32 
33 
34 #if (NXT_HAVE_EPOLL_EDGE)
35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36     nxt_uint_t mchanges, nxt_uint_t mevents);
37 #endif
38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39     nxt_uint_t mchanges, nxt_uint_t mevents);
40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41     nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io,
42     uint32_t mode);
43 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
44     nxt_event_conn_io_t *io);
45 static void nxt_epoll_free(nxt_event_engine_t *engine);
46 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
49 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
50     nxt_fd_event_t *ev);
51 static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
52     nxt_fd_event_t *ev);
53 static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
54     nxt_fd_event_t *ev);
55 static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
56     nxt_fd_event_t *ev);
57 static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
58     nxt_fd_event_t *ev);
59 static void nxt_epoll_block_read(nxt_event_engine_t *engine,
60     nxt_fd_event_t *ev);
61 static void nxt_epoll_block_write(nxt_event_engine_t *engine,
62     nxt_fd_event_t *ev);
63 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
64     nxt_fd_event_t *ev);
65 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
66     nxt_fd_event_t *ev);
67 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
68     nxt_fd_event_t *ev);
69 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
70     int op, uint32_t events);
71 static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
72 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
73 #if (NXT_HAVE_SIGNALFD)
74 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
75 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
76 #endif
77 #if (NXT_HAVE_EVENTFD)
78 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
79     nxt_work_handler_t handler);
80 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
81 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
82 #endif
83 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
84 
85 #if (NXT_HAVE_ACCEPT4)
86 static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj,
87     void *data);
88 #endif
89 
90 
91 #if (NXT_HAVE_EPOLL_EDGE)
92 
93 static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj,
94     void *data);
95 static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj,
96     void *data);
97 static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c,
98     nxt_buf_t *b);
99 
100 
101 static nxt_event_conn_io_t  nxt_epoll_edge_event_conn_io = {
102     nxt_epoll_edge_event_conn_io_connect,
103     nxt_event_conn_io_accept,
104 
105     nxt_event_conn_io_read,
106     nxt_epoll_edge_event_conn_io_recvbuf,
107     nxt_event_conn_io_recv,
108 
109     nxt_conn_io_write,
110     nxt_event_conn_io_write_chunk,
111 
112 #if (NXT_HAVE_LINUX_SENDFILE)
113     nxt_linux_event_conn_io_sendfile,
114 #else
115     nxt_event_conn_io_sendbuf,
116 #endif
117 
118     nxt_event_conn_io_writev,
119     nxt_event_conn_io_send,
120 
121     nxt_event_conn_io_shutdown,
122 };
123 
124 
125 const nxt_event_interface_t  nxt_epoll_edge_engine = {
126     "epoll_edge",
127     nxt_epoll_edge_create,
128     nxt_epoll_free,
129     nxt_epoll_enable,
130     nxt_epoll_disable,
131     nxt_epoll_delete,
132     nxt_epoll_close,
133     nxt_epoll_enable_read,
134     nxt_epoll_enable_write,
135     nxt_epoll_disable_read,
136     nxt_epoll_disable_write,
137     nxt_epoll_block_read,
138     nxt_epoll_block_write,
139     nxt_epoll_oneshot_read,
140     nxt_epoll_oneshot_write,
141     nxt_epoll_enable_accept,
142     NULL,
143     NULL,
144 #if (NXT_HAVE_EVENTFD)
145     nxt_epoll_enable_post,
146     nxt_epoll_signal,
147 #else
148     NULL,
149     NULL,
150 #endif
151     nxt_epoll_poll,
152 
153     &nxt_epoll_edge_event_conn_io,
154 
155 #if (NXT_HAVE_INOTIFY)
156     NXT_FILE_EVENTS,
157 #else
158     NXT_NO_FILE_EVENTS,
159 #endif
160 
161 #if (NXT_HAVE_SIGNALFD)
162     NXT_SIGNAL_EVENTS,
163 #else
164     NXT_NO_SIGNAL_EVENTS,
165 #endif
166 };
167 
168 #endif
169 
170 
171 const nxt_event_interface_t  nxt_epoll_level_engine = {
172     "epoll_level",
173     nxt_epoll_level_create,
174     nxt_epoll_free,
175     nxt_epoll_enable,
176     nxt_epoll_disable,
177     nxt_epoll_delete,
178     nxt_epoll_close,
179     nxt_epoll_enable_read,
180     nxt_epoll_enable_write,
181     nxt_epoll_disable_read,
182     nxt_epoll_disable_write,
183     nxt_epoll_block_read,
184     nxt_epoll_block_write,
185     nxt_epoll_oneshot_read,
186     nxt_epoll_oneshot_write,
187     nxt_epoll_enable_accept,
188     NULL,
189     NULL,
190 #if (NXT_HAVE_EVENTFD)
191     nxt_epoll_enable_post,
192     nxt_epoll_signal,
193 #else
194     NULL,
195     NULL,
196 #endif
197     nxt_epoll_poll,
198 
199     &nxt_unix_event_conn_io,
200 
201 #if (NXT_HAVE_INOTIFY)
202     NXT_FILE_EVENTS,
203 #else
204     NXT_NO_FILE_EVENTS,
205 #endif
206 
207 #if (NXT_HAVE_SIGNALFD)
208     NXT_SIGNAL_EVENTS,
209 #else
210     NXT_NO_SIGNAL_EVENTS,
211 #endif
212 };
213 
214 
215 #if (NXT_HAVE_EPOLL_EDGE)
216 
217 static nxt_int_t
218 nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
219     nxt_uint_t mevents)
220 {
221     return nxt_epoll_create(engine, mchanges, mevents,
222                             &nxt_epoll_edge_event_conn_io,
223                             EPOLLET | EPOLLRDHUP);
224 }
225 
226 #endif
227 
228 
229 static nxt_int_t
230 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
231     nxt_uint_t mevents)
232 {
233     return nxt_epoll_create(engine, mchanges, mevents,
234                             &nxt_unix_event_conn_io, 0);
235 }
236 
237 
238 static nxt_int_t
239 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
240     nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode)
241 {
242     engine->u.epoll.fd = -1;
243     engine->u.epoll.mode = mode;
244     engine->u.epoll.mchanges = mchanges;
245     engine->u.epoll.mevents = mevents;
246 #if (NXT_HAVE_SIGNALFD)
247     engine->u.epoll.signalfd.fd = -1;
248 #endif
249 
250     engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
251     if (engine->u.epoll.changes == NULL) {
252         goto fail;
253     }
254 
255     engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
256     if (engine->u.epoll.events == NULL) {
257         goto fail;
258     }
259 
260     engine->u.epoll.fd = epoll_create(1);
261     if (engine->u.epoll.fd == -1) {
262         nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E",
263                 nxt_errno);
264         goto fail;
265     }
266 
267     nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
268 
269     if (engine->signals != NULL) {
270 
271 #if (NXT_HAVE_SIGNALFD)
272 
273         if (nxt_epoll_add_signal(engine) != NXT_OK) {
274             goto fail;
275         }
276 
277 #endif
278 
279         nxt_epoll_test_accept4(engine, io);
280     }
281 
282     return NXT_OK;
283 
284 fail:
285 
286     nxt_epoll_free(engine);
287 
288     return NXT_ERROR;
289 }
290 
291 
292 static void
293 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io)
294 {
295     static nxt_work_handler_t  handler;
296 
297     if (handler == NULL) {
298 
299         handler = io->accept;
300 
301 #if (NXT_HAVE_ACCEPT4)
302 
303         (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
304 
305         if (nxt_errno != NXT_ENOSYS) {
306             handler = nxt_epoll_event_conn_io_accept4;
307 
308         } else {
309             nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
310                     NXT_ENOSYS);
311         }
312 
313 #endif
314     }
315 
316     io->accept = handler;
317 }
318 
319 
320 static void
321 nxt_epoll_free(nxt_event_engine_t *engine)
322 {
323     int  fd;
324 
325     nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
326 
327 #if (NXT_HAVE_SIGNALFD)
328 
329     fd = engine->u.epoll.signalfd.fd;
330 
331     if (fd != -1 && close(fd) != 0) {
332         nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E",
333                 fd, nxt_errno);
334     }
335 
336 #endif
337 
338 #if (NXT_HAVE_EVENTFD)
339 
340     fd = engine->u.epoll.eventfd.fd;
341 
342     if (fd != -1 && close(fd) != 0) {
343         nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E",
344                 fd, nxt_errno);
345     }
346 
347 #endif
348 
349     fd = engine->u.epoll.fd;
350 
351     if (fd != -1 && close(fd) != 0) {
352         nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E",
353                 fd, nxt_errno);
354     }
355 
356     nxt_free(engine->u.epoll.events);
357 
358     nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
359 }
360 
361 
362 static void
363 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
364 {
365     ev->read = NXT_EVENT_ACTIVE;
366     ev->write = NXT_EVENT_ACTIVE;
367 
368     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
369                      EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
370 }
371 
372 
373 static void
374 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
375 {
376     if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
377 
378         ev->read = NXT_EVENT_INACTIVE;
379         ev->write = NXT_EVENT_INACTIVE;
380 
381         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
382     }
383 }
384 
385 
386 static void
387 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
388 {
389     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
390 
391         ev->read = NXT_EVENT_INACTIVE;
392         ev->write = NXT_EVENT_INACTIVE;
393 
394         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
395     }
396 }
397 
398 
399 /*
400  * Although calling close() on a file descriptor will remove any epoll
401  * events that reference the descriptor, in this case the close() acquires
402  * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
403  * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
404  * only in one epoll set.  Thus removing events explicitly before closing
405  * eliminates possible lock contention.
406  */
407 
408 static nxt_bool_t
409 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
410 {
411     nxt_epoll_delete(engine, ev);
412 
413     return ev->changing;
414 }
415 
416 
417 static void
418 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
419 {
420     int       op;
421     uint32_t  events;
422 
423     if (ev->read != NXT_EVENT_BLOCKED) {
424 
425         op = EPOLL_CTL_MOD;
426         events = EPOLLIN | engine->u.epoll.mode;
427 
428         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
429             op = EPOLL_CTL_ADD;
430 
431         } else if (ev->write >= NXT_EVENT_BLOCKED) {
432             events |= EPOLLOUT;
433         }
434 
435         nxt_epoll_change(engine, ev, op, events);
436     }
437 
438     ev->read = NXT_EVENT_ACTIVE;
439 }
440 
441 
442 static void
443 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
444 {
445     int       op;
446     uint32_t  events;
447 
448     if (ev->write != NXT_EVENT_BLOCKED) {
449 
450         op = EPOLL_CTL_MOD;
451         events = EPOLLOUT | engine->u.epoll.mode;
452 
453         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
454             op = EPOLL_CTL_ADD;
455 
456         } else if (ev->read >= NXT_EVENT_BLOCKED) {
457             events |= EPOLLIN;
458         }
459 
460         nxt_epoll_change(engine, ev, op, events);
461     }
462 
463     ev->write = NXT_EVENT_ACTIVE;
464 }
465 
466 
467 static void
468 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
469 {
470     int       op;
471     uint32_t  events;
472 
473     ev->read = NXT_EVENT_INACTIVE;
474 
475     if (ev->write <= NXT_EVENT_DISABLED) {
476         ev->write = NXT_EVENT_INACTIVE;
477         op = EPOLL_CTL_DEL;
478         events = 0;
479 
480     } else {
481         op = EPOLL_CTL_MOD;
482         events = EPOLLOUT | engine->u.epoll.mode;
483     }
484 
485     nxt_epoll_change(engine, ev, op, events);
486 }
487 
488 
489 static void
490 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
491 {
492     int       op;
493     uint32_t  events;
494 
495     ev->write = NXT_EVENT_INACTIVE;
496 
497     if (ev->read <= NXT_EVENT_DISABLED) {
498         ev->write = NXT_EVENT_INACTIVE;
499         op = EPOLL_CTL_DEL;
500         events = 0;
501 
502     } else {
503         op = EPOLL_CTL_MOD;
504         events = EPOLLIN | engine->u.epoll.mode;
505     }
506 
507     nxt_epoll_change(engine, ev, op, events);
508 }
509 
510 
511 static void
512 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
513 {
514     if (ev->read != NXT_EVENT_INACTIVE) {
515         ev->read = NXT_EVENT_BLOCKED;
516     }
517 }
518 
519 
520 static void
521 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
522 {
523     if (ev->write != NXT_EVENT_INACTIVE) {
524         ev->write = NXT_EVENT_BLOCKED;
525     }
526 }
527 
528 
529 /*
530  * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
531  * event should be added or modified, epoll_ctl(2):
532  *
533  * EPOLLONESHOT (since Linux 2.6.2)
534  *     Sets the one-shot behavior for the associated file descriptor.
535  *     This means that after an event is pulled out with epoll_wait(2)
536  *     the associated file descriptor is internally disabled and no
537  *     other events will be reported by the epoll interface.  The user
538  *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
539  *     descriptor with a new event mask.
540  */
541 
542 static void
543 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
544 {
545     int  op;
546 
547     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
548              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
549 
550     ev->read = NXT_EVENT_ONESHOT;
551     ev->write = NXT_EVENT_INACTIVE;
552 
553     nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
554 }
555 
556 
557 static void
558 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
559 {
560     int  op;
561 
562     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
563              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
564 
565     ev->read = NXT_EVENT_INACTIVE;
566     ev->write = NXT_EVENT_ONESHOT;
567 
568     nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
569 }
570 
571 
572 static void
573 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
574 {
575     ev->read = NXT_EVENT_ACTIVE;
576 
577     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN);
578 }
579 
580 
581 /*
582  * epoll changes are batched to improve instruction and data cache
583  * locality of several epoll_ctl() calls followed by epoll_wait() call.
584  */
585 
586 static void
587 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
588     uint32_t events)
589 {
590     nxt_epoll_change_t  *change;
591 
592     nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
593               engine->u.epoll.fd, ev->fd, op, events);
594 
595     if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
596         (void) nxt_epoll_commit_changes(engine);
597     }
598 
599     ev->changing = 1;
600 
601     change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
602     change->op = op;
603     change->event.events = events;
604     change->event.data.ptr = ev;
605 }
606 
607 
608 static nxt_int_t
609 nxt_epoll_commit_changes(nxt_event_engine_t *engine)
610 {
611     int                 ret;
612     nxt_int_t           retval;
613     nxt_fd_event_t      *ev;
614     nxt_epoll_change_t  *change, *end;
615 
616     nxt_debug(&engine->task, "epoll %d changes:%ui",
617               engine->u.epoll.fd, engine->u.epoll.nchanges);
618 
619     retval = NXT_OK;
620     change = engine->u.epoll.changes;
621     end = change + engine->u.epoll.nchanges;
622 
623     do {
624         ev = change->event.data.ptr;
625         ev->changing = 0;
626 
627         nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
628                   engine->u.epoll.fd, ev->fd, change->op,
629                   change->event.events);
630 
631         ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
632 
633         if (nxt_slow_path(ret != 0)) {
634             nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
635                     engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
636 
637             nxt_work_queue_add(&engine->fast_work_queue,
638                                nxt_epoll_error_handler, ev->task, ev, ev->data);
639 
640             retval = NXT_ERROR;
641         }
642 
643         change++;
644 
645     } while (change < end);
646 
647     engine->u.epoll.nchanges = 0;
648 
649     return retval;
650 }
651 
652 
653 static void
654 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
655 {
656     nxt_fd_event_t  *ev;
657 
658     ev = obj;
659 
660     ev->read = NXT_EVENT_INACTIVE;
661     ev->write = NXT_EVENT_INACTIVE;
662 
663     ev->error_handler(ev->task, ev, data);
664 }
665 
666 
667 #if (NXT_HAVE_SIGNALFD)
668 
669 static nxt_int_t
670 nxt_epoll_add_signal(nxt_event_engine_t *engine)
671 {
672     int                 fd;
673     struct epoll_event  ee;
674 
675     if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
676         nxt_log(&engine->task, NXT_LOG_CRIT,
677                 "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
678         return NXT_ERROR;
679     }
680 
681     /*
682      * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
683      * and 2.8 signalfd() wrappers call the original signalfd() syscall
684      * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
685      * tries to call signalfd4() syscall and if it fails then calls the
686      * original signalfd() syscall.  For this reason the non-blocking mode
687      * is set separately.
688      */
689 
690     fd = signalfd(-1, &engine->signals->sigmask, 0);
691 
692     if (fd == -1) {
693         nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E",
694                 engine->u.epoll.signalfd.fd, nxt_errno);
695         return NXT_ERROR;
696     }
697 
698     engine->u.epoll.signalfd.fd = fd;
699 
700     if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
701         return NXT_ERROR;
702     }
703 
704     nxt_debug(&engine->task, "signalfd(): %d", fd);
705 
706     engine->u.epoll.signalfd.data = engine->signals->handler;
707     engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
708     engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
709     engine->u.epoll.signalfd.log = engine->task.log;
710     engine->u.epoll.signalfd.task = &engine->task;
711 
712     ee.events = EPOLLIN;
713     ee.data.ptr = &engine->u.epoll.signalfd;
714 
715     if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
716         nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
717                 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
718 
719         return NXT_ERROR;
720     }
721 
722     return NXT_OK;
723 }
724 
725 
726 static void
727 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
728 {
729     int                      n;
730     nxt_fd_event_t           *ev;
731     nxt_work_handler_t       handler;
732     struct signalfd_siginfo  sfd;
733 
734     ev = obj;
735     handler = data;
736 
737     nxt_debug(task, "signalfd handler");
738 
739     n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
740 
741     nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
742 
743     if (n != sizeof(struct signalfd_siginfo)) {
744         nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E",
745                 ev->fd, nxt_errno);
746     }
747 
748     nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
749 
750     handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
751 }
752 
753 #endif
754 
755 
756 #if (NXT_HAVE_EVENTFD)
757 
758 static nxt_int_t
759 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
760 {
761     int                 ret;
762     struct epoll_event  ee;
763 
764     engine->u.epoll.post_handler = handler;
765 
766     /*
767      * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
768      * and 2.8 eventfd() wrappers call the original eventfd() syscall
769      * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
770      * tries to call eventfd2() syscall and if it fails then calls the
771      * original eventfd() syscall.  For this reason the non-blocking mode
772      * is set separately.
773      */
774 
775     engine->u.epoll.eventfd.fd = eventfd(0, 0);
776 
777     if (engine->u.epoll.eventfd.fd == -1) {
778         nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno);
779         return NXT_ERROR;
780     }
781 
782     ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
783     if (nxt_slow_path(ret != NXT_OK)) {
784         return NXT_ERROR;
785     }
786 
787     nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
788 
789     engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
790     engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
791     engine->u.epoll.eventfd.data = engine;
792     engine->u.epoll.eventfd.log = engine->task.log;
793     engine->u.epoll.eventfd.task = &engine->task;
794 
795     ee.events = EPOLLIN | EPOLLET;
796     ee.data.ptr = &engine->u.epoll.eventfd;
797 
798     ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
799                     engine->u.epoll.eventfd.fd, &ee);
800 
801     if (nxt_fast_path(ret == 0)) {
802         return NXT_OK;
803     }
804 
805     nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
806             engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
807             nxt_errno);
808 
809     return NXT_ERROR;
810 }
811 
812 
813 static void
814 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
815 {
816     int                 n;
817     uint64_t            events;
818     nxt_event_engine_t  *engine;
819 
820     engine = data;
821 
822     nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
823 
824     /*
825      * The maximum value after write() to a eventfd() descriptor will
826      * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor
827      * can be read once per many notifications, for example, once per
828      * 2^32-2 noticifcations.  Since the eventfd() file descriptor is
829      * always registered in EPOLLET mode, epoll returns event about
830      * only the latest write() to the descriptor.
831      */
832 
833     if (engine->u.epoll.neventfd++ >= 0xfffffffe) {
834         engine->u.epoll.neventfd = 0;
835 
836         n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
837 
838         nxt_debug(task, "read(%d): %d events:%uL",
839                   engine->u.epoll.eventfd.fd, n, events);
840 
841         if (n != sizeof(uint64_t)) {
842             nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E",
843                     engine->u.epoll.eventfd.fd, nxt_errno);
844         }
845     }
846 
847     engine->u.epoll.post_handler(task, NULL, NULL);
848 }
849 
850 
851 static void
852 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
853 {
854     size_t    ret;
855     uint64_t  event;
856 
857     /*
858      * eventfd() presents along with signalfd(), so the function
859      * is used only to post events and the signo argument is ignored.
860      */
861 
862     event = 1;
863 
864     ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
865 
866     if (nxt_slow_path(ret != sizeof(uint64_t))) {
867         nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E",
868                 engine->u.epoll.eventfd.fd, nxt_errno);
869     }
870 }
871 
872 #endif
873 
874 
875 static void
876 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
877 {
878     int                 nevents;
879     uint32_t            events;
880     nxt_int_t           i;
881     nxt_err_t           err;
882     nxt_bool_t          error;
883     nxt_uint_t          level;
884     nxt_fd_event_t      *ev;
885     struct epoll_event  *event;
886 
887     if (engine->u.epoll.nchanges != 0) {
888         if (nxt_epoll_commit_changes(engine) != NXT_OK) {
889             /* Error handlers have been enqueued on failure. */
890             timeout = 0;
891         }
892     }
893 
894     nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
895               engine->u.epoll.fd, timeout);
896 
897     nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
898                          engine->u.epoll.mevents, timeout);
899 
900     err = (nevents == -1) ? nxt_errno : 0;
901 
902     nxt_thread_time_update(engine->task.thread);
903 
904     nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
905 
906     if (nevents == -1) {
907         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
908 
909         nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
910                 engine->u.epoll.fd, err);
911 
912         return;
913     }
914 
915     for (i = 0; i < nevents; i++) {
916 
917         event = &engine->u.epoll.events[i];
918         events = event->events;
919         ev = event->data.ptr;
920 
921         nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
922                   ev->fd, events, ev, ev->read, ev->write);
923 
924         /*
925          * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or
926          * EPOLLOUT, so the "error" variable enqueues only one active handler.
927          */
928         error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
929         ev->epoll_error = error;
930 
931 #if (NXT_HAVE_EPOLL_EDGE)
932 
933         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
934 
935 #endif
936 
937         if ((events & EPOLLIN) || error) {
938             ev->read_ready = 1;
939 
940             if (ev->read != NXT_EVENT_BLOCKED) {
941 
942                 if (ev->read == NXT_EVENT_ONESHOT) {
943                     ev->read = NXT_EVENT_DISABLED;
944                 }
945 
946                 error = 0;
947 
948                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
949                                    ev->task, ev, ev->data);
950 
951             } else if (engine->u.epoll.mode == 0) {
952                 /* Level-triggered mode. */
953                 nxt_epoll_disable_read(engine, ev);
954             }
955         }
956 
957         if ((events & EPOLLOUT) || error) {
958             ev->write_ready = 1;
959 
960             if (ev->write != NXT_EVENT_BLOCKED) {
961 
962                 if (ev->write == NXT_EVENT_ONESHOT) {
963                     ev->write = NXT_EVENT_DISABLED;
964                 }
965 
966                 error = 0;
967 
968                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
969                                    ev->task, ev, ev->data);
970 
971             } else if (engine->u.epoll.mode == 0) {
972                 /* Level-triggered mode. */
973                 nxt_epoll_disable_write(engine, ev);
974             }
975         }
976 
977         if (error) {
978             ev->read_ready = 1;
979             ev->write_ready = 1;
980         }
981     }
982 }
983 
984 
985 #if (NXT_HAVE_ACCEPT4)
986 
987 static void
988 nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
989 {
990     socklen_t                len;
991     nxt_socket_t             s;
992     struct sockaddr          *sa;
993     nxt_event_conn_t         *c;
994     nxt_event_conn_listen_t  *cls;
995 
996     cls = obj;
997     c = data;
998 
999     cls->ready--;
1000     cls->socket.read_ready = (cls->ready != 0);
1001 
1002     len = c->remote->socklen;
1003 
1004     if (len >= sizeof(struct sockaddr)) {
1005         sa = &c->remote->u.sockaddr;
1006 
1007     } else {
1008         sa = NULL;
1009         len = 0;
1010     }
1011 
1012     s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK);
1013 
1014     if (s != -1) {
1015         c->socket.fd = s;
1016 
1017         nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s);
1018 
1019         nxt_event_conn_accept(task, cls, c);
1020         return;
1021     }
1022 
1023     nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno);
1024 }
1025 
1026 #endif
1027 
1028 
1029 #if (NXT_HAVE_EPOLL_EDGE)
1030 
1031 /*
1032  * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1033  * syscall to test pending connect() error.  Although this special
1034  * interface can work in both edge-triggered and level-triggered
1035  * modes it is enabled only for the former mode because this mode is
1036  * available in all modern Linux distributions.  For the latter mode
1037  * it is required to create additional nxt_epoll_level_event_conn_io
1038  * with single non-generic connect() interface.
1039  */
1040 
1041 static void
1042 nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1043 {
1044     nxt_event_conn_t              *c;
1045     nxt_event_engine_t            *engine;
1046     nxt_work_handler_t            handler;
1047     const nxt_event_conn_state_t  *state;
1048 
1049     c = obj;
1050 
1051     state = c->write_state;
1052 
1053     switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1054 
1055     case NXT_OK:
1056         c->socket.write_ready = 1;
1057         handler = state->ready_handler;
1058         break;
1059 
1060     case NXT_AGAIN:
1061         c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1062         c->socket.error_handler = nxt_event_conn_connect_error;
1063 
1064         engine = task->thread->engine;
1065         nxt_event_conn_timer(engine, c, state, &c->write_timer);
1066 
1067         nxt_epoll_enable(engine, &c->socket);
1068         c->socket.read = NXT_EVENT_BLOCKED;
1069         return;
1070 
1071 #if 0
1072     case NXT_AGAIN:
1073         nxt_event_conn_timer(engine, c, state, &c->write_timer);
1074 
1075         /* Fall through. */
1076 
1077     case NXT_OK:
1078         /*
1079          * Mark both read and write directions as ready and try to perform
1080          * I/O operations before receiving readiness notifications.
1081          * On unconnected socket Linux send() and recv() return EAGAIN
1082          * instead of ENOTCONN.
1083          */
1084         c->socket.read_ready = 1;
1085         c->socket.write_ready = 1;
1086         /*
1087          * Enabling both read and write notifications on a getting
1088          * connected socket eliminates one epoll_ctl() syscall.
1089          */
1090         c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1091         c->socket.error_handler = state->error_handler;
1092 
1093         nxt_epoll_enable(engine, &c->socket);
1094         c->socket.read = NXT_EVENT_BLOCKED;
1095 
1096         handler = state->ready_handler;
1097         break;
1098 #endif
1099 
1100     case NXT_ERROR:
1101         handler = state->error_handler;
1102         break;
1103 
1104     default:  /* NXT_DECLINED: connection refused. */
1105         handler = state->close_handler;
1106         break;
1107     }
1108 
1109     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1110 }
1111 
1112 
1113 static void
1114 nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data)
1115 {
1116     nxt_event_conn_t  *c;
1117 
1118     c = obj;
1119 
1120     nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1121 
1122     if (!c->socket.epoll_error) {
1123         c->socket.write = NXT_EVENT_BLOCKED;
1124 
1125         if (c->write_state->autoreset_timer) {
1126             nxt_timer_disable(task->thread->engine, &c->write_timer);
1127         }
1128 
1129         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1130                            task, c, data);
1131         return;
1132     }
1133 
1134     nxt_event_conn_connect_test(task, c, data);
1135 }
1136 
1137 
1138 /*
1139  * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around
1140  * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF
1141  * in edge-triggered mode.
1142  */
1143 
1144 static ssize_t
1145 nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
1146 {
1147     ssize_t  n;
1148 
1149     n = nxt_event_conn_io_recvbuf(c, b);
1150 
1151     if (n > 0 && c->socket.epoll_eof) {
1152         c->socket.read_ready = 1;
1153     }
1154 
1155     return n;
1156 }
1157 
1158 #endif
1159