Deleted
Added
nxt_epoll_engine.c (57:adf36a8fa62b) | nxt_epoll_engine.c (62:5e1efcc7b740) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 24 unchanged lines hidden (view full) --- 33 34#if (NXT_HAVE_EPOLL_EDGE) 35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37#endif 38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 24 unchanged lines hidden (view full) --- 33 34#if (NXT_HAVE_EPOLL_EDGE) 35static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, 36 nxt_uint_t mchanges, nxt_uint_t mevents); 37#endif 38static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, 39 nxt_uint_t mchanges, nxt_uint_t mevents); 40static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, |
41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, 42 uint32_t mode); | 41 nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); |
43static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, | 42static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, |
44 nxt_event_conn_io_t *io); | 43 nxt_conn_io_t *io); |
45static void nxt_epoll_free(nxt_event_engine_t *engine); 46static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 49static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 50 nxt_fd_event_t *ev); 51static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 52 nxt_fd_event_t *ev); --- 25 unchanged lines hidden (view full) --- 78static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 79 nxt_work_handler_t handler); 80static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 81static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 82#endif 83static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 84 85#if (NXT_HAVE_ACCEPT4) | 44static void nxt_epoll_free(nxt_event_engine_t *engine); 45static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 46static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 47static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 48static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, 49 nxt_fd_event_t *ev); 50static void nxt_epoll_enable_read(nxt_event_engine_t *engine, 51 nxt_fd_event_t *ev); --- 25 unchanged lines hidden (view full) --- 77static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, 78 nxt_work_handler_t handler); 79static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); 80static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 81#endif 82static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 83 84#if (NXT_HAVE_ACCEPT4) |
86static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, | 85static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, |
87 void *data); 88#endif 89 90 91#if (NXT_HAVE_EPOLL_EDGE) 92 | 86 void *data); 87#endif 88 89 90#if (NXT_HAVE_EPOLL_EDGE) 91 |
93static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, | 92static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, |
94 void *data); | 93 void *data); |
95static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, | 94static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, |
96 void *data); | 95 void *data); |
97static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, 98 nxt_buf_t *b); | 96static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); |
99 100 | 97 98 |
101static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { 102 nxt_epoll_edge_event_conn_io_connect, 103 nxt_event_conn_io_accept, | 99static nxt_conn_io_t nxt_epoll_edge_conn_io = { 100 nxt_epoll_edge_conn_io_connect, 101 nxt_conn_io_accept, |
104 | 102 |
105 nxt_event_conn_io_read, 106 nxt_epoll_edge_event_conn_io_recvbuf, 107 nxt_event_conn_io_recv, | 103 nxt_conn_io_read, 104 nxt_epoll_edge_conn_io_recvbuf, 105 nxt_conn_io_recv, |
108 109 nxt_conn_io_write, 110 nxt_event_conn_io_write_chunk, 111 112#if (NXT_HAVE_LINUX_SENDFILE) 113 nxt_linux_event_conn_io_sendfile, 114#else 115 nxt_event_conn_io_sendbuf, 116#endif 117 118 nxt_event_conn_io_writev, 119 nxt_event_conn_io_send, 120 | 106 107 nxt_conn_io_write, 108 nxt_event_conn_io_write_chunk, 109 110#if (NXT_HAVE_LINUX_SENDFILE) 111 nxt_linux_event_conn_io_sendfile, 112#else 113 nxt_event_conn_io_sendbuf, 114#endif 115 116 nxt_event_conn_io_writev, 117 nxt_event_conn_io_send, 118 |
121 nxt_event_conn_io_shutdown, | 119 nxt_conn_io_shutdown, |
122}; 123 124 125const nxt_event_interface_t nxt_epoll_edge_engine = { 126 "epoll_edge", 127 nxt_epoll_edge_create, 128 nxt_epoll_free, 129 nxt_epoll_enable, --- 15 unchanged lines hidden (view full) --- 145 nxt_epoll_enable_post, 146 nxt_epoll_signal, 147#else 148 NULL, 149 NULL, 150#endif 151 nxt_epoll_poll, 152 | 120}; 121 122 123const nxt_event_interface_t nxt_epoll_edge_engine = { 124 "epoll_edge", 125 nxt_epoll_edge_create, 126 nxt_epoll_free, 127 nxt_epoll_enable, --- 15 unchanged lines hidden (view full) --- 143 nxt_epoll_enable_post, 144 nxt_epoll_signal, 145#else 146 NULL, 147 NULL, 148#endif 149 nxt_epoll_poll, 150 |
153 &nxt_epoll_edge_event_conn_io, | 151 &nxt_epoll_edge_conn_io, |
154 155#if (NXT_HAVE_INOTIFY) 156 NXT_FILE_EVENTS, 157#else 158 NXT_NO_FILE_EVENTS, 159#endif 160 161#if (NXT_HAVE_SIGNALFD) --- 29 unchanged lines hidden (view full) --- 191 nxt_epoll_enable_post, 192 nxt_epoll_signal, 193#else 194 NULL, 195 NULL, 196#endif 197 nxt_epoll_poll, 198 | 152 153#if (NXT_HAVE_INOTIFY) 154 NXT_FILE_EVENTS, 155#else 156 NXT_NO_FILE_EVENTS, 157#endif 158 159#if (NXT_HAVE_SIGNALFD) --- 29 unchanged lines hidden (view full) --- 189 nxt_epoll_enable_post, 190 nxt_epoll_signal, 191#else 192 NULL, 193 NULL, 194#endif 195 nxt_epoll_poll, 196 |
199 &nxt_unix_event_conn_io, | 197 &nxt_unix_conn_io, |
200 201#if (NXT_HAVE_INOTIFY) 202 NXT_FILE_EVENTS, 203#else 204 NXT_NO_FILE_EVENTS, 205#endif 206 207#if (NXT_HAVE_SIGNALFD) --- 5 unchanged lines hidden (view full) --- 213 214 215#if (NXT_HAVE_EPOLL_EDGE) 216 217static nxt_int_t 218nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 219 nxt_uint_t mevents) 220{ | 198 199#if (NXT_HAVE_INOTIFY) 200 NXT_FILE_EVENTS, 201#else 202 NXT_NO_FILE_EVENTS, 203#endif 204 205#if (NXT_HAVE_SIGNALFD) --- 5 unchanged lines hidden (view full) --- 211 212 213#if (NXT_HAVE_EPOLL_EDGE) 214 215static nxt_int_t 216nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 217 nxt_uint_t mevents) 218{ |
221 return nxt_epoll_create(engine, mchanges, mevents, 222 &nxt_epoll_edge_event_conn_io, | 219 return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, |
223 EPOLLET | EPOLLRDHUP); 224} 225 226#endif 227 228 229static nxt_int_t 230nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 231 nxt_uint_t mevents) 232{ 233 return nxt_epoll_create(engine, mchanges, mevents, | 220 EPOLLET | EPOLLRDHUP); 221} 222 223#endif 224 225 226static nxt_int_t 227nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 228 nxt_uint_t mevents) 229{ 230 return nxt_epoll_create(engine, mchanges, mevents, |
234 &nxt_unix_event_conn_io, 0); | 231 &nxt_unix_conn_io, 0); |
235} 236 237 238static nxt_int_t 239nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, | 232} 233 234 235static nxt_int_t 236nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, |
240 nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) | 237 nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) |
241{ 242 engine->u.epoll.fd = -1; 243 engine->u.epoll.mode = mode; 244 engine->u.epoll.mchanges = mchanges; 245 engine->u.epoll.mevents = mevents; 246#if (NXT_HAVE_SIGNALFD) 247 engine->u.epoll.signalfd.fd = -1; 248#endif --- 36 unchanged lines hidden (view full) --- 285 286 nxt_epoll_free(engine); 287 288 return NXT_ERROR; 289} 290 291 292static void | 238{ 239 engine->u.epoll.fd = -1; 240 engine->u.epoll.mode = mode; 241 engine->u.epoll.mchanges = mchanges; 242 engine->u.epoll.mevents = mevents; 243#if (NXT_HAVE_SIGNALFD) 244 engine->u.epoll.signalfd.fd = -1; 245#endif --- 36 unchanged lines hidden (view full) --- 282 283 nxt_epoll_free(engine); 284 285 return NXT_ERROR; 286} 287 288 289static void |
293nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) | 290nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) |
294{ 295 static nxt_work_handler_t handler; 296 297 if (handler == NULL) { 298 299 handler = io->accept; 300 301#if (NXT_HAVE_ACCEPT4) 302 303 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 304 305 if (nxt_errno != NXT_ENOSYS) { | 291{ 292 static nxt_work_handler_t handler; 293 294 if (handler == NULL) { 295 296 handler = io->accept; 297 298#if (NXT_HAVE_ACCEPT4) 299 300 (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); 301 302 if (nxt_errno != NXT_ENOSYS) { |
306 handler = nxt_epoll_event_conn_io_accept4; | 303 handler = nxt_epoll_conn_io_accept4; |
307 308 } else { 309 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 310 NXT_ENOSYS); 311 } 312 313#endif 314 } --- 665 unchanged lines hidden (view full) --- 980 } 981 } 982} 983 984 985#if (NXT_HAVE_ACCEPT4) 986 987static void | 304 305 } else { 306 nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", 307 NXT_ENOSYS); 308 } 309 310#endif 311 } --- 665 unchanged lines hidden (view full) --- 977 } 978 } 979} 980 981 982#if (NXT_HAVE_ACCEPT4) 983 984static void |
988nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) | 985nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) |
989{ | 986{ |
990 socklen_t len; 991 nxt_socket_t s; 992 struct sockaddr *sa; 993 nxt_event_conn_t *c; 994 nxt_event_conn_listen_t *cls; | 987 socklen_t len; 988 nxt_conn_t *c; 989 nxt_socket_t s; 990 struct sockaddr *sa; 991 nxt_listen_event_t *lev; |
995 | 992 |
996 cls = obj; 997 c = cls->next; | 993 lev = obj; 994 c = lev->next; |
998 | 995 |
999 cls->ready--; 1000 cls->socket.read_ready = (cls->ready != 0); | 996 lev->ready--; 997 lev->socket.read_ready = (lev->ready != 0); |
1001 1002 len = c->remote->socklen; 1003 1004 if (len >= sizeof(struct sockaddr)) { 1005 sa = &c->remote->u.sockaddr; 1006 1007 } else { 1008 sa = NULL; 1009 len = 0; 1010 } 1011 | 998 999 len = c->remote->socklen; 1000 1001 if (len >= sizeof(struct sockaddr)) { 1002 sa = &c->remote->u.sockaddr; 1003 1004 } else { 1005 sa = NULL; 1006 len = 0; 1007 } 1008 |
1012 s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK); | 1009 s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK); |
1013 1014 if (s != -1) { 1015 c->socket.fd = s; 1016 | 1010 1011 if (s != -1) { 1012 c->socket.fd = s; 1013 |
1017 nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s); | 1014 nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); |
1018 | 1015 |
1019 nxt_event_conn_accept(task, cls, c); | 1016 nxt_conn_accept(task, lev, c); |
1020 return; 1021 } 1022 | 1017 return; 1018 } 1019 |
1023 nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno); | 1020 nxt_conn_accept_error(task, lev, "accept4", nxt_errno); |
1024} 1025 1026#endif 1027 1028 1029#if (NXT_HAVE_EPOLL_EDGE) 1030 1031/* 1032 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1033 * syscall to test pending connect() error. Although this special 1034 * interface can work in both edge-triggered and level-triggered 1035 * modes it is enabled only for the former mode because this mode is 1036 * available in all modern Linux distributions. For the latter mode 1037 * it is required to create additional nxt_epoll_level_event_conn_io 1038 * with single non-generic connect() interface. 1039 */ 1040 1041static void | 1021} 1022 1023#endif 1024 1025 1026#if (NXT_HAVE_EPOLL_EDGE) 1027 1028/* 1029 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() 1030 * syscall to test pending connect() error. Although this special 1031 * interface can work in both edge-triggered and level-triggered 1032 * modes it is enabled only for the former mode because this mode is 1033 * available in all modern Linux distributions. For the latter mode 1034 * it is required to create additional nxt_epoll_level_event_conn_io 1035 * with single non-generic connect() interface. 1036 */ 1037 1038static void |
1042nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) | 1039nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) |
1043{ | 1040{ |
1044 nxt_event_conn_t *c; | 1041 nxt_conn_t *c; |
1045 nxt_event_engine_t *engine; 1046 nxt_work_handler_t handler; 1047 const nxt_event_conn_state_t *state; 1048 1049 c = obj; 1050 1051 state = c->write_state; 1052 1053 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1054 1055 case NXT_OK: 1056 c->socket.write_ready = 1; 1057 handler = state->ready_handler; 1058 break; 1059 1060 case NXT_AGAIN: | 1042 nxt_event_engine_t *engine; 1043 nxt_work_handler_t handler; 1044 const nxt_event_conn_state_t *state; 1045 1046 c = obj; 1047 1048 state = c->write_state; 1049 1050 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ 1051 1052 case NXT_OK: 1053 c->socket.write_ready = 1; 1054 handler = state->ready_handler; 1055 break; 1056 1057 case NXT_AGAIN: |
1061 c->socket.write_handler = nxt_epoll_edge_event_conn_connected; 1062 c->socket.error_handler = nxt_event_conn_connect_error; | 1058 c->socket.write_handler = nxt_epoll_edge_conn_connected; 1059 c->socket.error_handler = nxt_conn_connect_error; |
1063 1064 engine = task->thread->engine; | 1060 1061 engine = task->thread->engine; |
1065 nxt_event_conn_timer(engine, c, state, &c->write_timer); | 1062 nxt_conn_timer(engine, c, state, &c->write_timer); |
1066 1067 nxt_epoll_enable(engine, &c->socket); 1068 c->socket.read = NXT_EVENT_BLOCKED; 1069 return; 1070 1071#if 0 1072 case NXT_AGAIN: | 1063 1064 nxt_epoll_enable(engine, &c->socket); 1065 c->socket.read = NXT_EVENT_BLOCKED; 1066 return; 1067 1068#if 0 1069 case NXT_AGAIN: |
1073 nxt_event_conn_timer(engine, c, state, &c->write_timer); | 1070 nxt_conn_timer(engine, c, state, &c->write_timer); |
1074 1075 /* Fall through. */ 1076 1077 case NXT_OK: 1078 /* 1079 * Mark both read and write directions as ready and try to perform 1080 * I/O operations before receiving readiness notifications. 1081 * On unconnected socket Linux send() and recv() return EAGAIN --- 24 unchanged lines hidden (view full) --- 1106 break; 1107 } 1108 1109 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1110} 1111 1112 1113static void | 1071 1072 /* Fall through. */ 1073 1074 case NXT_OK: 1075 /* 1076 * Mark both read and write directions as ready and try to perform 1077 * I/O operations before receiving readiness notifications. 1078 * On unconnected socket Linux send() and recv() return EAGAIN --- 24 unchanged lines hidden (view full) --- 1103 break; 1104 } 1105 1106 nxt_work_queue_add(c->write_work_queue, handler, task, c, data); 1107} 1108 1109 1110static void |
1114nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) | 1111nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) |
1115{ | 1112{ |
1116 nxt_event_conn_t *c; | 1113 nxt_conn_t *c; |
1117 1118 c = obj; 1119 1120 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1121 1122 if (!c->socket.epoll_error) { 1123 c->socket.write = NXT_EVENT_BLOCKED; 1124 1125 if (c->write_state->timer_autoreset) { 1126 nxt_timer_disable(task->thread->engine, &c->write_timer); 1127 } 1128 1129 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1130 task, c, data); 1131 return; 1132 } 1133 | 1114 1115 c = obj; 1116 1117 nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); 1118 1119 if (!c->socket.epoll_error) { 1120 c->socket.write = NXT_EVENT_BLOCKED; 1121 1122 if (c->write_state->timer_autoreset) { 1123 nxt_timer_disable(task->thread->engine, &c->write_timer); 1124 } 1125 1126 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, 1127 task, c, data); 1128 return; 1129 } 1130 |
1134 nxt_event_conn_connect_test(task, c, data); | 1131 nxt_conn_connect_test(task, c, data); |
1135} 1136 1137 1138/* | 1132} 1133 1134 1135/* |
1139 * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around 1140 * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF | 1136 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around 1137 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF |
1141 * in edge-triggered mode. 1142 */ 1143 1144static ssize_t | 1138 * in edge-triggered mode. 1139 */ 1140 1141static ssize_t |
1145nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) | 1142nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) |
1146{ 1147 ssize_t n; 1148 | 1143{ 1144 ssize_t n; 1145 |
1149 n = nxt_event_conn_io_recvbuf(c, b); | 1146 n = nxt_conn_io_recvbuf(c, b); |
1150 1151 if (n > 0 && c->socket.epoll_eof) { 1152 c->socket.read_ready = 1; 1153 } 1154 1155 return n; 1156} 1157 1158#endif | 1147 1148 if (n > 0 && c->socket.epoll_eof) { 1149 c->socket.read_ready = 1; 1150 } 1151 1152 return n; 1153} 1154 1155#endif |