nxt_epoll_engine.c (57:adf36a8fa62b) nxt_epoll_engine.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 24 unchanged lines hidden (view full) ---

33
34#if (NXT_HAVE_EPOLL_EDGE)
35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36 nxt_uint_t mchanges, nxt_uint_t mevents);
37#endif
38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39 nxt_uint_t mchanges, nxt_uint_t mevents);
40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 24 unchanged lines hidden (view full) ---

33
34#if (NXT_HAVE_EPOLL_EDGE)
35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
36 nxt_uint_t mchanges, nxt_uint_t mevents);
37#endif
38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
39 nxt_uint_t mchanges, nxt_uint_t mevents);
40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io,
42 uint32_t mode);
41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
43static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
42static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
44 nxt_event_conn_io_t *io);
43 nxt_conn_io_t *io);
45static void nxt_epoll_free(nxt_event_engine_t *engine);
46static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
49static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
50 nxt_fd_event_t *ev);
51static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
52 nxt_fd_event_t *ev);

--- 25 unchanged lines hidden (view full) ---

78static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
79 nxt_work_handler_t handler);
80static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
81static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
82#endif
83static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
84
85#if (NXT_HAVE_ACCEPT4)
44static void nxt_epoll_free(nxt_event_engine_t *engine);
45static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
46static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
47static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
48static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
49 nxt_fd_event_t *ev);
50static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
51 nxt_fd_event_t *ev);

--- 25 unchanged lines hidden (view full) ---

77static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
78 nxt_work_handler_t handler);
79static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
80static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
81#endif
82static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
83
84#if (NXT_HAVE_ACCEPT4)
86static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj,
85static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
87 void *data);
88#endif
89
90
91#if (NXT_HAVE_EPOLL_EDGE)
92
86 void *data);
87#endif
88
89
90#if (NXT_HAVE_EPOLL_EDGE)
91
93static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj,
92static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
94 void *data);
93 void *data);
95static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj,
94static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
96 void *data);
95 void *data);
97static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c,
98 nxt_buf_t *b);
96static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
99
100
97
98
101static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = {
102 nxt_epoll_edge_event_conn_io_connect,
103 nxt_event_conn_io_accept,
99static nxt_conn_io_t nxt_epoll_edge_conn_io = {
100 nxt_epoll_edge_conn_io_connect,
101 nxt_conn_io_accept,
104
102
105 nxt_event_conn_io_read,
106 nxt_epoll_edge_event_conn_io_recvbuf,
107 nxt_event_conn_io_recv,
103 nxt_conn_io_read,
104 nxt_epoll_edge_conn_io_recvbuf,
105 nxt_conn_io_recv,
108
109 nxt_conn_io_write,
110 nxt_event_conn_io_write_chunk,
111
112#if (NXT_HAVE_LINUX_SENDFILE)
113 nxt_linux_event_conn_io_sendfile,
114#else
115 nxt_event_conn_io_sendbuf,
116#endif
117
118 nxt_event_conn_io_writev,
119 nxt_event_conn_io_send,
120
106
107 nxt_conn_io_write,
108 nxt_event_conn_io_write_chunk,
109
110#if (NXT_HAVE_LINUX_SENDFILE)
111 nxt_linux_event_conn_io_sendfile,
112#else
113 nxt_event_conn_io_sendbuf,
114#endif
115
116 nxt_event_conn_io_writev,
117 nxt_event_conn_io_send,
118
121 nxt_event_conn_io_shutdown,
119 nxt_conn_io_shutdown,
122};
123
124
125const nxt_event_interface_t nxt_epoll_edge_engine = {
126 "epoll_edge",
127 nxt_epoll_edge_create,
128 nxt_epoll_free,
129 nxt_epoll_enable,

--- 15 unchanged lines hidden (view full) ---

145 nxt_epoll_enable_post,
146 nxt_epoll_signal,
147#else
148 NULL,
149 NULL,
150#endif
151 nxt_epoll_poll,
152
120};
121
122
123const nxt_event_interface_t nxt_epoll_edge_engine = {
124 "epoll_edge",
125 nxt_epoll_edge_create,
126 nxt_epoll_free,
127 nxt_epoll_enable,

--- 15 unchanged lines hidden (view full) ---

143 nxt_epoll_enable_post,
144 nxt_epoll_signal,
145#else
146 NULL,
147 NULL,
148#endif
149 nxt_epoll_poll,
150
153 &nxt_epoll_edge_event_conn_io,
151 &nxt_epoll_edge_conn_io,
154
155#if (NXT_HAVE_INOTIFY)
156 NXT_FILE_EVENTS,
157#else
158 NXT_NO_FILE_EVENTS,
159#endif
160
161#if (NXT_HAVE_SIGNALFD)

--- 29 unchanged lines hidden (view full) ---

191 nxt_epoll_enable_post,
192 nxt_epoll_signal,
193#else
194 NULL,
195 NULL,
196#endif
197 nxt_epoll_poll,
198
152
153#if (NXT_HAVE_INOTIFY)
154 NXT_FILE_EVENTS,
155#else
156 NXT_NO_FILE_EVENTS,
157#endif
158
159#if (NXT_HAVE_SIGNALFD)

--- 29 unchanged lines hidden (view full) ---

189 nxt_epoll_enable_post,
190 nxt_epoll_signal,
191#else
192 NULL,
193 NULL,
194#endif
195 nxt_epoll_poll,
196
199 &nxt_unix_event_conn_io,
197 &nxt_unix_conn_io,
200
201#if (NXT_HAVE_INOTIFY)
202 NXT_FILE_EVENTS,
203#else
204 NXT_NO_FILE_EVENTS,
205#endif
206
207#if (NXT_HAVE_SIGNALFD)

--- 5 unchanged lines hidden (view full) ---

213
214
215#if (NXT_HAVE_EPOLL_EDGE)
216
217static nxt_int_t
218nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
219 nxt_uint_t mevents)
220{
198
199#if (NXT_HAVE_INOTIFY)
200 NXT_FILE_EVENTS,
201#else
202 NXT_NO_FILE_EVENTS,
203#endif
204
205#if (NXT_HAVE_SIGNALFD)

--- 5 unchanged lines hidden (view full) ---

211
212
213#if (NXT_HAVE_EPOLL_EDGE)
214
215static nxt_int_t
216nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
217 nxt_uint_t mevents)
218{
221 return nxt_epoll_create(engine, mchanges, mevents,
222 &nxt_epoll_edge_event_conn_io,
219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
223 EPOLLET | EPOLLRDHUP);
224}
225
226#endif
227
228
229static nxt_int_t
230nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
231 nxt_uint_t mevents)
232{
233 return nxt_epoll_create(engine, mchanges, mevents,
220 EPOLLET | EPOLLRDHUP);
221}
222
223#endif
224
225
226static nxt_int_t
227nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
228 nxt_uint_t mevents)
229{
230 return nxt_epoll_create(engine, mchanges, mevents,
234 &nxt_unix_event_conn_io, 0);
231 &nxt_unix_conn_io, 0);
235}
236
237
238static nxt_int_t
239nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
232}
233
234
235static nxt_int_t
236nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
240 nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode)
237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
241{
242 engine->u.epoll.fd = -1;
243 engine->u.epoll.mode = mode;
244 engine->u.epoll.mchanges = mchanges;
245 engine->u.epoll.mevents = mevents;
246#if (NXT_HAVE_SIGNALFD)
247 engine->u.epoll.signalfd.fd = -1;
248#endif

--- 36 unchanged lines hidden (view full) ---

285
286 nxt_epoll_free(engine);
287
288 return NXT_ERROR;
289}
290
291
292static void
238{
239 engine->u.epoll.fd = -1;
240 engine->u.epoll.mode = mode;
241 engine->u.epoll.mchanges = mchanges;
242 engine->u.epoll.mevents = mevents;
243#if (NXT_HAVE_SIGNALFD)
244 engine->u.epoll.signalfd.fd = -1;
245#endif

--- 36 unchanged lines hidden (view full) ---

282
283 nxt_epoll_free(engine);
284
285 return NXT_ERROR;
286}
287
288
289static void
293nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io)
290nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
294{
295 static nxt_work_handler_t handler;
296
297 if (handler == NULL) {
298
299 handler = io->accept;
300
301#if (NXT_HAVE_ACCEPT4)
302
303 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
304
305 if (nxt_errno != NXT_ENOSYS) {
291{
292 static nxt_work_handler_t handler;
293
294 if (handler == NULL) {
295
296 handler = io->accept;
297
298#if (NXT_HAVE_ACCEPT4)
299
300 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
301
302 if (nxt_errno != NXT_ENOSYS) {
306 handler = nxt_epoll_event_conn_io_accept4;
303 handler = nxt_epoll_conn_io_accept4;
307
308 } else {
309 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
310 NXT_ENOSYS);
311 }
312
313#endif
314 }

--- 665 unchanged lines hidden (view full) ---

980 }
981 }
982}
983
984
985#if (NXT_HAVE_ACCEPT4)
986
987static void
304
305 } else {
306 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
307 NXT_ENOSYS);
308 }
309
310#endif
311 }

--- 665 unchanged lines hidden (view full) ---

977 }
978 }
979}
980
981
982#if (NXT_HAVE_ACCEPT4)
983
984static void
988nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
985nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
989{
986{
990 socklen_t len;
991 nxt_socket_t s;
992 struct sockaddr *sa;
993 nxt_event_conn_t *c;
994 nxt_event_conn_listen_t *cls;
987 socklen_t len;
988 nxt_conn_t *c;
989 nxt_socket_t s;
990 struct sockaddr *sa;
991 nxt_listen_event_t *lev;
995
992
996 cls = obj;
997 c = cls->next;
993 lev = obj;
994 c = lev->next;
998
995
999 cls->ready--;
1000 cls->socket.read_ready = (cls->ready != 0);
996 lev->ready--;
997 lev->socket.read_ready = (lev->ready != 0);
1001
1002 len = c->remote->socklen;
1003
1004 if (len >= sizeof(struct sockaddr)) {
1005 sa = &c->remote->u.sockaddr;
1006
1007 } else {
1008 sa = NULL;
1009 len = 0;
1010 }
1011
998
999 len = c->remote->socklen;
1000
1001 if (len >= sizeof(struct sockaddr)) {
1002 sa = &c->remote->u.sockaddr;
1003
1004 } else {
1005 sa = NULL;
1006 len = 0;
1007 }
1008
1012 s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK);
1009 s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK);
1013
1014 if (s != -1) {
1015 c->socket.fd = s;
1016
1010
1011 if (s != -1) {
1012 c->socket.fd = s;
1013
1017 nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s);
1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
1018
1015
1019 nxt_event_conn_accept(task, cls, c);
1016 nxt_conn_accept(task, lev, c);
1020 return;
1021 }
1022
1017 return;
1018 }
1019
1023 nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno);
1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
1024}
1025
1026#endif
1027
1028
1029#if (NXT_HAVE_EPOLL_EDGE)
1030
1031/*
1032 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1033 * syscall to test pending connect() error. Although this special
1034 * interface can work in both edge-triggered and level-triggered
1035 * modes it is enabled only for the former mode because this mode is
1036 * available in all modern Linux distributions. For the latter mode
1037 * it is required to create additional nxt_epoll_level_event_conn_io
1038 * with single non-generic connect() interface.
1039 */
1040
1041static void
1021}
1022
1023#endif
1024
1025
1026#if (NXT_HAVE_EPOLL_EDGE)
1027
1028/*
1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
1030 * syscall to test pending connect() error. Although this special
1031 * interface can work in both edge-triggered and level-triggered
1032 * modes it is enabled only for the former mode because this mode is
1033 * available in all modern Linux distributions. For the latter mode
1034 * it is required to create additional nxt_epoll_level_event_conn_io
1035 * with single non-generic connect() interface.
1036 */
1037
1038static void
1042nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1039nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
1043{
1040{
1044 nxt_event_conn_t *c;
1041 nxt_conn_t *c;
1045 nxt_event_engine_t *engine;
1046 nxt_work_handler_t handler;
1047 const nxt_event_conn_state_t *state;
1048
1049 c = obj;
1050
1051 state = c->write_state;
1052
1053 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1054
1055 case NXT_OK:
1056 c->socket.write_ready = 1;
1057 handler = state->ready_handler;
1058 break;
1059
1060 case NXT_AGAIN:
1042 nxt_event_engine_t *engine;
1043 nxt_work_handler_t handler;
1044 const nxt_event_conn_state_t *state;
1045
1046 c = obj;
1047
1048 state = c->write_state;
1049
1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
1051
1052 case NXT_OK:
1053 c->socket.write_ready = 1;
1054 handler = state->ready_handler;
1055 break;
1056
1057 case NXT_AGAIN:
1061 c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
1062 c->socket.error_handler = nxt_event_conn_connect_error;
1058 c->socket.write_handler = nxt_epoll_edge_conn_connected;
1059 c->socket.error_handler = nxt_conn_connect_error;
1063
1064 engine = task->thread->engine;
1060
1061 engine = task->thread->engine;
1065 nxt_event_conn_timer(engine, c, state, &c->write_timer);
1062 nxt_conn_timer(engine, c, state, &c->write_timer);
1066
1067 nxt_epoll_enable(engine, &c->socket);
1068 c->socket.read = NXT_EVENT_BLOCKED;
1069 return;
1070
1071#if 0
1072 case NXT_AGAIN:
1063
1064 nxt_epoll_enable(engine, &c->socket);
1065 c->socket.read = NXT_EVENT_BLOCKED;
1066 return;
1067
1068#if 0
1069 case NXT_AGAIN:
1073 nxt_event_conn_timer(engine, c, state, &c->write_timer);
1070 nxt_conn_timer(engine, c, state, &c->write_timer);
1074
1075 /* Fall through. */
1076
1077 case NXT_OK:
1078 /*
1079 * Mark both read and write directions as ready and try to perform
1080 * I/O operations before receiving readiness notifications.
1081 * On unconnected socket Linux send() and recv() return EAGAIN

--- 24 unchanged lines hidden (view full) ---

1106 break;
1107 }
1108
1109 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1110}
1111
1112
1113static void
1071
1072 /* Fall through. */
1073
1074 case NXT_OK:
1075 /*
1076 * Mark both read and write directions as ready and try to perform
1077 * I/O operations before receiving readiness notifications.
1078 * On unconnected socket Linux send() and recv() return EAGAIN

--- 24 unchanged lines hidden (view full) ---

1103 break;
1104 }
1105
1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
1107}
1108
1109
1110static void
1114nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data)
1111nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
1115{
1112{
1116 nxt_event_conn_t *c;
1113 nxt_conn_t *c;
1117
1118 c = obj;
1119
1120 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1121
1122 if (!c->socket.epoll_error) {
1123 c->socket.write = NXT_EVENT_BLOCKED;
1124
1125 if (c->write_state->timer_autoreset) {
1126 nxt_timer_disable(task->thread->engine, &c->write_timer);
1127 }
1128
1129 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1130 task, c, data);
1131 return;
1132 }
1133
1114
1115 c = obj;
1116
1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);
1118
1119 if (!c->socket.epoll_error) {
1120 c->socket.write = NXT_EVENT_BLOCKED;
1121
1122 if (c->write_state->timer_autoreset) {
1123 nxt_timer_disable(task->thread->engine, &c->write_timer);
1124 }
1125
1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
1127 task, c, data);
1128 return;
1129 }
1130
1134 nxt_event_conn_connect_test(task, c, data);
1131 nxt_conn_connect_test(task, c, data);
1135}
1136
1137
1138/*
1132}
1133
1134
1135/*
1139 * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around
1140 * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF
1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
1141 * in edge-triggered mode.
1142 */
1143
1144static ssize_t
1138 * in edge-triggered mode.
1139 */
1140
1141static ssize_t
1145nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
1142nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1146{
1147 ssize_t n;
1148
1143{
1144 ssize_t n;
1145
1149 n = nxt_event_conn_io_recvbuf(c, b);
1146 n = nxt_conn_io_recvbuf(c, b);
1150
1151 if (n > 0 && c->socket.epoll_eof) {
1152 c->socket.read_ready = 1;
1153 }
1154
1155 return n;
1156}
1157
1158#endif
1147
1148 if (n > 0 && c->socket.epoll_eof) {
1149 c->socket.read_ready = 1;
1150 }
1151
1152 return n;
1153}
1154
1155#endif