xref: /unit/src/nxt_epoll_engine.c (revision 1456:b70c731c278d)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * The first epoll version has been introduced in Linux 2.5.44.  The
12  * interface was changed several times since then and the final version
13  * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14  * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15  *
 * EPOLLET mode did not work reliably in early implementations and in
 * the Linux 2.4 backport.
18  *
19  * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
20  * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
21  * epoll_pwait()            Linux 2.6.19, glibc 2.6.
22  * signalfd()               Linux 2.6.22, glibc 2.7.
23  * eventfd()                Linux 2.6.22, glibc 2.7.
24  * timerfd_create()         Linux 2.6.25, glibc 2.8.
25  * epoll_create1()          Linux 2.6.27, glibc 2.9.
26  * signalfd4()              Linux 2.6.27, glibc 2.9.
27  * eventfd2()               Linux 2.6.27, glibc 2.9.
28  * accept4()                Linux 2.6.28, glibc 2.10.
29  * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
30  * EPOLLEXCLUSIVE           Linux 4.5, glibc 2.24.
31  */
32 
33 
34 #if (NXT_HAVE_EPOLL_EDGE)
35 static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36     nxt_uint_t mchanges, nxt_uint_t mevents);
37 #endif
38 static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39     nxt_uint_t mchanges, nxt_uint_t mevents);
40 static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41     nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42 static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43     nxt_conn_io_t *io);
44 static void nxt_epoll_free(nxt_event_engine_t *engine);
45 static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46 static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47 static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48 static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49     nxt_fd_event_t *ev);
50 static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51     nxt_fd_event_t *ev);
52 static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53     nxt_fd_event_t *ev);
54 static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55     nxt_fd_event_t *ev);
56 static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57     nxt_fd_event_t *ev);
58 static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59     nxt_fd_event_t *ev);
60 static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61     nxt_fd_event_t *ev);
62 static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63     nxt_fd_event_t *ev);
64 static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65     nxt_fd_event_t *ev);
66 static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67     nxt_fd_event_t *ev);
68 static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69     int op, uint32_t events);
70 static void nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71 static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
72 #if (NXT_HAVE_SIGNALFD)
73 static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74 static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75 #endif
76 #if (NXT_HAVE_EVENTFD)
77 static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78     nxt_work_handler_t handler);
79 static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80 static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81 #endif
82 static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83 
84 #if (NXT_HAVE_ACCEPT4)
85 static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86     void *data);
87 #endif
88 
89 
#if (NXT_HAVE_EPOLL_EDGE)

static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


/*
 * Connection I/O used by the edge-triggered engine.  connect() and
 * recvbuf() are epoll-specific; the remaining operations use the shared
 * nxt_conn_io_* / nxt_event_conn_io_* handlers.  The accept operation may
 * be replaced at engine creation by nxt_epoll_test_accept4().
 */
static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recv = nxt_conn_io_recv,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,
    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,

#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif
};


/*
 * Edge-triggered epoll engine (EPOLLET | EPOLLRDHUP, see
 * nxt_epoll_edge_create()).  The initializer is positional, so the order
 * of entries must follow the nxt_event_interface_t declaration.
 */
const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",

    /* Engine lifecycle. */
    nxt_epoll_edge_create,
    nxt_epoll_free,

    /* Descriptor registration. */
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,

    /* Per-direction state changes. */
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,

    /* Two operations not implemented by this engine. */
    NULL,
    NULL,

    /* Posting and signalling need eventfd(). */
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif

    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

    /* File events capability. */
#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

    /* Signal events capability. */
#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};

#endif
165 
166 
167 const nxt_event_interface_t  nxt_epoll_level_engine = {
168     "epoll_level",
169     nxt_epoll_level_create,
170     nxt_epoll_free,
171     nxt_epoll_enable,
172     nxt_epoll_disable,
173     nxt_epoll_delete,
174     nxt_epoll_close,
175     nxt_epoll_enable_read,
176     nxt_epoll_enable_write,
177     nxt_epoll_disable_read,
178     nxt_epoll_disable_write,
179     nxt_epoll_block_read,
180     nxt_epoll_block_write,
181     nxt_epoll_oneshot_read,
182     nxt_epoll_oneshot_write,
183     nxt_epoll_enable_accept,
184     NULL,
185     NULL,
186 #if (NXT_HAVE_EVENTFD)
187     nxt_epoll_enable_post,
188     nxt_epoll_signal,
189 #else
190     NULL,
191     NULL,
192 #endif
193     nxt_epoll_poll,
194 
195     &nxt_unix_conn_io,
196 
197 #if (NXT_HAVE_INOTIFY)
198     NXT_FILE_EVENTS,
199 #else
200     NXT_NO_FILE_EVENTS,
201 #endif
202 
203 #if (NXT_HAVE_SIGNALFD)
204     NXT_SIGNAL_EVENTS,
205 #else
206     NXT_NO_SIGNAL_EVENTS,
207 #endif
208 };
209 
210 
#if (NXT_HAVE_EPOLL_EDGE)

/*
 * Creates the edge-triggered engine.  Registrations that include
 * engine->u.epoll.mode carry EPOLLET, and EPOLLRDHUP so that a peer
 * shutdown is reported (stored in ev->epoll_eof by nxt_epoll_poll()).
 */
static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    uint32_t  mode;

    mode = EPOLLET | EPOLLRDHUP;

    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_epoll_edge_conn_io, mode);
}

#endif
222 
223 
224 static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)225 nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
226     nxt_uint_t mevents)
227 {
228     return nxt_epoll_create(engine, mchanges, mevents,
229                             &nxt_unix_conn_io, 0);
230 }
231 
232 
233 static nxt_int_t
nxt_epoll_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents,nxt_conn_io_t * io,uint32_t mode)234 nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
235     nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
236 {
237     engine->u.epoll.fd = -1;
238     engine->u.epoll.mode = mode;
239     engine->u.epoll.mchanges = mchanges;
240     engine->u.epoll.mevents = mevents;
241 #if (NXT_HAVE_SIGNALFD)
242     engine->u.epoll.signalfd.fd = -1;
243 #endif
244 
245     engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
246     if (engine->u.epoll.changes == NULL) {
247         goto fail;
248     }
249 
250     engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
251     if (engine->u.epoll.events == NULL) {
252         goto fail;
253     }
254 
255     engine->u.epoll.fd = epoll_create(1);
256     if (engine->u.epoll.fd == -1) {
257         nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
258         goto fail;
259     }
260 
261     nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
262 
263     if (engine->signals != NULL) {
264 
265 #if (NXT_HAVE_SIGNALFD)
266 
267         if (nxt_epoll_add_signal(engine) != NXT_OK) {
268             goto fail;
269         }
270 
271 #endif
272 
273         nxt_epoll_test_accept4(engine, io);
274     }
275 
276     return NXT_OK;
277 
278 fail:
279 
280     nxt_epoll_free(engine);
281 
282     return NXT_ERROR;
283 }
284 
285 
286 static void
nxt_epoll_test_accept4(nxt_event_engine_t * engine,nxt_conn_io_t * io)287 nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
288 {
289     static nxt_work_handler_t  handler;
290 
291     if (handler == NULL) {
292 
293         handler = io->accept;
294 
295 #if (NXT_HAVE_ACCEPT4)
296 
297         (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
298 
299         if (nxt_errno != NXT_ENOSYS) {
300             handler = nxt_epoll_conn_io_accept4;
301 
302         } else {
303             nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
304                     NXT_ENOSYS);
305         }
306 
307 #endif
308     }
309 
310     io->accept = handler;
311 }
312 
313 
314 static void
nxt_epoll_free(nxt_event_engine_t * engine)315 nxt_epoll_free(nxt_event_engine_t *engine)
316 {
317     int  fd;
318 
319     nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
320 
321 #if (NXT_HAVE_SIGNALFD)
322 
323     fd = engine->u.epoll.signalfd.fd;
324 
325     if (fd != -1 && close(fd) != 0) {
326         nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
327     }
328 
329 #endif
330 
331 #if (NXT_HAVE_EVENTFD)
332 
333     fd = engine->u.epoll.eventfd.fd;
334 
335     if (fd != -1 && close(fd) != 0) {
336         nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
337     }
338 
339 #endif
340 
341     fd = engine->u.epoll.fd;
342 
343     if (fd != -1 && close(fd) != 0) {
344         nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
345     }
346 
347     nxt_free(engine->u.epoll.events);
348     nxt_free(engine->u.epoll.changes);
349 
350     nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
351 }
352 
353 
354 static void
nxt_epoll_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)355 nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
356 {
357     ev->read = NXT_EVENT_ACTIVE;
358     ev->write = NXT_EVENT_ACTIVE;
359 
360     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
361                      EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
362 }
363 
364 
365 static void
nxt_epoll_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)366 nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
367 {
368     if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
369 
370         ev->read = NXT_EVENT_INACTIVE;
371         ev->write = NXT_EVENT_INACTIVE;
372 
373         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
374     }
375 }
376 
377 
378 static void
nxt_epoll_delete(nxt_event_engine_t * engine,nxt_fd_event_t * ev)379 nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
380 {
381     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
382 
383         ev->read = NXT_EVENT_INACTIVE;
384         ev->write = NXT_EVENT_INACTIVE;
385 
386         nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
387     }
388 }
389 
390 
391 /*
392  * Although calling close() on a file descriptor will remove any epoll
393  * events that reference the descriptor, in this case the close() acquires
394  * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
395  * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
396  * only in one epoll set.  Thus removing events explicitly before closing
397  * eliminates possible lock contention.
398  */
399 
400 static nxt_bool_t
nxt_epoll_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)401 nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
402 {
403     nxt_epoll_delete(engine, ev);
404 
405     return ev->changing;
406 }
407 
408 
409 static void
nxt_epoll_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)410 nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
411 {
412     int       op;
413     uint32_t  events;
414 
415     if (ev->read != NXT_EVENT_BLOCKED) {
416 
417         op = EPOLL_CTL_MOD;
418         events = EPOLLIN | engine->u.epoll.mode;
419 
420         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
421             op = EPOLL_CTL_ADD;
422 
423         } else if (ev->write >= NXT_EVENT_BLOCKED) {
424             events |= EPOLLOUT;
425         }
426 
427         nxt_epoll_change(engine, ev, op, events);
428     }
429 
430     ev->read = NXT_EVENT_ACTIVE;
431 }
432 
433 
434 static void
nxt_epoll_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)435 nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
436 {
437     int       op;
438     uint32_t  events;
439 
440     if (ev->write != NXT_EVENT_BLOCKED) {
441 
442         op = EPOLL_CTL_MOD;
443         events = EPOLLOUT | engine->u.epoll.mode;
444 
445         if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
446             op = EPOLL_CTL_ADD;
447 
448         } else if (ev->read >= NXT_EVENT_BLOCKED) {
449             events |= EPOLLIN;
450         }
451 
452         nxt_epoll_change(engine, ev, op, events);
453     }
454 
455     ev->write = NXT_EVENT_ACTIVE;
456 }
457 
458 
459 static void
nxt_epoll_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)460 nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
461 {
462     int       op;
463     uint32_t  events;
464 
465     ev->read = NXT_EVENT_INACTIVE;
466 
467     if (ev->write <= NXT_EVENT_DISABLED) {
468         ev->write = NXT_EVENT_INACTIVE;
469         op = EPOLL_CTL_DEL;
470         events = 0;
471 
472     } else {
473         op = EPOLL_CTL_MOD;
474         events = EPOLLOUT | engine->u.epoll.mode;
475     }
476 
477     nxt_epoll_change(engine, ev, op, events);
478 }
479 
480 
481 static void
nxt_epoll_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)482 nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
483 {
484     int       op;
485     uint32_t  events;
486 
487     ev->write = NXT_EVENT_INACTIVE;
488 
489     if (ev->read <= NXT_EVENT_DISABLED) {
490         ev->read = NXT_EVENT_INACTIVE;
491         op = EPOLL_CTL_DEL;
492         events = 0;
493 
494     } else {
495         op = EPOLL_CTL_MOD;
496         events = EPOLLIN | engine->u.epoll.mode;
497     }
498 
499     nxt_epoll_change(engine, ev, op, events);
500 }
501 
502 
503 static void
nxt_epoll_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)504 nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
505 {
506     if (ev->read != NXT_EVENT_INACTIVE) {
507         ev->read = NXT_EVENT_BLOCKED;
508     }
509 }
510 
511 
512 static void
nxt_epoll_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)513 nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
514 {
515     if (ev->write != NXT_EVENT_INACTIVE) {
516         ev->write = NXT_EVENT_BLOCKED;
517     }
518 }
519 
520 
521 /*
522  * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
523  * event should be added or modified, epoll_ctl(2):
524  *
525  * EPOLLONESHOT (since Linux 2.6.2)
526  *     Sets the one-shot behavior for the associated file descriptor.
527  *     This means that after an event is pulled out with epoll_wait(2)
528  *     the associated file descriptor is internally disabled and no
529  *     other events will be reported by the epoll interface.  The user
530  *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
531  *     descriptor with a new event mask.
532  */
533 
534 static void
nxt_epoll_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)535 nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
536 {
537     int  op;
538 
539     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
540              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
541 
542     ev->read = NXT_EVENT_ONESHOT;
543     ev->write = NXT_EVENT_INACTIVE;
544 
545     nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
546 }
547 
548 
549 static void
nxt_epoll_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)550 nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
551 {
552     int  op;
553 
554     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
555              EPOLL_CTL_ADD : EPOLL_CTL_MOD;
556 
557     ev->read = NXT_EVENT_INACTIVE;
558     ev->write = NXT_EVENT_ONESHOT;
559 
560     nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
561 }
562 
563 
564 static void
nxt_epoll_enable_accept(nxt_event_engine_t * engine,nxt_fd_event_t * ev)565 nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
566 {
567     uint32_t  events;
568 
569     ev->read = NXT_EVENT_ACTIVE;
570 
571     events = EPOLLIN;
572 
573 #ifdef EPOLLEXCLUSIVE
574     events |= EPOLLEXCLUSIVE;
575 #endif
576 
577     nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
578 }
579 
580 
581 /*
582  * epoll changes are batched to improve instruction and data cache
583  * locality of several epoll_ctl() calls followed by epoll_wait() call.
584  */
585 
586 static void
nxt_epoll_change(nxt_event_engine_t * engine,nxt_fd_event_t * ev,int op,uint32_t events)587 nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
588     uint32_t events)
589 {
590     nxt_epoll_change_t  *change;
591 
592     nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
593               engine->u.epoll.fd, ev->fd, op, events);
594 
595     if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
596         nxt_epoll_commit_changes(engine);
597     }
598 
599     ev->changing = 1;
600 
601     change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
602     change->op = op;
603     change->event.events = events;
604     change->event.data.ptr = ev;
605 }
606 
607 
608 static void
nxt_epoll_commit_changes(nxt_event_engine_t * engine)609 nxt_epoll_commit_changes(nxt_event_engine_t *engine)
610 {
611     int                 ret;
612     nxt_fd_event_t      *ev;
613     nxt_epoll_change_t  *change, *end;
614 
615     nxt_debug(&engine->task, "epoll %d changes:%ui",
616               engine->u.epoll.fd, engine->u.epoll.nchanges);
617 
618     change = engine->u.epoll.changes;
619     end = change + engine->u.epoll.nchanges;
620 
621     do {
622         ev = change->event.data.ptr;
623         ev->changing = 0;
624 
625         nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
626                   engine->u.epoll.fd, ev->fd, change->op,
627                   change->event.events);
628 
629         ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
630 
631         if (nxt_slow_path(ret != 0)) {
632             nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
633                       engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
634 
635             nxt_work_queue_add(&engine->fast_work_queue,
636                                nxt_epoll_error_handler, ev->task, ev, ev->data);
637 
638             engine->u.epoll.error = 1;
639         }
640 
641         change++;
642 
643     } while (change < end);
644 
645     engine->u.epoll.nchanges = 0;
646 }
647 
648 
649 static void
nxt_epoll_error_handler(nxt_task_t * task,void * obj,void * data)650 nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
651 {
652     nxt_fd_event_t  *ev;
653 
654     ev = obj;
655 
656     ev->read = NXT_EVENT_INACTIVE;
657     ev->write = NXT_EVENT_INACTIVE;
658 
659     ev->error_handler(ev->task, ev, data);
660 }
661 
662 
663 #if (NXT_HAVE_SIGNALFD)
664 
665 static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t * engine)666 nxt_epoll_add_signal(nxt_event_engine_t *engine)
667 {
668     int                 fd;
669     struct epoll_event  ee;
670 
671     if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
672         nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
673         return NXT_ERROR;
674     }
675 
676     /*
677      * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
678      * and 2.8 signalfd() wrappers call the original signalfd() syscall
679      * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
680      * tries to call signalfd4() syscall and if it fails then calls the
681      * original signalfd() syscall.  For this reason the non-blocking mode
682      * is set separately.
683      */
684 
685     fd = signalfd(-1, &engine->signals->sigmask, 0);
686 
687     if (fd == -1) {
688         nxt_alert(&engine->task, "signalfd(%d) failed %E",
689                   engine->u.epoll.signalfd.fd, nxt_errno);
690         return NXT_ERROR;
691     }
692 
693     engine->u.epoll.signalfd.fd = fd;
694 
695     if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
696         return NXT_ERROR;
697     }
698 
699     nxt_debug(&engine->task, "signalfd(): %d", fd);
700 
701     engine->u.epoll.signalfd.data = engine->signals->handler;
702     engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
703     engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
704     engine->u.epoll.signalfd.log = engine->task.log;
705     engine->u.epoll.signalfd.task = &engine->task;
706 
707     ee.events = EPOLLIN;
708     ee.data.ptr = &engine->u.epoll.signalfd;
709 
710     if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
711         nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
712                   engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
713 
714         return NXT_ERROR;
715     }
716 
717     return NXT_OK;
718 }
719 
720 
721 static void
nxt_epoll_signalfd_handler(nxt_task_t * task,void * obj,void * data)722 nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
723 {
724     int                      n;
725     nxt_fd_event_t           *ev;
726     nxt_work_handler_t       handler;
727     struct signalfd_siginfo  sfd;
728 
729     ev = obj;
730     handler = data;
731 
732     nxt_debug(task, "signalfd handler");
733 
734     n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
735 
736     nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
737 
738     if (n != sizeof(struct signalfd_siginfo)) {
739         nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
740         return;
741     }
742 
743     nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
744 
745     handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
746 }
747 
748 #endif
749 
750 
751 #if (NXT_HAVE_EVENTFD)
752 
753 static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t * engine,nxt_work_handler_t handler)754 nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
755 {
756     int                 ret;
757     struct epoll_event  ee;
758 
759     engine->u.epoll.post_handler = handler;
760 
761     /*
762      * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
763      * and 2.8 eventfd() wrappers call the original eventfd() syscall
764      * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
765      * tries to call eventfd2() syscall and if it fails then calls the
766      * original eventfd() syscall.  For this reason the non-blocking mode
767      * is set separately.
768      */
769 
770     engine->u.epoll.eventfd.fd = eventfd(0, 0);
771 
772     if (engine->u.epoll.eventfd.fd == -1) {
773         nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
774         return NXT_ERROR;
775     }
776 
777     ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
778     if (nxt_slow_path(ret != NXT_OK)) {
779         return NXT_ERROR;
780     }
781 
782     nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
783 
784     engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
785     engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
786     engine->u.epoll.eventfd.data = engine;
787     engine->u.epoll.eventfd.log = engine->task.log;
788     engine->u.epoll.eventfd.task = &engine->task;
789 
790     ee.events = EPOLLIN | EPOLLET;
791     ee.data.ptr = &engine->u.epoll.eventfd;
792 
793     ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
794                     engine->u.epoll.eventfd.fd, &ee);
795 
796     if (nxt_fast_path(ret == 0)) {
797         return NXT_OK;
798     }
799 
800     nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
801               engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
802               nxt_errno);
803 
804     return NXT_ERROR;
805 }
806 
807 
808 static void
nxt_epoll_eventfd_handler(nxt_task_t * task,void * obj,void * data)809 nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
810 {
811     int                 n;
812     uint64_t            events;
813     nxt_event_engine_t  *engine;
814 
815     engine = data;
816 
817     nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
818 
819     /*
820      * The maximum value after write() to a eventfd() descriptor will
821      * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
822      * can be read once per many notifications, for example, once per
823      * 2^32-2 noticifcations.  Since the eventfd() file descriptor is
824      * always registered in EPOLLET mode, epoll returns event about
825      * only the latest write() to the descriptor.
826      */
827 
828     if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
829         engine->u.epoll.neventfd = 0;
830 
831         n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
832 
833         nxt_debug(task, "read(%d): %d events:%uL",
834                   engine->u.epoll.eventfd.fd, n, events);
835 
836         if (n != sizeof(uint64_t)) {
837             nxt_alert(task, "read eventfd(%d) failed %E",
838                       engine->u.epoll.eventfd.fd, nxt_errno);
839         }
840     }
841 
842     engine->u.epoll.post_handler(task, NULL, NULL);
843 }
844 
845 
846 static void
nxt_epoll_signal(nxt_event_engine_t * engine,nxt_uint_t signo)847 nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
848 {
849     size_t    ret;
850     uint64_t  event;
851 
852     /*
853      * eventfd() presents along with signalfd(), so the function
854      * is used only to post events and the signo argument is ignored.
855      */
856 
857     event = 1;
858 
859     ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
860 
861     if (nxt_slow_path(ret != sizeof(uint64_t))) {
862         nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
863                   engine->u.epoll.eventfd.fd, nxt_errno);
864     }
865 }
866 
867 #endif
868 
869 
870 static void
nxt_epoll_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)871 nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
872 {
873     int                 nevents;
874     uint32_t            events;
875     nxt_int_t           i;
876     nxt_err_t           err;
877     nxt_bool_t          error;
878     nxt_uint_t          level;
879     nxt_fd_event_t      *ev;
880     struct epoll_event  *event;
881 
882     if (engine->u.epoll.nchanges != 0) {
883         nxt_epoll_commit_changes(engine);
884     }
885 
886     if (engine->u.epoll.error) {
887         engine->u.epoll.error = 0;
888         /* Error handlers have been enqueued on failure. */
889         timeout = 0;
890     }
891 
892     nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
893               engine->u.epoll.fd, timeout);
894 
895     nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
896                          engine->u.epoll.mevents, timeout);
897 
898     err = (nevents == -1) ? nxt_errno : 0;
899 
900     nxt_thread_time_update(engine->task.thread);
901 
902     nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
903 
904     if (nevents == -1) {
905         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
906 
907         nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
908                 engine->u.epoll.fd, err);
909 
910         return;
911     }
912 
913     for (i = 0; i < nevents; i++) {
914 
915         event = &engine->u.epoll.events[i];
916         events = event->events;
917         ev = event->data.ptr;
918 
919         nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
920                   ev->fd, events, ev, ev->read, ev->write);
921 
922         /*
923          * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN
924          * or EPOLLOUT, so the "error" variable enqueues only error handler.
925          */
926         error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
927         ev->epoll_error = error;
928 
929         if (error
930             && ev->read <= NXT_EVENT_BLOCKED
931             && ev->write <= NXT_EVENT_BLOCKED)
932         {
933             error = 0;
934         }
935 
936 #if (NXT_HAVE_EPOLL_EDGE)
937 
938         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
939 
940 #endif
941 
942         if ((events & EPOLLIN) != 0) {
943             ev->read_ready = 1;
944 
945             if (ev->read != NXT_EVENT_BLOCKED) {
946 
947                 if (ev->read == NXT_EVENT_ONESHOT) {
948                     ev->read = NXT_EVENT_DISABLED;
949                 }
950 
951                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
952                                    ev->task, ev, ev->data);
953 
954                 error = 0;
955 
956             } else if (engine->u.epoll.mode == 0) {
957                 /* Level-triggered mode. */
958                 nxt_epoll_disable_read(engine, ev);
959             }
960         }
961 
962         if ((events & EPOLLOUT) != 0) {
963             ev->write_ready = 1;
964 
965             if (ev->write != NXT_EVENT_BLOCKED) {
966 
967                 if (ev->write == NXT_EVENT_ONESHOT) {
968                     ev->write = NXT_EVENT_DISABLED;
969                 }
970 
971                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
972                                    ev->task, ev, ev->data);
973 
974                 error = 0;
975 
976             } else if (engine->u.epoll.mode == 0) {
977                 /* Level-triggered mode. */
978                 nxt_epoll_disable_write(engine, ev);
979             }
980         }
981 
982         if (!error) {
983             continue;
984         }
985 
986         ev->read_ready = 1;
987         ev->write_ready = 1;
988 
989         if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) {
990 
991             if (engine->u.epoll.mode == 0) {
992                 /* Level-triggered mode. */
993                 nxt_epoll_disable(engine, ev);
994             }
995 
996             continue;
997         }
998 
999         nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler,
1000                            ev->task, ev, ev->data);
1001     }
1002 }
1003 
1004 
#if (NXT_HAVE_ACCEPT4)

/*
 * Accepts one pending connection on the listening socket "obj" into the
 * preallocated connection lev->next.  accept4() with SOCK_NONBLOCK puts the
 * new socket into non-blocking mode within the same syscall.  On success
 * the connection is handed to nxt_conn_accept(); on failure the errno value
 * is passed to nxt_conn_accept_error().
 */
static void
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t          *c;
    socklen_t           len;
    nxt_socket_t        fd;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    /* Consume one pending accept; stay read-ready only while more remain. */
    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    /*
     * The socklen written back by accept4() is deliberately ignored,
     * see the comment in nxt_conn_io_accept().
     */
    len = c->remote->socklen;

    fd = accept4(lev->socket.fd, &c->remote->u.sockaddr, &len, SOCK_NONBLOCK);

    if (fd == -1) {
        nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
        return;
    }

    c->socket.fd = fd;

    nxt_debug(task, "accept4(%d): %d", lev->socket.fd, fd);

    nxt_conn_accept(task, lev, c);
}

#endif
1043 
1044 
1045 #if (NXT_HAVE_EPOLL_EDGE)
1046 
1047 /*
1048  * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1049  * syscall to test pending connect() error.  Although this special
1050  * interface can work in both edge-triggered and level-triggered
1051  * modes it is enabled only for the former mode because this mode is
1052  * available in all modern Linux distributions.  For the latter mode
1053  * it is required to create additional nxt_epoll_level_event_conn_io
1054  * with single non-generic connect() interface.
1055  */
1056 
1057 static void
nxt_epoll_edge_conn_io_connect(nxt_task_t * task,void * obj,void * data)1058 nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1059 {
1060     nxt_conn_t                    *c;
1061     nxt_event_engine_t            *engine;
1062     nxt_work_handler_t            handler;
1063     const nxt_event_conn_state_t  *state;
1064 
1065     c = obj;
1066 
1067     state = c->write_state;
1068 
1069     switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {
1070 
1071     case NXT_OK:
1072         c->socket.write_ready = 1;
1073         handler = state->ready_handler;
1074         break;
1075 
1076     case NXT_AGAIN:
1077         c->socket.write_handler = nxt_epoll_edge_conn_connected;
1078         c->socket.error_handler = nxt_conn_connect_error;
1079 
1080         engine = task->thread->engine;
1081         nxt_conn_timer(engine, c, state, &c->write_timer);
1082 
1083         nxt_epoll_enable(engine, &c->socket);
1084         c->socket.read = NXT_EVENT_BLOCKED;
1085         return;
1086 
1087 #if 0
1088     case NXT_AGAIN:
1089         nxt_conn_timer(engine, c, state, &c->write_timer);
1090 
1091         /* Fall through. */
1092 
1093     case NXT_OK:
1094         /*
1095          * Mark both read and write directions as ready and try to perform
1096          * I/O operations before receiving readiness notifications.
1097          * On unconnected socket Linux send() and recv() return EAGAIN
1098          * instead of ENOTCONN.
1099          */
1100         c->socket.read_ready = 1;
1101         c->socket.write_ready = 1;
1102         /*
1103          * Enabling both read and write notifications on a getting
1104          * connected socket eliminates one epoll_ctl() syscall.
1105          */
1106         c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1107         c->socket.error_handler = state->error_handler;
1108 
1109         nxt_epoll_enable(engine, &c->socket);
1110         c->socket.read = NXT_EVENT_BLOCKED;
1111 
1112         handler = state->ready_handler;
1113         break;
1114 #endif
1115 
1116     case NXT_ERROR:
1117         handler = state->error_handler;
1118         break;
1119 
1120     default:  /* NXT_DECLINED: connection refused. */
1121         handler = state->close_handler;
1122         break;
1123     }
1124 
1125     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1126 }
1127 
1128 
1129 static void
nxt_epoll_edge_conn_connected(nxt_task_t * task,void * obj,void * data)1130 nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1131 {
1132     nxt_conn_t  *c;
1133 
1134     c = obj;
1135 
1136     nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1137 
1138     if (!c->socket.epoll_error) {
1139         c->socket.write = NXT_EVENT_BLOCKED;
1140 
1141         if (c->write_state->timer_autoreset) {
1142             nxt_timer_disable(task->thread->engine, &c->write_timer);
1143         }
1144 
1145         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1146                            task, c, data);
1147         return;
1148     }
1149 
1150     nxt_conn_connect_test(task, c, data);
1151 }
1152 
1153 
1154 /*
1155  * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1156  * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1157  * in edge-triggered mode.
1158  */
1159 
1160 static ssize_t
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t * c,nxt_buf_t * b)1161 nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1162 {
1163     ssize_t  n;
1164 
1165     n = nxt_conn_io_recvbuf(c, b);
1166 
1167     if (n > 0 && c->socket.epoll_eof) {
1168         c->socket.read_ready = 1;
1169     }
1170 
1171     return n;
1172 }
1173 
1174 #endif
1175