1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 /*
11 * A listen socket handler calls an event facility specific io_accept()
12 * method. The method accept()s a new connection and then calls
13 * nxt_event_conn_accept() to handle the new connection and to prepare
14 * for a next connection to avoid just dropping next accept()ed socket
15 * if no more connections allowed. If there are no available connections
16 * an idle connection would be closed. If there are no idle connections
17 * then new connections will not be accept()ed for 1 second.
18 */
19
20
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22 nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24 void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26 nxt_listen_event_t *lev);
27 static void nxt_conn_accept_close_idle(nxt_task_t *task,
28 nxt_listen_event_t *lev);
29 static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj,
30 void *data);
31 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
32 void *data);
33 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
34 void *data);
35
36
37 nxt_listen_event_t *
nxt_listen_event(nxt_task_t * task,nxt_listen_socket_t * ls)38 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
39 {
40 nxt_listen_event_t *lev;
41 nxt_event_engine_t *engine;
42
43 lev = nxt_zalloc(sizeof(nxt_listen_event_t));
44
45 if (nxt_fast_path(lev != NULL)) {
46 lev->socket.fd = ls->socket;
47
48 engine = task->thread->engine;
49 lev->batch = engine->batch;
50 lev->count = 1;
51
52 lev->socket.read_work_queue = &engine->accept_work_queue;
53 lev->socket.read_handler = nxt_conn_listen_handler;
54 lev->socket.error_handler = nxt_conn_listen_event_error;
55 lev->socket.log = &nxt_main_log;
56
57 lev->accept = engine->event.io->accept;
58
59 lev->listen = ls;
60 lev->work_queue = &engine->read_work_queue;
61
62 lev->timer.work_queue = &engine->fast_work_queue;
63 lev->timer.handler = nxt_conn_listen_timer_handler;
64 lev->timer.log = &nxt_main_log;
65
66 lev->task.thread = task->thread;
67 lev->task.log = &nxt_main_log;
68 lev->task.ident = nxt_task_next_ident();
69 lev->socket.task = &lev->task;
70 lev->timer.task = &lev->task;
71
72 if (nxt_conn_accept_alloc(task, lev) != NULL) {
73 nxt_fd_event_enable_accept(engine, &lev->socket);
74
75 nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
76 }
77
78 return lev;
79 }
80
81 return NULL;
82 }
83
84
85 static nxt_conn_t *
nxt_conn_accept_alloc(nxt_task_t * task,nxt_listen_event_t * lev)86 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
87 {
88 nxt_mp_t *mp;
89 nxt_conn_t *c;
90 nxt_event_engine_t *engine;
91
92 engine = task->thread->engine;
93
94 if (engine->connections < engine->max_connections) {
95
96 mp = nxt_mp_create(1024, 128, 256, 32);
97
98 if (nxt_fast_path(mp != NULL)) {
99 c = nxt_conn_create(mp, lev->socket.task);
100 if (nxt_slow_path(c == NULL)) {
101 nxt_mp_destroy(mp);
102
103 return NULL;
104 }
105
106 c->socket.read_work_queue = lev->socket.read_work_queue;
107 c->socket.write_ready = 1;
108
109 c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
110 if (nxt_fast_path(c->remote != NULL)) {
111 lev->next = c;
112 return c;
113 }
114
115 nxt_conn_free(task, c);
116 }
117 }
118
119 return NULL;
120 }
121
122
123 static void
nxt_conn_listen_handler(nxt_task_t * task,void * obj,void * data)124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125 {
126 nxt_listen_event_t *lev;
127
128 lev = obj;
129 lev->ready = lev->batch;
130
131 lev->accept(task, lev, data);
132 }
133
134
135 void
nxt_conn_io_accept(nxt_task_t * task,void * obj,void * data)136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137 {
138 socklen_t socklen;
139 nxt_conn_t *c;
140 nxt_socket_t s;
141 struct sockaddr *sa;
142 nxt_listen_event_t *lev;
143
144 lev = obj;
145 c = lev->next;
146
147 lev->ready--;
148 lev->socket.read_ready = (lev->ready != 0);
149
150 sa = &c->remote->u.sockaddr;
151 socklen = c->remote->socklen;
152 /*
153 * The returned socklen is ignored here, because sockaddr_in and
154 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un
155 * it is 3 byte length and already prepared, because old BSDs return zero
156 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157 * socklen and updates only the sa_family part; other systems copy 3 bytes
158 * and truncate surplus zero part. Only bound sockaddr_un will be really
159 * truncated here.
160 */
161 s = accept(lev->socket.fd, sa, &socklen);
162
163 if (s == -1) {
164 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165 return;
166 }
167
168 c->socket.fd = s;
169
170 #if (NXT_LINUX)
171 /*
172 * Linux does not inherit non-blocking mode
173 * from listen socket for accept()ed socket.
174 */
175 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176 nxt_socket_close(task, s);
177 }
178
179 #endif
180
181 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182
183 nxt_conn_accept(task, lev, c);
184 }
185
186
187 void
nxt_conn_accept(nxt_task_t * task,nxt_listen_event_t * lev,nxt_conn_t * c)188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189 {
190 nxt_conn_t *next;
191 nxt_event_engine_t *engine;
192
193 nxt_sockaddr_text(c->remote);
194
195 nxt_debug(task, "client: %*s",
196 (size_t) c->remote->address_length,
197 nxt_sockaddr_address(c->remote));
198
199 engine = task->thread->engine;
200
201 engine->accepted_conns_cnt++;
202
203 nxt_conn_idle(engine, c);
204
205 c->listen = lev;
206 lev->count++;
207 lev->next = NULL;
208 c->socket.data = NULL;
209
210 c->read_work_queue = lev->work_queue;
211 c->write_work_queue = lev->work_queue;
212
213 if (lev->listen->read_after_accept) {
214
215 //c->socket.read_ready = 1;
216 // lev->listen->handler(task, c, lev);
217 nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
218 &c->task, c, lev);
219
220 } else {
221 nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
222 &c->task, c, lev);
223 }
224
225 next = nxt_conn_accept_next(task, lev);
226
227 if (next != NULL && lev->socket.read_ready) {
228 nxt_work_queue_add(lev->socket.read_work_queue,
229 lev->accept, task, lev, next);
230 }
231 }
232
233
234 static nxt_conn_t *
nxt_conn_accept_next(nxt_task_t * task,nxt_listen_event_t * lev)235 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
236 {
237 nxt_conn_t *c;
238
239 c = lev->next;
240
241 if (c == NULL) {
242 c = nxt_conn_accept_alloc(task, lev);
243
244 if (nxt_slow_path(c == NULL)) {
245 nxt_conn_accept_close_idle(task, lev);
246 }
247 }
248
249 return c;
250 }
251
252
253 static void
nxt_conn_accept_close_idle(nxt_task_t * task,nxt_listen_event_t * lev)254 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
255 {
256 nxt_event_engine_t *engine;
257
258 engine = task->thread->engine;
259
260 nxt_work_queue_add(&engine->close_work_queue,
261 nxt_conn_accept_close_idle_handler, task, NULL, NULL);
262
263 nxt_timer_add(engine, &lev->timer, 100);
264
265 nxt_fd_event_disable_read(engine, &lev->socket);
266
267 nxt_alert(task, "new connections are not accepted within 100ms");
268 }
269
270
271 static void
nxt_conn_accept_close_idle_handler(nxt_task_t * task,void * obj,void * data)272 nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data)
273 {
274 nxt_uint_t times;
275 nxt_conn_t *c;
276 nxt_queue_t *idle;
277 nxt_queue_link_t *link, *next;
278 nxt_event_engine_t *engine;
279
280 static nxt_log_moderation_t nxt_idle_close_log_moderation = {
281 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
282 };
283
284 times = 10;
285 engine = task->thread->engine;
286 idle = &engine->idle_connections;
287
288 for (link = nxt_queue_last(idle);
289 link != nxt_queue_head(idle);
290 link = next)
291 {
292 next = nxt_queue_next(link);
293
294 c = nxt_queue_link_data(link, nxt_conn_t, link);
295
296 nxt_debug(c->socket.task, "idle connection: %d rdy:%d",
297 c->socket.fd, c->socket.read_ready);
298
299 if (!c->socket.read_ready) {
300 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
301 task->log, "no available connections, "
302 "close idle connection");
303
304 c->read_state->close_handler(c->socket.task, c, c->socket.data);
305
306 times--;
307
308 if (times == 0) {
309 break;
310 }
311 }
312 }
313 }
314
315
316 void
nxt_conn_accept_error(nxt_task_t * task,nxt_listen_event_t * lev,const char * accept_syscall,nxt_err_t err)317 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
318 const char *accept_syscall, nxt_err_t err)
319 {
320 static nxt_log_moderation_t nxt_accept_log_moderation = {
321 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
322 };
323
324 lev->socket.read_ready = 0;
325
326 switch (err) {
327
328 case NXT_EAGAIN:
329 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
330 return;
331
332 case ECONNABORTED:
333 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
334 task->log, "%s(%d) failed %E",
335 accept_syscall, lev->socket.fd, err);
336 return;
337
338 case EMFILE:
339 case ENFILE:
340 case ENOBUFS:
341 case ENOMEM:
342 nxt_alert(task, "%s(%d) failed %E",
343 accept_syscall, lev->socket.fd, err);
344
345 nxt_conn_accept_close_idle(task, lev);
346 return;
347
348 default:
349 nxt_alert(task, "%s(%d) failed %E",
350 accept_syscall, lev->socket.fd, err);
351 return;
352 }
353 }
354
355
356 static void
nxt_conn_listen_timer_handler(nxt_task_t * task,void * obj,void * data)357 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
358 {
359 nxt_conn_t *c;
360 nxt_timer_t *timer;
361 nxt_listen_event_t *lev;
362
363 timer = obj;
364
365 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
366
367 c = nxt_conn_accept_next(task, lev);
368 if (c == NULL) {
369 return;
370 }
371
372 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
373
374 lev->accept(task, lev, c);
375 }
376
377
378 static void
nxt_conn_listen_event_error(nxt_task_t * task,void * obj,void * data)379 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
380 {
381 nxt_fd_event_t *ev;
382
383 ev = obj;
384
385 nxt_alert(task, "accept(%d) event error", ev->fd);
386 }
387