nxt_epoll_engine.c (564:762f8c976ead) nxt_epoll_engine.c (611:323e11065f83)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10/*
11 * The first epoll version has been introduced in Linux 2.5.44. The
12 * interface was changed several times since then and the final version
13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15 *
16 * EPOLLET mode did not work reliable in early implementaions and in
17 * Linux 2.4 backport.
18 *
19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3.
20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8.
21 * epoll_pwait() Linux 2.6.19, glibc 2.6.
22 * signalfd() Linux 2.6.22, glibc 2.7.
23 * eventfd() Linux 2.6.22, glibc 2.7.
24 * timerfd_create() Linux 2.6.25, glibc 2.8.
25 * epoll_create1() Linux 2.6.27, glibc 2.9.
26 * signalfd4() Linux 2.6.27, glibc 2.9.
27 * eventfd2() Linux 2.6.27, glibc 2.9.
28 * accept4() Linux 2.6.28, glibc 2.10.
29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10.
30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24.
31 */
32
33
34#if (NXT_HAVE_EPOLL_EDGE)
35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36 nxt_uint_t mchanges, nxt_uint_t mevents);
37#endif
38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39 nxt_uint_t mchanges, nxt_uint_t mevents);
40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43 nxt_conn_io_t *io);
44static void nxt_epoll_free(nxt_event_engine_t *engine);
45static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49 nxt_fd_event_t *ev);
50static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51 nxt_fd_event_t *ev);
52static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53 nxt_fd_event_t *ev);
54static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55 nxt_fd_event_t *ev);
56static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57 nxt_fd_event_t *ev);
58static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59 nxt_fd_event_t *ev);
60static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61 nxt_fd_event_t *ev);
62static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63 nxt_fd_event_t *ev);
64static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65 nxt_fd_event_t *ev);
66static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67 nxt_fd_event_t *ev);
68static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69 int op, uint32_t events);
70static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
72#if (NXT_HAVE_SIGNALFD)
73static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75#endif
76#if (NXT_HAVE_EVENTFD)
77static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78 nxt_work_handler_t handler);
79static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81#endif
82static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83
84#if (NXT_HAVE_ACCEPT4)
85static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86 void *data);
87#endif
88
89
90#if (NXT_HAVE_EPOLL_EDGE)
91
92static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
93 void *data);
94static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
95 void *data);
96static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
97
98
99static nxt_conn_io_t nxt_epoll_edge_conn_io = {
100 nxt_epoll_edge_conn_io_connect,
101 nxt_conn_io_accept,
102
103 nxt_conn_io_read,
104 nxt_epoll_edge_conn_io_recvbuf,
105 nxt_conn_io_recv,
106
107 nxt_conn_io_write,
108 nxt_event_conn_io_write_chunk,
109
110#if (NXT_HAVE_LINUX_SENDFILE)
111 nxt_linux_event_conn_io_sendfile,
112#else
113 nxt_event_conn_io_sendbuf,
114#endif
115
116 nxt_event_conn_io_writev,
117 nxt_event_conn_io_send,
118
119 nxt_conn_io_shutdown,
120};
121
122
123const nxt_event_interface_t nxt_epoll_edge_engine = {
124 "epoll_edge",
125 nxt_epoll_edge_create,
126 nxt_epoll_free,
127 nxt_epoll_enable,
128 nxt_epoll_disable,
129 nxt_epoll_delete,
130 nxt_epoll_close,
131 nxt_epoll_enable_read,
132 nxt_epoll_enable_write,
133 nxt_epoll_disable_read,
134 nxt_epoll_disable_write,
135 nxt_epoll_block_read,
136 nxt_epoll_block_write,
137 nxt_epoll_oneshot_read,
138 nxt_epoll_oneshot_write,
139 nxt_epoll_enable_accept,
140 NULL,
141 NULL,
142#if (NXT_HAVE_EVENTFD)
143 nxt_epoll_enable_post,
144 nxt_epoll_signal,
145#else
146 NULL,
147 NULL,
148#endif
149 nxt_epoll_poll,
150
151 &nxt_epoll_edge_conn_io,
152
153#if (NXT_HAVE_INOTIFY)
154 NXT_FILE_EVENTS,
155#else
156 NXT_NO_FILE_EVENTS,
157#endif
158
159#if (NXT_HAVE_SIGNALFD)
160 NXT_SIGNAL_EVENTS,
161#else
162 NXT_NO_SIGNAL_EVENTS,
163#endif
164};
165
166#endif
167
168
169const nxt_event_interface_t nxt_epoll_level_engine = {
170 "epoll_level",
171 nxt_epoll_level_create,
172 nxt_epoll_free,
173 nxt_epoll_enable,
174 nxt_epoll_disable,
175 nxt_epoll_delete,
176 nxt_epoll_close,
177 nxt_epoll_enable_read,
178 nxt_epoll_enable_write,
179 nxt_epoll_disable_read,
180 nxt_epoll_disable_write,
181 nxt_epoll_block_read,
182 nxt_epoll_block_write,
183 nxt_epoll_oneshot_read,
184 nxt_epoll_oneshot_write,
185 nxt_epoll_enable_accept,
186 NULL,
187 NULL,
188#if (NXT_HAVE_EVENTFD)
189 nxt_epoll_enable_post,
190 nxt_epoll_signal,
191#else
192 NULL,
193 NULL,
194#endif
195 nxt_epoll_poll,
196
197 &nxt_unix_conn_io,
198
199#if (NXT_HAVE_INOTIFY)
200 NXT_FILE_EVENTS,
201#else
202 NXT_NO_FILE_EVENTS,
203#endif
204
205#if (NXT_HAVE_SIGNALFD)
206 NXT_SIGNAL_EVENTS,
207#else
208 NXT_NO_SIGNAL_EVENTS,
209#endif
210};
211
212
213#if (NXT_HAVE_EPOLL_EDGE)
214
215static nxt_int_t
216nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
217 nxt_uint_t mevents)
218{
219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
220 EPOLLET | EPOLLRDHUP);
221}
222
223#endif
224
225
226static nxt_int_t
227nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
228 nxt_uint_t mevents)
229{
230 return nxt_epoll_create(engine, mchanges, mevents,
231 &nxt_unix_conn_io, 0);
232}
233
234
235static nxt_int_t
236nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
238{
239 engine->u.epoll.fd = -1;
240 engine->u.epoll.mode = mode;
241 engine->u.epoll.mchanges = mchanges;
242 engine->u.epoll.mevents = mevents;
243#if (NXT_HAVE_SIGNALFD)
244 engine->u.epoll.signalfd.fd = -1;
245#endif
246
247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
248 if (engine->u.epoll.changes == NULL) {
249 goto fail;
250 }
251
252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
253 if (engine->u.epoll.events == NULL) {
254 goto fail;
255 }
256
257 engine->u.epoll.fd = epoll_create(1);
258 if (engine->u.epoll.fd == -1) {
259 nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
260 goto fail;
261 }
262
263 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
264
265 if (engine->signals != NULL) {
266
267#if (NXT_HAVE_SIGNALFD)
268
269 if (nxt_epoll_add_signal(engine) != NXT_OK) {
270 goto fail;
271 }
272
273#endif
274
275 nxt_epoll_test_accept4(engine, io);
276 }
277
278 return NXT_OK;
279
280fail:
281
282 nxt_epoll_free(engine);
283
284 return NXT_ERROR;
285}
286
287
288static void
289nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
290{
291 static nxt_work_handler_t handler;
292
293 if (handler == NULL) {
294
295 handler = io->accept;
296
297#if (NXT_HAVE_ACCEPT4)
298
299 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
300
301 if (nxt_errno != NXT_ENOSYS) {
302 handler = nxt_epoll_conn_io_accept4;
303
304 } else {
305 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
306 NXT_ENOSYS);
307 }
308
309#endif
310 }
311
312 io->accept = handler;
313}
314
315
316static void
317nxt_epoll_free(nxt_event_engine_t *engine)
318{
319 int fd;
320
321 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
322
323#if (NXT_HAVE_SIGNALFD)
324
325 fd = engine->u.epoll.signalfd.fd;
326
327 if (fd != -1 && close(fd) != 0) {
328 nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
329 }
330
331#endif
332
333#if (NXT_HAVE_EVENTFD)
334
335 fd = engine->u.epoll.eventfd.fd;
336
337 if (fd != -1 && close(fd) != 0) {
338 nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
339 }
340
341#endif
342
343 fd = engine->u.epoll.fd;
344
345 if (fd != -1 && close(fd) != 0) {
346 nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
347 }
348
349 nxt_free(engine->u.epoll.events);
350 nxt_free(engine->u.epoll.changes);
351
352 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
353}
354
355
356static void
357nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
358{
359 ev->read = NXT_EVENT_ACTIVE;
360 ev->write = NXT_EVENT_ACTIVE;
361
362 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
363 EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
364}
365
366
367static void
368nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
369{
370 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
371
372 ev->read = NXT_EVENT_INACTIVE;
373 ev->write = NXT_EVENT_INACTIVE;
374
375 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
376 }
377}
378
379
380static void
381nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
382{
383 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
384
385 ev->read = NXT_EVENT_INACTIVE;
386 ev->write = NXT_EVENT_INACTIVE;
387
388 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
389 }
390}
391
392
393/*
394 * Although calling close() on a file descriptor will remove any epoll
395 * events that reference the descriptor, in this case the close() acquires
396 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
397 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
398 * only in one epoll set. Thus removing events explicitly before closing
399 * eliminates possible lock contention.
400 */
401
402static nxt_bool_t
403nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
404{
405 nxt_epoll_delete(engine, ev);
406
407 return ev->changing;
408}
409
410
411static void
412nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
413{
414 int op;
415 uint32_t events;
416
417 if (ev->read != NXT_EVENT_BLOCKED) {
418
419 op = EPOLL_CTL_MOD;
420 events = EPOLLIN | engine->u.epoll.mode;
421
422 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
423 op = EPOLL_CTL_ADD;
424
425 } else if (ev->write >= NXT_EVENT_BLOCKED) {
426 events |= EPOLLOUT;
427 }
428
429 nxt_epoll_change(engine, ev, op, events);
430 }
431
432 ev->read = NXT_EVENT_ACTIVE;
433}
434
435
436static void
437nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
438{
439 int op;
440 uint32_t events;
441
442 if (ev->write != NXT_EVENT_BLOCKED) {
443
444 op = EPOLL_CTL_MOD;
445 events = EPOLLOUT | engine->u.epoll.mode;
446
447 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
448 op = EPOLL_CTL_ADD;
449
450 } else if (ev->read >= NXT_EVENT_BLOCKED) {
451 events |= EPOLLIN;
452 }
453
454 nxt_epoll_change(engine, ev, op, events);
455 }
456
457 ev->write = NXT_EVENT_ACTIVE;
458}
459
460
461static void
462nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
463{
464 int op;
465 uint32_t events;
466
467 ev->read = NXT_EVENT_INACTIVE;
468
469 if (ev->write <= NXT_EVENT_DISABLED) {
470 ev->write = NXT_EVENT_INACTIVE;
471 op = EPOLL_CTL_DEL;
472 events = 0;
473
474 } else {
475 op = EPOLL_CTL_MOD;
476 events = EPOLLOUT | engine->u.epoll.mode;
477 }
478
479 nxt_epoll_change(engine, ev, op, events);
480}
481
482
483static void
484nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
485{
486 int op;
487 uint32_t events;
488
489 ev->write = NXT_EVENT_INACTIVE;
490
491 if (ev->read <= NXT_EVENT_DISABLED) {
492 ev->write = NXT_EVENT_INACTIVE;
493 op = EPOLL_CTL_DEL;
494 events = 0;
495
496 } else {
497 op = EPOLL_CTL_MOD;
498 events = EPOLLIN | engine->u.epoll.mode;
499 }
500
501 nxt_epoll_change(engine, ev, op, events);
502}
503
504
505static void
506nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
507{
508 if (ev->read != NXT_EVENT_INACTIVE) {
509 ev->read = NXT_EVENT_BLOCKED;
510 }
511}
512
513
514static void
515nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
516{
517 if (ev->write != NXT_EVENT_INACTIVE) {
518 ev->write = NXT_EVENT_BLOCKED;
519 }
520}
521
522
523/*
524 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
525 * event should be added or modified, epoll_ctl(2):
526 *
527 * EPOLLONESHOT (since Linux 2.6.2)
528 * Sets the one-shot behavior for the associated file descriptor.
529 * This means that after an event is pulled out with epoll_wait(2)
530 * the associated file descriptor is internally disabled and no
531 * other events will be reported by the epoll interface. The user
532 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
533 * descriptor with a new event mask.
534 */
535
536static void
537nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
538{
539 int op;
540
541 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
542 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
543
544 ev->read = NXT_EVENT_ONESHOT;
545 ev->write = NXT_EVENT_INACTIVE;
546
547 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
548}
549
550
551static void
552nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
553{
554 int op;
555
556 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
557 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
558
559 ev->read = NXT_EVENT_INACTIVE;
560 ev->write = NXT_EVENT_ONESHOT;
561
562 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
563}
564
565
566static void
567nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
568{
569 uint32_t events;
570
571 ev->read = NXT_EVENT_ACTIVE;
572
573 events = EPOLLIN;
574
575#ifdef EPOLLEXCLUSIVE
576 events |= EPOLLEXCLUSIVE;
577#endif
578
579 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
580}
581
582
583/*
584 * epoll changes are batched to improve instruction and data cache
585 * locality of several epoll_ctl() calls followed by epoll_wait() call.
586 */
587
588static void
589nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
590 uint32_t events)
591{
592 nxt_epoll_change_t *change;
593
594 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
595 engine->u.epoll.fd, ev->fd, op, events);
596
597 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
598 (void) nxt_epoll_commit_changes(engine);
599 }
600
601 ev->changing = 1;
602
603 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
604 change->op = op;
605 change->event.events = events;
606 change->event.data.ptr = ev;
607}
608
609
610static nxt_int_t
611nxt_epoll_commit_changes(nxt_event_engine_t *engine)
612{
613 int ret;
614 nxt_int_t retval;
615 nxt_fd_event_t *ev;
616 nxt_epoll_change_t *change, *end;
617
618 nxt_debug(&engine->task, "epoll %d changes:%ui",
619 engine->u.epoll.fd, engine->u.epoll.nchanges);
620
621 retval = NXT_OK;
622 change = engine->u.epoll.changes;
623 end = change + engine->u.epoll.nchanges;
624
625 do {
626 ev = change->event.data.ptr;
627 ev->changing = 0;
628
629 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
630 engine->u.epoll.fd, ev->fd, change->op,
631 change->event.events);
632
633 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
634
635 if (nxt_slow_path(ret != 0)) {
636 nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
637 engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
638
639 nxt_work_queue_add(&engine->fast_work_queue,
640 nxt_epoll_error_handler, ev->task, ev, ev->data);
641
642 retval = NXT_ERROR;
643 }
644
645 change++;
646
647 } while (change < end);
648
649 engine->u.epoll.nchanges = 0;
650
651 return retval;
652}
653
654
655static void
656nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
657{
658 nxt_fd_event_t *ev;
659
660 ev = obj;
661
662 ev->read = NXT_EVENT_INACTIVE;
663 ev->write = NXT_EVENT_INACTIVE;
664
665 ev->error_handler(ev->task, ev, data);
666}
667
668
669#if (NXT_HAVE_SIGNALFD)
670
671static nxt_int_t
672nxt_epoll_add_signal(nxt_event_engine_t *engine)
673{
674 int fd;
675 struct epoll_event ee;
676
677 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
678 nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
679 return NXT_ERROR;
680 }
681
682 /*
683 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7
684 * and 2.8 signalfd() wrappers call the original signalfd() syscall
685 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first
686 * tries to call signalfd4() syscall and if it fails then calls the
687 * original signalfd() syscall. For this reason the non-blocking mode
688 * is set separately.
689 */
690
691 fd = signalfd(-1, &engine->signals->sigmask, 0);
692
693 if (fd == -1) {
694 nxt_alert(&engine->task, "signalfd(%d) failed %E",
695 engine->u.epoll.signalfd.fd, nxt_errno);
696 return NXT_ERROR;
697 }
698
699 engine->u.epoll.signalfd.fd = fd;
700
701 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
702 return NXT_ERROR;
703 }
704
705 nxt_debug(&engine->task, "signalfd(): %d", fd);
706
707 engine->u.epoll.signalfd.data = engine->signals->handler;
708 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
709 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
710 engine->u.epoll.signalfd.log = engine->task.log;
711 engine->u.epoll.signalfd.task = &engine->task;
712
713 ee.events = EPOLLIN;
714 ee.data.ptr = &engine->u.epoll.signalfd;
715
716 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
717 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
718 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
719
720 return NXT_ERROR;
721 }
722
723 return NXT_OK;
724}
725
726
727static void
728nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
729{
730 int n;
731 nxt_fd_event_t *ev;
732 nxt_work_handler_t handler;
733 struct signalfd_siginfo sfd;
734
735 ev = obj;
736 handler = data;
737
738 nxt_debug(task, "signalfd handler");
739
740 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
741
742 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
743
744 if (n != sizeof(struct signalfd_siginfo)) {
745 nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
746 return;
747 }
748
749 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
750
751 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
752}
753
754#endif
755
756
757#if (NXT_HAVE_EVENTFD)
758
759static nxt_int_t
760nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
761{
762 int ret;
763 struct epoll_event ee;
764
765 engine->u.epoll.post_handler = handler;
766
767 /*
768 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7
769 * and 2.8 eventfd() wrappers call the original eventfd() syscall
770 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first
771 * tries to call eventfd2() syscall and if it fails then calls the
772 * original eventfd() syscall. For this reason the non-blocking mode
773 * is set separately.
774 */
775
776 engine->u.epoll.eventfd.fd = eventfd(0, 0);
777
778 if (engine->u.epoll.eventfd.fd == -1) {
779 nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
780 return NXT_ERROR;
781 }
782
783 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
784 if (nxt_slow_path(ret != NXT_OK)) {
785 return NXT_ERROR;
786 }
787
788 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
789
790 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
791 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
792 engine->u.epoll.eventfd.data = engine;
793 engine->u.epoll.eventfd.log = engine->task.log;
794 engine->u.epoll.eventfd.task = &engine->task;
795
796 ee.events = EPOLLIN | EPOLLET;
797 ee.data.ptr = &engine->u.epoll.eventfd;
798
799 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
800 engine->u.epoll.eventfd.fd, &ee);
801
802 if (nxt_fast_path(ret == 0)) {
803 return NXT_OK;
804 }
805
806 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
807 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
808 nxt_errno);
809
810 return NXT_ERROR;
811}
812
813
814static void
815nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
816{
817 int n;
818 uint64_t events;
819 nxt_event_engine_t *engine;
820
821 engine = data;
822
823 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
824
825 /*
826 * The maximum value after write() to a eventfd() descriptor will
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10/*
11 * The first epoll version has been introduced in Linux 2.5.44. The
12 * interface was changed several times since then and the final version
13 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
14 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
15 *
16 * EPOLLET mode did not work reliable in early implementaions and in
17 * Linux 2.4 backport.
18 *
19 * EPOLLONESHOT Linux 2.6.2, glibc 2.3.
20 * EPOLLRDHUP Linux 2.6.17, glibc 2.8.
21 * epoll_pwait() Linux 2.6.19, glibc 2.6.
22 * signalfd() Linux 2.6.22, glibc 2.7.
23 * eventfd() Linux 2.6.22, glibc 2.7.
24 * timerfd_create() Linux 2.6.25, glibc 2.8.
25 * epoll_create1() Linux 2.6.27, glibc 2.9.
26 * signalfd4() Linux 2.6.27, glibc 2.9.
27 * eventfd2() Linux 2.6.27, glibc 2.9.
28 * accept4() Linux 2.6.28, glibc 2.10.
29 * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10.
30 * EPOLLEXCLUSIVE Linux 4.5, glibc 2.24.
31 */
32
33
34#if (NXT_HAVE_EPOLL_EDGE)
35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36 nxt_uint_t mchanges, nxt_uint_t mevents);
37#endif
38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39 nxt_uint_t mchanges, nxt_uint_t mevents);
40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
42static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
43 nxt_conn_io_t *io);
44static void nxt_epoll_free(nxt_event_engine_t *engine);
45static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49 nxt_fd_event_t *ev);
50static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51 nxt_fd_event_t *ev);
52static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
53 nxt_fd_event_t *ev);
54static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
55 nxt_fd_event_t *ev);
56static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
57 nxt_fd_event_t *ev);
58static void nxt_epoll_block_read(nxt_event_engine_t *engine,
59 nxt_fd_event_t *ev);
60static void nxt_epoll_block_write(nxt_event_engine_t *engine,
61 nxt_fd_event_t *ev);
62static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
63 nxt_fd_event_t *ev);
64static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
65 nxt_fd_event_t *ev);
66static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
67 nxt_fd_event_t *ev);
68static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
69 int op, uint32_t events);
70static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
71static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
72#if (NXT_HAVE_SIGNALFD)
73static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
74static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
75#endif
76#if (NXT_HAVE_EVENTFD)
77static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78 nxt_work_handler_t handler);
79static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81#endif
82static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83
84#if (NXT_HAVE_ACCEPT4)
85static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
86 void *data);
87#endif
88
89
90#if (NXT_HAVE_EPOLL_EDGE)
91
92static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
93 void *data);
94static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
95 void *data);
96static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
97
98
99static nxt_conn_io_t nxt_epoll_edge_conn_io = {
100 nxt_epoll_edge_conn_io_connect,
101 nxt_conn_io_accept,
102
103 nxt_conn_io_read,
104 nxt_epoll_edge_conn_io_recvbuf,
105 nxt_conn_io_recv,
106
107 nxt_conn_io_write,
108 nxt_event_conn_io_write_chunk,
109
110#if (NXT_HAVE_LINUX_SENDFILE)
111 nxt_linux_event_conn_io_sendfile,
112#else
113 nxt_event_conn_io_sendbuf,
114#endif
115
116 nxt_event_conn_io_writev,
117 nxt_event_conn_io_send,
118
119 nxt_conn_io_shutdown,
120};
121
122
123const nxt_event_interface_t nxt_epoll_edge_engine = {
124 "epoll_edge",
125 nxt_epoll_edge_create,
126 nxt_epoll_free,
127 nxt_epoll_enable,
128 nxt_epoll_disable,
129 nxt_epoll_delete,
130 nxt_epoll_close,
131 nxt_epoll_enable_read,
132 nxt_epoll_enable_write,
133 nxt_epoll_disable_read,
134 nxt_epoll_disable_write,
135 nxt_epoll_block_read,
136 nxt_epoll_block_write,
137 nxt_epoll_oneshot_read,
138 nxt_epoll_oneshot_write,
139 nxt_epoll_enable_accept,
140 NULL,
141 NULL,
142#if (NXT_HAVE_EVENTFD)
143 nxt_epoll_enable_post,
144 nxt_epoll_signal,
145#else
146 NULL,
147 NULL,
148#endif
149 nxt_epoll_poll,
150
151 &nxt_epoll_edge_conn_io,
152
153#if (NXT_HAVE_INOTIFY)
154 NXT_FILE_EVENTS,
155#else
156 NXT_NO_FILE_EVENTS,
157#endif
158
159#if (NXT_HAVE_SIGNALFD)
160 NXT_SIGNAL_EVENTS,
161#else
162 NXT_NO_SIGNAL_EVENTS,
163#endif
164};
165
166#endif
167
168
169const nxt_event_interface_t nxt_epoll_level_engine = {
170 "epoll_level",
171 nxt_epoll_level_create,
172 nxt_epoll_free,
173 nxt_epoll_enable,
174 nxt_epoll_disable,
175 nxt_epoll_delete,
176 nxt_epoll_close,
177 nxt_epoll_enable_read,
178 nxt_epoll_enable_write,
179 nxt_epoll_disable_read,
180 nxt_epoll_disable_write,
181 nxt_epoll_block_read,
182 nxt_epoll_block_write,
183 nxt_epoll_oneshot_read,
184 nxt_epoll_oneshot_write,
185 nxt_epoll_enable_accept,
186 NULL,
187 NULL,
188#if (NXT_HAVE_EVENTFD)
189 nxt_epoll_enable_post,
190 nxt_epoll_signal,
191#else
192 NULL,
193 NULL,
194#endif
195 nxt_epoll_poll,
196
197 &nxt_unix_conn_io,
198
199#if (NXT_HAVE_INOTIFY)
200 NXT_FILE_EVENTS,
201#else
202 NXT_NO_FILE_EVENTS,
203#endif
204
205#if (NXT_HAVE_SIGNALFD)
206 NXT_SIGNAL_EVENTS,
207#else
208 NXT_NO_SIGNAL_EVENTS,
209#endif
210};
211
212
213#if (NXT_HAVE_EPOLL_EDGE)
214
215static nxt_int_t
216nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
217 nxt_uint_t mevents)
218{
219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
220 EPOLLET | EPOLLRDHUP);
221}
222
223#endif
224
225
226static nxt_int_t
227nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
228 nxt_uint_t mevents)
229{
230 return nxt_epoll_create(engine, mchanges, mevents,
231 &nxt_unix_conn_io, 0);
232}
233
234
235static nxt_int_t
236nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
238{
239 engine->u.epoll.fd = -1;
240 engine->u.epoll.mode = mode;
241 engine->u.epoll.mchanges = mchanges;
242 engine->u.epoll.mevents = mevents;
243#if (NXT_HAVE_SIGNALFD)
244 engine->u.epoll.signalfd.fd = -1;
245#endif
246
247 engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
248 if (engine->u.epoll.changes == NULL) {
249 goto fail;
250 }
251
252 engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
253 if (engine->u.epoll.events == NULL) {
254 goto fail;
255 }
256
257 engine->u.epoll.fd = epoll_create(1);
258 if (engine->u.epoll.fd == -1) {
259 nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
260 goto fail;
261 }
262
263 nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
264
265 if (engine->signals != NULL) {
266
267#if (NXT_HAVE_SIGNALFD)
268
269 if (nxt_epoll_add_signal(engine) != NXT_OK) {
270 goto fail;
271 }
272
273#endif
274
275 nxt_epoll_test_accept4(engine, io);
276 }
277
278 return NXT_OK;
279
280fail:
281
282 nxt_epoll_free(engine);
283
284 return NXT_ERROR;
285}
286
287
288static void
289nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
290{
291 static nxt_work_handler_t handler;
292
293 if (handler == NULL) {
294
295 handler = io->accept;
296
297#if (NXT_HAVE_ACCEPT4)
298
299 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
300
301 if (nxt_errno != NXT_ENOSYS) {
302 handler = nxt_epoll_conn_io_accept4;
303
304 } else {
305 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
306 NXT_ENOSYS);
307 }
308
309#endif
310 }
311
312 io->accept = handler;
313}
314
315
316static void
317nxt_epoll_free(nxt_event_engine_t *engine)
318{
319 int fd;
320
321 nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
322
323#if (NXT_HAVE_SIGNALFD)
324
325 fd = engine->u.epoll.signalfd.fd;
326
327 if (fd != -1 && close(fd) != 0) {
328 nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
329 }
330
331#endif
332
333#if (NXT_HAVE_EVENTFD)
334
335 fd = engine->u.epoll.eventfd.fd;
336
337 if (fd != -1 && close(fd) != 0) {
338 nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
339 }
340
341#endif
342
343 fd = engine->u.epoll.fd;
344
345 if (fd != -1 && close(fd) != 0) {
346 nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
347 }
348
349 nxt_free(engine->u.epoll.events);
350 nxt_free(engine->u.epoll.changes);
351
352 nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
353}
354
355
356static void
357nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
358{
359 ev->read = NXT_EVENT_ACTIVE;
360 ev->write = NXT_EVENT_ACTIVE;
361
362 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
363 EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
364}
365
366
367static void
368nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
369{
370 if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
371
372 ev->read = NXT_EVENT_INACTIVE;
373 ev->write = NXT_EVENT_INACTIVE;
374
375 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
376 }
377}
378
379
380static void
381nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
382{
383 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
384
385 ev->read = NXT_EVENT_INACTIVE;
386 ev->write = NXT_EVENT_INACTIVE;
387
388 nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
389 }
390}
391
392
393/*
394 * Although calling close() on a file descriptor will remove any epoll
395 * events that reference the descriptor, in this case the close() acquires
396 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
397 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
398 * only in one epoll set. Thus removing events explicitly before closing
399 * eliminates possible lock contention.
400 */
401
402static nxt_bool_t
403nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
404{
405 nxt_epoll_delete(engine, ev);
406
407 return ev->changing;
408}
409
410
411static void
412nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
413{
414 int op;
415 uint32_t events;
416
417 if (ev->read != NXT_EVENT_BLOCKED) {
418
419 op = EPOLL_CTL_MOD;
420 events = EPOLLIN | engine->u.epoll.mode;
421
422 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
423 op = EPOLL_CTL_ADD;
424
425 } else if (ev->write >= NXT_EVENT_BLOCKED) {
426 events |= EPOLLOUT;
427 }
428
429 nxt_epoll_change(engine, ev, op, events);
430 }
431
432 ev->read = NXT_EVENT_ACTIVE;
433}
434
435
436static void
437nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
438{
439 int op;
440 uint32_t events;
441
442 if (ev->write != NXT_EVENT_BLOCKED) {
443
444 op = EPOLL_CTL_MOD;
445 events = EPOLLOUT | engine->u.epoll.mode;
446
447 if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
448 op = EPOLL_CTL_ADD;
449
450 } else if (ev->read >= NXT_EVENT_BLOCKED) {
451 events |= EPOLLIN;
452 }
453
454 nxt_epoll_change(engine, ev, op, events);
455 }
456
457 ev->write = NXT_EVENT_ACTIVE;
458}
459
460
461static void
462nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
463{
464 int op;
465 uint32_t events;
466
467 ev->read = NXT_EVENT_INACTIVE;
468
469 if (ev->write <= NXT_EVENT_DISABLED) {
470 ev->write = NXT_EVENT_INACTIVE;
471 op = EPOLL_CTL_DEL;
472 events = 0;
473
474 } else {
475 op = EPOLL_CTL_MOD;
476 events = EPOLLOUT | engine->u.epoll.mode;
477 }
478
479 nxt_epoll_change(engine, ev, op, events);
480}
481
482
483static void
484nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
485{
486 int op;
487 uint32_t events;
488
489 ev->write = NXT_EVENT_INACTIVE;
490
491 if (ev->read <= NXT_EVENT_DISABLED) {
492 ev->write = NXT_EVENT_INACTIVE;
493 op = EPOLL_CTL_DEL;
494 events = 0;
495
496 } else {
497 op = EPOLL_CTL_MOD;
498 events = EPOLLIN | engine->u.epoll.mode;
499 }
500
501 nxt_epoll_change(engine, ev, op, events);
502}
503
504
505static void
506nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
507{
508 if (ev->read != NXT_EVENT_INACTIVE) {
509 ev->read = NXT_EVENT_BLOCKED;
510 }
511}
512
513
514static void
515nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
516{
517 if (ev->write != NXT_EVENT_INACTIVE) {
518 ev->write = NXT_EVENT_BLOCKED;
519 }
520}
521
522
523/*
524 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
525 * event should be added or modified, epoll_ctl(2):
526 *
527 * EPOLLONESHOT (since Linux 2.6.2)
528 * Sets the one-shot behavior for the associated file descriptor.
529 * This means that after an event is pulled out with epoll_wait(2)
530 * the associated file descriptor is internally disabled and no
531 * other events will be reported by the epoll interface. The user
532 * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
533 * descriptor with a new event mask.
534 */
535
536static void
537nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
538{
539 int op;
540
541 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
542 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
543
544 ev->read = NXT_EVENT_ONESHOT;
545 ev->write = NXT_EVENT_INACTIVE;
546
547 nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
548}
549
550
551static void
552nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
553{
554 int op;
555
556 op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
557 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
558
559 ev->read = NXT_EVENT_INACTIVE;
560 ev->write = NXT_EVENT_ONESHOT;
561
562 nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
563}
564
565
566static void
567nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
568{
569 uint32_t events;
570
571 ev->read = NXT_EVENT_ACTIVE;
572
573 events = EPOLLIN;
574
575#ifdef EPOLLEXCLUSIVE
576 events |= EPOLLEXCLUSIVE;
577#endif
578
579 nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
580}
581
582
583/*
584 * epoll changes are batched to improve instruction and data cache
585 * locality of several epoll_ctl() calls followed by epoll_wait() call.
586 */
587
588static void
589nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
590 uint32_t events)
591{
592 nxt_epoll_change_t *change;
593
594 nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
595 engine->u.epoll.fd, ev->fd, op, events);
596
597 if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
598 (void) nxt_epoll_commit_changes(engine);
599 }
600
601 ev->changing = 1;
602
603 change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
604 change->op = op;
605 change->event.events = events;
606 change->event.data.ptr = ev;
607}
608
609
610static nxt_int_t
611nxt_epoll_commit_changes(nxt_event_engine_t *engine)
612{
613 int ret;
614 nxt_int_t retval;
615 nxt_fd_event_t *ev;
616 nxt_epoll_change_t *change, *end;
617
618 nxt_debug(&engine->task, "epoll %d changes:%ui",
619 engine->u.epoll.fd, engine->u.epoll.nchanges);
620
621 retval = NXT_OK;
622 change = engine->u.epoll.changes;
623 end = change + engine->u.epoll.nchanges;
624
625 do {
626 ev = change->event.data.ptr;
627 ev->changing = 0;
628
629 nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
630 engine->u.epoll.fd, ev->fd, change->op,
631 change->event.events);
632
633 ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
634
635 if (nxt_slow_path(ret != 0)) {
636 nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
637 engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
638
639 nxt_work_queue_add(&engine->fast_work_queue,
640 nxt_epoll_error_handler, ev->task, ev, ev->data);
641
642 retval = NXT_ERROR;
643 }
644
645 change++;
646
647 } while (change < end);
648
649 engine->u.epoll.nchanges = 0;
650
651 return retval;
652}
653
654
655static void
656nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
657{
658 nxt_fd_event_t *ev;
659
660 ev = obj;
661
662 ev->read = NXT_EVENT_INACTIVE;
663 ev->write = NXT_EVENT_INACTIVE;
664
665 ev->error_handler(ev->task, ev, data);
666}
667
668
669#if (NXT_HAVE_SIGNALFD)
670
671static nxt_int_t
672nxt_epoll_add_signal(nxt_event_engine_t *engine)
673{
674 int fd;
675 struct epoll_event ee;
676
677 if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
678 nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
679 return NXT_ERROR;
680 }
681
682 /*
683 * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7
684 * and 2.8 signalfd() wrappers call the original signalfd() syscall
685 * without the flags argument. Glibc 2.9+ signalfd() wrapper at first
686 * tries to call signalfd4() syscall and if it fails then calls the
687 * original signalfd() syscall. For this reason the non-blocking mode
688 * is set separately.
689 */
690
691 fd = signalfd(-1, &engine->signals->sigmask, 0);
692
693 if (fd == -1) {
694 nxt_alert(&engine->task, "signalfd(%d) failed %E",
695 engine->u.epoll.signalfd.fd, nxt_errno);
696 return NXT_ERROR;
697 }
698
699 engine->u.epoll.signalfd.fd = fd;
700
701 if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
702 return NXT_ERROR;
703 }
704
705 nxt_debug(&engine->task, "signalfd(): %d", fd);
706
707 engine->u.epoll.signalfd.data = engine->signals->handler;
708 engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
709 engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
710 engine->u.epoll.signalfd.log = engine->task.log;
711 engine->u.epoll.signalfd.task = &engine->task;
712
713 ee.events = EPOLLIN;
714 ee.data.ptr = &engine->u.epoll.signalfd;
715
716 if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
717 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
718 engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
719
720 return NXT_ERROR;
721 }
722
723 return NXT_OK;
724}
725
726
727static void
728nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
729{
730 int n;
731 nxt_fd_event_t *ev;
732 nxt_work_handler_t handler;
733 struct signalfd_siginfo sfd;
734
735 ev = obj;
736 handler = data;
737
738 nxt_debug(task, "signalfd handler");
739
740 n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));
741
742 nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);
743
744 if (n != sizeof(struct signalfd_siginfo)) {
745 nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
746 return;
747 }
748
749 nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);
750
751 handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
752}
753
754#endif
755
756
757#if (NXT_HAVE_EVENTFD)
758
759static nxt_int_t
760nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
761{
762 int ret;
763 struct epoll_event ee;
764
765 engine->u.epoll.post_handler = handler;
766
767 /*
768 * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7
769 * and 2.8 eventfd() wrappers call the original eventfd() syscall
770 * without the flags argument. Glibc 2.9+ eventfd() wrapper at first
771 * tries to call eventfd2() syscall and if it fails then calls the
772 * original eventfd() syscall. For this reason the non-blocking mode
773 * is set separately.
774 */
775
776 engine->u.epoll.eventfd.fd = eventfd(0, 0);
777
778 if (engine->u.epoll.eventfd.fd == -1) {
779 nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
780 return NXT_ERROR;
781 }
782
783 ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
784 if (nxt_slow_path(ret != NXT_OK)) {
785 return NXT_ERROR;
786 }
787
788 nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
789
790 engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
791 engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
792 engine->u.epoll.eventfd.data = engine;
793 engine->u.epoll.eventfd.log = engine->task.log;
794 engine->u.epoll.eventfd.task = &engine->task;
795
796 ee.events = EPOLLIN | EPOLLET;
797 ee.data.ptr = &engine->u.epoll.eventfd;
798
799 ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
800 engine->u.epoll.eventfd.fd, &ee);
801
802 if (nxt_fast_path(ret == 0)) {
803 return NXT_OK;
804 }
805
806 nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
807 engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
808 nxt_errno);
809
810 return NXT_ERROR;
811}
812
813
814static void
815nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
816{
817 int n;
818 uint64_t events;
819 nxt_event_engine_t *engine;
820
821 engine = data;
822
823 nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
824
825 /*
826 * The maximum value after write() to a eventfd() descriptor will
827 * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor
827 * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
828 * can be read once per many notifications, for example, once per
829 * 2^32-2 noticifcations. Since the eventfd() file descriptor is
830 * always registered in EPOLLET mode, epoll returns event about
831 * only the latest write() to the descriptor.
832 */
833
828 * can be read once per many notifications, for example, once per
829 * 2^32-2 noticifcations. Since the eventfd() file descriptor is
830 * always registered in EPOLLET mode, epoll returns event about
831 * only the latest write() to the descriptor.
832 */
833
834 if (engine->u.epoll.neventfd++ >= 0xfffffffe) {
834 if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
835 engine->u.epoll.neventfd = 0;
836
837 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
838
839 nxt_debug(task, "read(%d): %d events:%uL",
840 engine->u.epoll.eventfd.fd, n, events);
841
842 if (n != sizeof(uint64_t)) {
843 nxt_alert(task, "read eventfd(%d) failed %E",
844 engine->u.epoll.eventfd.fd, nxt_errno);
845 }
846 }
847
848 engine->u.epoll.post_handler(task, NULL, NULL);
849}
850
851
852static void
853nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
854{
855 size_t ret;
856 uint64_t event;
857
858 /*
859 * eventfd() presents along with signalfd(), so the function
860 * is used only to post events and the signo argument is ignored.
861 */
862
863 event = 1;
864
865 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
866
867 if (nxt_slow_path(ret != sizeof(uint64_t))) {
868 nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
869 engine->u.epoll.eventfd.fd, nxt_errno);
870 }
871}
872
873#endif
874
875
876static void
877nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
878{
879 int nevents;
880 uint32_t events;
881 nxt_int_t i;
882 nxt_err_t err;
883 nxt_bool_t error;
884 nxt_uint_t level;
885 nxt_fd_event_t *ev;
886 struct epoll_event *event;
887
888 if (engine->u.epoll.nchanges != 0) {
889 if (nxt_epoll_commit_changes(engine) != NXT_OK) {
890 /* Error handlers have been enqueued on failure. */
891 timeout = 0;
892 }
893 }
894
895 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
896 engine->u.epoll.fd, timeout);
897
898 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
899 engine->u.epoll.mevents, timeout);
900
901 err = (nevents == -1) ? nxt_errno : 0;
902
903 nxt_thread_time_update(engine->task.thread);
904
905 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
906
907 if (nevents == -1) {
908 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
909
910 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
911 engine->u.epoll.fd, err);
912
913 return;
914 }
915
916 for (i = 0; i < nevents; i++) {
917
918 event = &engine->u.epoll.events[i];
919 events = event->events;
920 ev = event->data.ptr;
921
922 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
923 ev->fd, events, ev, ev->read, ev->write);
924
925 /*
926 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or
927 * EPOLLOUT, so the "error" variable enqueues only one active handler.
928 */
929 error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
930 ev->epoll_error = error;
931
932#if (NXT_HAVE_EPOLL_EDGE)
933
934 ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
935
936#endif
937
938 if ((events & EPOLLIN) || error) {
939 ev->read_ready = 1;
940
941 if (ev->read != NXT_EVENT_BLOCKED) {
942
943 if (ev->read == NXT_EVENT_ONESHOT) {
944 ev->read = NXT_EVENT_DISABLED;
945 }
946
947 error = 0;
948
949 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
950 ev->task, ev, ev->data);
951
952 } else if (engine->u.epoll.mode == 0) {
953 /* Level-triggered mode. */
954 nxt_epoll_disable_read(engine, ev);
955 }
956 }
957
958 if ((events & EPOLLOUT) || error) {
959 ev->write_ready = 1;
960
961 if (ev->write != NXT_EVENT_BLOCKED) {
962
963 if (ev->write == NXT_EVENT_ONESHOT) {
964 ev->write = NXT_EVENT_DISABLED;
965 }
966
967 error = 0;
968
969 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
970 ev->task, ev, ev->data);
971
972 } else if (engine->u.epoll.mode == 0) {
973 /* Level-triggered mode. */
974 nxt_epoll_disable_write(engine, ev);
975 }
976 }
977
978 if (error) {
979 ev->read_ready = 1;
980 ev->write_ready = 1;
981 }
982 }
983}
984
985
986#if (NXT_HAVE_ACCEPT4)
987
988static void
989nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
990{
991 socklen_t socklen;
992 nxt_conn_t *c;
993 nxt_socket_t s;
994 struct sockaddr *sa;
995 nxt_listen_event_t *lev;
996
997 lev = obj;
998 c = lev->next;
999
1000 lev->ready--;
1001 lev->socket.read_ready = (lev->ready != 0);
1002
1003 sa = &c->remote->u.sockaddr;
1004 socklen = c->remote->socklen;
1005 /*
1006 * The returned socklen is ignored here,
1007 * see comment in nxt_conn_io_accept().
1008 */
1009 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);
1010
1011 if (s != -1) {
1012 c->socket.fd = s;
1013
1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
1015
1016 nxt_conn_accept(task, lev, c);
1017 return;
1018 }
1019
1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
1021}
1022
1023#endif
1024
1025
1026#if (NXT_HAVE_EPOLL_EDGE)
1027
1028/*
1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1030 * syscall to test pending connect() error. Although this special
1031 * interface can work in both edge-triggered and level-triggered
1032 * modes it is enabled only for the former mode because this mode is
1033 * available in all modern Linux distributions. For the latter mode
1034 * it is required to create additional nxt_epoll_level_event_conn_io
1035 * with single non-generic connect() interface.
1036 */
1037
1038static void
1039nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1040{
1041 nxt_conn_t *c;
1042 nxt_event_engine_t *engine;
1043 nxt_work_handler_t handler;
1044 const nxt_event_conn_state_t *state;
1045
1046 c = obj;
1047
1048 state = c->write_state;
1049
1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1051
1052 case NXT_OK:
1053 c->socket.write_ready = 1;
1054 handler = state->ready_handler;
1055 break;
1056
1057 case NXT_AGAIN:
1058 c->socket.write_handler = nxt_epoll_edge_conn_connected;
1059 c->socket.error_handler = nxt_conn_connect_error;
1060
1061 engine = task->thread->engine;
1062 nxt_conn_timer(engine, c, state, &c->write_timer);
1063
1064 nxt_epoll_enable(engine, &c->socket);
1065 c->socket.read = NXT_EVENT_BLOCKED;
1066 return;
1067
1068#if 0
1069 case NXT_AGAIN:
1070 nxt_conn_timer(engine, c, state, &c->write_timer);
1071
1072 /* Fall through. */
1073
1074 case NXT_OK:
1075 /*
1076 * Mark both read and write directions as ready and try to perform
1077 * I/O operations before receiving readiness notifications.
1078 * On unconnected socket Linux send() and recv() return EAGAIN
1079 * instead of ENOTCONN.
1080 */
1081 c->socket.read_ready = 1;
1082 c->socket.write_ready = 1;
1083 /*
1084 * Enabling both read and write notifications on a getting
1085 * connected socket eliminates one epoll_ctl() syscall.
1086 */
1087 c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1088 c->socket.error_handler = state->error_handler;
1089
1090 nxt_epoll_enable(engine, &c->socket);
1091 c->socket.read = NXT_EVENT_BLOCKED;
1092
1093 handler = state->ready_handler;
1094 break;
1095#endif
1096
1097 case NXT_ERROR:
1098 handler = state->error_handler;
1099 break;
1100
1101 default: /* NXT_DECLINED: connection refused. */
1102 handler = state->close_handler;
1103 break;
1104 }
1105
1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1107}
1108
1109
1110static void
1111nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1112{
1113 nxt_conn_t *c;
1114
1115 c = obj;
1116
1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1118
1119 if (!c->socket.epoll_error) {
1120 c->socket.write = NXT_EVENT_BLOCKED;
1121
1122 if (c->write_state->timer_autoreset) {
1123 nxt_timer_disable(task->thread->engine, &c->write_timer);
1124 }
1125
1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1127 task, c, data);
1128 return;
1129 }
1130
1131 nxt_conn_connect_test(task, c, data);
1132}
1133
1134
1135/*
1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1138 * in edge-triggered mode.
1139 */
1140
1141static ssize_t
1142nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1143{
1144 ssize_t n;
1145
1146 n = nxt_conn_io_recvbuf(c, b);
1147
1148 if (n > 0 && c->socket.epoll_eof) {
1149 c->socket.read_ready = 1;
1150 }
1151
1152 return n;
1153}
1154
1155#endif
835 engine->u.epoll.neventfd = 0;
836
837 n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
838
839 nxt_debug(task, "read(%d): %d events:%uL",
840 engine->u.epoll.eventfd.fd, n, events);
841
842 if (n != sizeof(uint64_t)) {
843 nxt_alert(task, "read eventfd(%d) failed %E",
844 engine->u.epoll.eventfd.fd, nxt_errno);
845 }
846 }
847
848 engine->u.epoll.post_handler(task, NULL, NULL);
849}
850
851
852static void
853nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
854{
855 size_t ret;
856 uint64_t event;
857
858 /*
859 * eventfd() presents along with signalfd(), so the function
860 * is used only to post events and the signo argument is ignored.
861 */
862
863 event = 1;
864
865 ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
866
867 if (nxt_slow_path(ret != sizeof(uint64_t))) {
868 nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
869 engine->u.epoll.eventfd.fd, nxt_errno);
870 }
871}
872
873#endif
874
875
876static void
877nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
878{
879 int nevents;
880 uint32_t events;
881 nxt_int_t i;
882 nxt_err_t err;
883 nxt_bool_t error;
884 nxt_uint_t level;
885 nxt_fd_event_t *ev;
886 struct epoll_event *event;
887
888 if (engine->u.epoll.nchanges != 0) {
889 if (nxt_epoll_commit_changes(engine) != NXT_OK) {
890 /* Error handlers have been enqueued on failure. */
891 timeout = 0;
892 }
893 }
894
895 nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
896 engine->u.epoll.fd, timeout);
897
898 nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
899 engine->u.epoll.mevents, timeout);
900
901 err = (nevents == -1) ? nxt_errno : 0;
902
903 nxt_thread_time_update(engine->task.thread);
904
905 nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
906
907 if (nevents == -1) {
908 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
909
910 nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
911 engine->u.epoll.fd, err);
912
913 return;
914 }
915
916 for (i = 0; i < nevents; i++) {
917
918 event = &engine->u.epoll.events[i];
919 events = event->events;
920 ev = event->data.ptr;
921
922 nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
923 ev->fd, events, ev, ev->read, ev->write);
924
925 /*
926 * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or
927 * EPOLLOUT, so the "error" variable enqueues only one active handler.
928 */
929 error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
930 ev->epoll_error = error;
931
932#if (NXT_HAVE_EPOLL_EDGE)
933
934 ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
935
936#endif
937
938 if ((events & EPOLLIN) || error) {
939 ev->read_ready = 1;
940
941 if (ev->read != NXT_EVENT_BLOCKED) {
942
943 if (ev->read == NXT_EVENT_ONESHOT) {
944 ev->read = NXT_EVENT_DISABLED;
945 }
946
947 error = 0;
948
949 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
950 ev->task, ev, ev->data);
951
952 } else if (engine->u.epoll.mode == 0) {
953 /* Level-triggered mode. */
954 nxt_epoll_disable_read(engine, ev);
955 }
956 }
957
958 if ((events & EPOLLOUT) || error) {
959 ev->write_ready = 1;
960
961 if (ev->write != NXT_EVENT_BLOCKED) {
962
963 if (ev->write == NXT_EVENT_ONESHOT) {
964 ev->write = NXT_EVENT_DISABLED;
965 }
966
967 error = 0;
968
969 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
970 ev->task, ev, ev->data);
971
972 } else if (engine->u.epoll.mode == 0) {
973 /* Level-triggered mode. */
974 nxt_epoll_disable_write(engine, ev);
975 }
976 }
977
978 if (error) {
979 ev->read_ready = 1;
980 ev->write_ready = 1;
981 }
982 }
983}
984
985
986#if (NXT_HAVE_ACCEPT4)
987
988static void
989nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
990{
991 socklen_t socklen;
992 nxt_conn_t *c;
993 nxt_socket_t s;
994 struct sockaddr *sa;
995 nxt_listen_event_t *lev;
996
997 lev = obj;
998 c = lev->next;
999
1000 lev->ready--;
1001 lev->socket.read_ready = (lev->ready != 0);
1002
1003 sa = &c->remote->u.sockaddr;
1004 socklen = c->remote->socklen;
1005 /*
1006 * The returned socklen is ignored here,
1007 * see comment in nxt_conn_io_accept().
1008 */
1009 s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);
1010
1011 if (s != -1) {
1012 c->socket.fd = s;
1013
1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
1015
1016 nxt_conn_accept(task, lev, c);
1017 return;
1018 }
1019
1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
1021}
1022
1023#endif
1024
1025
1026#if (NXT_HAVE_EPOLL_EDGE)
1027
1028/*
1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1030 * syscall to test pending connect() error. Although this special
1031 * interface can work in both edge-triggered and level-triggered
1032 * modes it is enabled only for the former mode because this mode is
1033 * available in all modern Linux distributions. For the latter mode
1034 * it is required to create additional nxt_epoll_level_event_conn_io
1035 * with single non-generic connect() interface.
1036 */
1037
1038static void
1039nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1040{
1041 nxt_conn_t *c;
1042 nxt_event_engine_t *engine;
1043 nxt_work_handler_t handler;
1044 const nxt_event_conn_state_t *state;
1045
1046 c = obj;
1047
1048 state = c->write_state;
1049
1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1051
1052 case NXT_OK:
1053 c->socket.write_ready = 1;
1054 handler = state->ready_handler;
1055 break;
1056
1057 case NXT_AGAIN:
1058 c->socket.write_handler = nxt_epoll_edge_conn_connected;
1059 c->socket.error_handler = nxt_conn_connect_error;
1060
1061 engine = task->thread->engine;
1062 nxt_conn_timer(engine, c, state, &c->write_timer);
1063
1064 nxt_epoll_enable(engine, &c->socket);
1065 c->socket.read = NXT_EVENT_BLOCKED;
1066 return;
1067
1068#if 0
1069 case NXT_AGAIN:
1070 nxt_conn_timer(engine, c, state, &c->write_timer);
1071
1072 /* Fall through. */
1073
1074 case NXT_OK:
1075 /*
1076 * Mark both read and write directions as ready and try to perform
1077 * I/O operations before receiving readiness notifications.
1078 * On unconnected socket Linux send() and recv() return EAGAIN
1079 * instead of ENOTCONN.
1080 */
1081 c->socket.read_ready = 1;
1082 c->socket.write_ready = 1;
1083 /*
1084 * Enabling both read and write notifications on a getting
1085 * connected socket eliminates one epoll_ctl() syscall.
1086 */
1087 c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1088 c->socket.error_handler = state->error_handler;
1089
1090 nxt_epoll_enable(engine, &c->socket);
1091 c->socket.read = NXT_EVENT_BLOCKED;
1092
1093 handler = state->ready_handler;
1094 break;
1095#endif
1096
1097 case NXT_ERROR:
1098 handler = state->error_handler;
1099 break;
1100
1101 default: /* NXT_DECLINED: connection refused. */
1102 handler = state->close_handler;
1103 break;
1104 }
1105
1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1107}
1108
1109
1110static void
1111nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1112{
1113 nxt_conn_t *c;
1114
1115 c = obj;
1116
1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1118
1119 if (!c->socket.epoll_error) {
1120 c->socket.write = NXT_EVENT_BLOCKED;
1121
1122 if (c->write_state->timer_autoreset) {
1123 nxt_timer_disable(task->thread->engine, &c->write_timer);
1124 }
1125
1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1127 task, c, data);
1128 return;
1129 }
1130
1131 nxt_conn_connect_test(task, c, data);
1132}
1133
1134
1135/*
1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1138 * in edge-triggered mode.
1139 */
1140
1141static ssize_t
1142nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1143{
1144 ssize_t n;
1145
1146 n = nxt_conn_io_recvbuf(c, b);
1147
1148 if (n > 0 && c->socket.epoll_eof) {
1149 c->socket.read_ready = 1;
1150 }
1151
1152 return n;
1153}
1154
1155#endif