nxt_conn_accept.c (337:854a1a440616) nxt_conn_accept.c (338:2c6135a99c27)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10/*
11 * A listen socket handler calls an event facility specific io_accept()
12 * method. The method accept()s a new connection and then calls
13 * nxt_event_conn_accept() to handle the new connection and to prepare
14 * for a next connection to avoid just dropping next accept()ed socket
15 * if no more connections allowed. If there are no available connections
16 * an idle connection would be closed. If there are no idle connections
17 * then new connections will not be accept()ed for 1 second.
18 */
19
20
21static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22 nxt_listen_event_t *lev);
23static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24 void *data);
25static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26 nxt_listen_event_t *lev);
27static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
28 nxt_listen_event_t *lev);
29static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
32 void *data);
33
34
35nxt_listen_event_t *
36nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
37{
38 nxt_listen_event_t *lev;
39 nxt_event_engine_t *engine;
40
41 lev = nxt_zalloc(sizeof(nxt_listen_event_t));
42
43 if (nxt_fast_path(lev != NULL)) {
44 lev->socket.fd = ls->socket;
45
46 engine = task->thread->engine;
47 lev->batch = engine->batch;
48
49 lev->mem_cache = (uint32_t) -1;
50
51 lev->socket.read_work_queue = &engine->accept_work_queue;
52 lev->socket.read_handler = nxt_conn_listen_handler;
53 lev->socket.error_handler = nxt_conn_listen_event_error;
54 lev->socket.log = &nxt_main_log;
55
56 lev->accept = engine->event.io->accept;
57
58 lev->listen = ls;
59 lev->work_queue = &engine->read_work_queue;
60
61 lev->timer.work_queue = &engine->fast_work_queue;
62 lev->timer.handler = nxt_conn_listen_timer_handler;
63 lev->timer.log = &nxt_main_log;
64
65 lev->task.thread = task->thread;
66 lev->task.log = &nxt_main_log;
67 lev->task.ident = nxt_task_next_ident();
68 lev->socket.task = &lev->task;
69 lev->timer.task = &lev->task;
70
71 if (nxt_conn_accept_alloc(task, lev) != NULL) {
72 nxt_fd_event_enable_accept(engine, &lev->socket);
73
74 nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
75 }
76
77 return lev;
78 }
79
80 return NULL;
81}
82
83
84static nxt_conn_t *
85nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
86{
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10/*
11 * A listen socket handler calls an event facility specific io_accept()
12 * method. The method accept()s a new connection and then calls
13 * nxt_event_conn_accept() to handle the new connection and to prepare
14 * for a next connection to avoid just dropping next accept()ed socket
15 * if no more connections allowed. If there are no available connections
16 * an idle connection would be closed. If there are no idle connections
17 * then new connections will not be accept()ed for 1 second.
18 */
19
20
21static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22 nxt_listen_event_t *lev);
23static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24 void *data);
25static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26 nxt_listen_event_t *lev);
27static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
28 nxt_listen_event_t *lev);
29static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
32 void *data);
33
34
35nxt_listen_event_t *
36nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
37{
38 nxt_listen_event_t *lev;
39 nxt_event_engine_t *engine;
40
41 lev = nxt_zalloc(sizeof(nxt_listen_event_t));
42
43 if (nxt_fast_path(lev != NULL)) {
44 lev->socket.fd = ls->socket;
45
46 engine = task->thread->engine;
47 lev->batch = engine->batch;
48
49 lev->mem_cache = (uint32_t) -1;
50
51 lev->socket.read_work_queue = &engine->accept_work_queue;
52 lev->socket.read_handler = nxt_conn_listen_handler;
53 lev->socket.error_handler = nxt_conn_listen_event_error;
54 lev->socket.log = &nxt_main_log;
55
56 lev->accept = engine->event.io->accept;
57
58 lev->listen = ls;
59 lev->work_queue = &engine->read_work_queue;
60
61 lev->timer.work_queue = &engine->fast_work_queue;
62 lev->timer.handler = nxt_conn_listen_timer_handler;
63 lev->timer.log = &nxt_main_log;
64
65 lev->task.thread = task->thread;
66 lev->task.log = &nxt_main_log;
67 lev->task.ident = nxt_task_next_ident();
68 lev->socket.task = &lev->task;
69 lev->timer.task = &lev->task;
70
71 if (nxt_conn_accept_alloc(task, lev) != NULL) {
72 nxt_fd_event_enable_accept(engine, &lev->socket);
73
74 nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
75 }
76
77 return lev;
78 }
79
80 return NULL;
81}
82
83
84static nxt_conn_t *
85nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
86{
87 nxt_mp_t *mp;
88 nxt_conn_t *c;
89 nxt_event_engine_t *engine;
90 nxt_listen_socket_t *ls;
87 nxt_mp_t *mp;
88 nxt_conn_t *c;
89 nxt_event_engine_t *engine;
91
92 engine = task->thread->engine;
93
94 if (engine->connections < engine->max_connections) {
95
96 mp = nxt_mp_create(1024, 128, 256, 32);
97
98 if (nxt_fast_path(mp != NULL)) {
99 c = nxt_conn_create(mp, lev->socket.task);
100 if (nxt_slow_path(c == NULL)) {
101 goto fail;
102 }
103
104 lev->next = c;
105 c->socket.read_work_queue = lev->socket.read_work_queue;
106 c->socket.write_ready = 1;
107 c->listen = lev;
108
90
91 engine = task->thread->engine;
92
93 if (engine->connections < engine->max_connections) {
94
95 mp = nxt_mp_create(1024, 128, 256, 32);
96
97 if (nxt_fast_path(mp != NULL)) {
98 c = nxt_conn_create(mp, lev->socket.task);
99 if (nxt_slow_path(c == NULL)) {
100 goto fail;
101 }
102
103 lev->next = c;
104 c->socket.read_work_queue = lev->socket.read_work_queue;
105 c->socket.write_ready = 1;
106 c->listen = lev;
107
109 ls = lev->listen;
110
111 c->remote = nxt_sockaddr_cache_alloc(engine, lev);
112 if (nxt_fast_path(c->remote != NULL)) {
113 return c;
114 }
115 }
116
117 fail:
118
119 nxt_mp_destroy(mp);
120 }
121
122 return NULL;
123}
124
125
126static void
127nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
128{
129 nxt_listen_event_t *lev;
130
131 lev = obj;
132 lev->ready = lev->batch;
133
134 lev->accept(task, lev, data);
135}
136
137
138void
139nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
140{
141 socklen_t socklen;
142 nxt_conn_t *c;
143 nxt_socket_t s;
144 struct sockaddr *sa;
145 nxt_listen_event_t *lev;
146
147 lev = obj;
148 c = lev->next;
149
150 lev->ready--;
151 lev->socket.read_ready = (lev->ready != 0);
152
153 sa = &c->remote->u.sockaddr;
154 socklen = c->remote->socklen;
155 /*
156 * The returned socklen is ignored here, because sockaddr_in and
157 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un
158 * it is 3 byte length and already prepared, because old BSDs return zero
159 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
160 * socklen and updates only the sa_family part; other systems copy 3 bytes
161 * and truncate surplus zero part. Only bound sockaddr_un will be really
162 * truncated here.
163 */
164 s = accept(lev->socket.fd, sa, &socklen);
165
166 if (s == -1) {
167 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
168 return;
169 }
170
171 c->socket.fd = s;
172
173#if (NXT_LINUX)
174 /*
175 * Linux does not inherit non-blocking mode
176 * from listen socket for accept()ed socket.
177 */
178 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
179 nxt_socket_close(task, s);
180 }
181
182#endif
183
184 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
185
186 nxt_conn_accept(task, lev, c);
187}
188
189
190void
191nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
192{
193 nxt_conn_t *next;
194
195 nxt_sockaddr_text(c->remote);
196
197 nxt_debug(task, "client: %*s",
198 c->remote->address_length, nxt_sockaddr_address(c->remote));
199
200 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
201
202 c->read_work_queue = lev->work_queue;
203 c->write_work_queue = lev->work_queue;
204
205 if (lev->listen->read_after_accept) {
206
207 //c->socket.read_ready = 1;
208// lev->listen->handler(task, c, lev->socket.data);
209 nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
210 &c->task, c, lev->socket.data);
211
212 } else {
213 nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
214 &c->task, c, lev->socket.data);
215 }
216
217 next = nxt_conn_accept_next(task, lev);
218
219 if (next != NULL && lev->socket.read_ready) {
220 nxt_work_queue_add(lev->socket.read_work_queue,
221 lev->accept, task, lev, next);
222 }
223}
224
225
226static nxt_conn_t *
227nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
228{
229 nxt_conn_t *c;
230
231 lev->next = NULL;
232
233 do {
234 c = nxt_conn_accept_alloc(task, lev);
235
236 if (nxt_fast_path(c != NULL)) {
237 return c;
238 }
239
240 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
241
242 nxt_log(task, NXT_LOG_CRIT, "no available connections, "
243 "new connections are not accepted within 1s");
244
245 return NULL;
246}
247
248
249static nxt_int_t
250nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
251{
252 nxt_conn_t *c;
253 nxt_queue_t *idle;
254 nxt_queue_link_t *link;
255 nxt_event_engine_t *engine;
256
257 static nxt_log_moderation_t nxt_idle_close_log_moderation = {
258 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
259 };
260
261 engine = task->thread->engine;
262
263 idle = &engine->idle_connections;
264
265 for (link = nxt_queue_last(idle);
266 link != nxt_queue_head(idle);
267 link = nxt_queue_next(link))
268 {
269 c = nxt_queue_link_data(link, nxt_conn_t, link);
270
271 if (!c->socket.read_ready) {
272 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
273 task->log, "no available connections, "
274 "close idle connection");
275 nxt_queue_remove(link);
276 nxt_conn_close(engine, c);
277
278 return NXT_OK;
279 }
280 }
281
282 nxt_timer_add(engine, &lev->timer, 1000);
283
284 nxt_fd_event_disable_read(engine, &lev->socket);
285
286 return NXT_DECLINED;
287}
288
289
290void
291nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
292 const char *accept_syscall, nxt_err_t err)
293{
294 static nxt_log_moderation_t nxt_accept_log_moderation = {
295 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
296 };
297
298 lev->socket.read_ready = 0;
299
300 switch (err) {
301
302 case NXT_EAGAIN:
303 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
304 return;
305
306 case ECONNABORTED:
307 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
308 task->log, "%s(%d) failed %E",
309 accept_syscall, lev->socket.fd, err);
310 return;
311
312 case EMFILE:
313 case ENFILE:
314 case ENOBUFS:
315 case ENOMEM:
316 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
317 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
318 "new connections are not accepted within 1s",
319 accept_syscall, lev->socket.fd, err);
320 }
321
322 return;
323
324 default:
325 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
326 accept_syscall, lev->socket.fd, err);
327 return;
328 }
329}
330
331
332static void
333nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
334{
335 nxt_conn_t *c;
336 nxt_timer_t *timer;
337 nxt_listen_event_t *lev;
338
339 timer = obj;
340
341 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
342 c = lev->next;
343
344 if (c == NULL) {
345 c = nxt_conn_accept_next(task, lev);
346
347 if (c == NULL) {
348 return;
349 }
350 }
351
352 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
353
354 lev->accept(task, lev, c);
355}
356
357
358static void
359nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
360{
361 nxt_fd_event_t *ev;
362
363 ev = obj;
364
365 nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
366}
108 c->remote = nxt_sockaddr_cache_alloc(engine, lev);
109 if (nxt_fast_path(c->remote != NULL)) {
110 return c;
111 }
112 }
113
114 fail:
115
116 nxt_mp_destroy(mp);
117 }
118
119 return NULL;
120}
121
122
123static void
124nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125{
126 nxt_listen_event_t *lev;
127
128 lev = obj;
129 lev->ready = lev->batch;
130
131 lev->accept(task, lev, data);
132}
133
134
135void
136nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137{
138 socklen_t socklen;
139 nxt_conn_t *c;
140 nxt_socket_t s;
141 struct sockaddr *sa;
142 nxt_listen_event_t *lev;
143
144 lev = obj;
145 c = lev->next;
146
147 lev->ready--;
148 lev->socket.read_ready = (lev->ready != 0);
149
150 sa = &c->remote->u.sockaddr;
151 socklen = c->remote->socklen;
152 /*
153 * The returned socklen is ignored here, because sockaddr_in and
154 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un
155 * it is 3 byte length and already prepared, because old BSDs return zero
156 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157 * socklen and updates only the sa_family part; other systems copy 3 bytes
158 * and truncate surplus zero part. Only bound sockaddr_un will be really
159 * truncated here.
160 */
161 s = accept(lev->socket.fd, sa, &socklen);
162
163 if (s == -1) {
164 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165 return;
166 }
167
168 c->socket.fd = s;
169
170#if (NXT_LINUX)
171 /*
172 * Linux does not inherit non-blocking mode
173 * from listen socket for accept()ed socket.
174 */
175 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176 nxt_socket_close(task, s);
177 }
178
179#endif
180
181 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182
183 nxt_conn_accept(task, lev, c);
184}
185
186
187void
188nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189{
190 nxt_conn_t *next;
191
192 nxt_sockaddr_text(c->remote);
193
194 nxt_debug(task, "client: %*s",
195 c->remote->address_length, nxt_sockaddr_address(c->remote));
196
197 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
198
199 c->read_work_queue = lev->work_queue;
200 c->write_work_queue = lev->work_queue;
201
202 if (lev->listen->read_after_accept) {
203
204 //c->socket.read_ready = 1;
205// lev->listen->handler(task, c, lev->socket.data);
206 nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
207 &c->task, c, lev->socket.data);
208
209 } else {
210 nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
211 &c->task, c, lev->socket.data);
212 }
213
214 next = nxt_conn_accept_next(task, lev);
215
216 if (next != NULL && lev->socket.read_ready) {
217 nxt_work_queue_add(lev->socket.read_work_queue,
218 lev->accept, task, lev, next);
219 }
220}
221
222
223static nxt_conn_t *
224nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
225{
226 nxt_conn_t *c;
227
228 lev->next = NULL;
229
230 do {
231 c = nxt_conn_accept_alloc(task, lev);
232
233 if (nxt_fast_path(c != NULL)) {
234 return c;
235 }
236
237 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
238
239 nxt_log(task, NXT_LOG_CRIT, "no available connections, "
240 "new connections are not accepted within 1s");
241
242 return NULL;
243}
244
245
246static nxt_int_t
247nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
248{
249 nxt_conn_t *c;
250 nxt_queue_t *idle;
251 nxt_queue_link_t *link;
252 nxt_event_engine_t *engine;
253
254 static nxt_log_moderation_t nxt_idle_close_log_moderation = {
255 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
256 };
257
258 engine = task->thread->engine;
259
260 idle = &engine->idle_connections;
261
262 for (link = nxt_queue_last(idle);
263 link != nxt_queue_head(idle);
264 link = nxt_queue_next(link))
265 {
266 c = nxt_queue_link_data(link, nxt_conn_t, link);
267
268 if (!c->socket.read_ready) {
269 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
270 task->log, "no available connections, "
271 "close idle connection");
272 nxt_queue_remove(link);
273 nxt_conn_close(engine, c);
274
275 return NXT_OK;
276 }
277 }
278
279 nxt_timer_add(engine, &lev->timer, 1000);
280
281 nxt_fd_event_disable_read(engine, &lev->socket);
282
283 return NXT_DECLINED;
284}
285
286
287void
288nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
289 const char *accept_syscall, nxt_err_t err)
290{
291 static nxt_log_moderation_t nxt_accept_log_moderation = {
292 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
293 };
294
295 lev->socket.read_ready = 0;
296
297 switch (err) {
298
299 case NXT_EAGAIN:
300 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
301 return;
302
303 case ECONNABORTED:
304 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
305 task->log, "%s(%d) failed %E",
306 accept_syscall, lev->socket.fd, err);
307 return;
308
309 case EMFILE:
310 case ENFILE:
311 case ENOBUFS:
312 case ENOMEM:
313 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
314 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
315 "new connections are not accepted within 1s",
316 accept_syscall, lev->socket.fd, err);
317 }
318
319 return;
320
321 default:
322 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
323 accept_syscall, lev->socket.fd, err);
324 return;
325 }
326}
327
328
329static void
330nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
331{
332 nxt_conn_t *c;
333 nxt_timer_t *timer;
334 nxt_listen_event_t *lev;
335
336 timer = obj;
337
338 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
339 c = lev->next;
340
341 if (c == NULL) {
342 c = nxt_conn_accept_next(task, lev);
343
344 if (c == NULL) {
345 return;
346 }
347 }
348
349 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
350
351 lev->accept(task, lev, c);
352}
353
354
355static void
356nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
357{
358 nxt_fd_event_t *ev;
359
360 ev = obj;
361
362 nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
363}