xref: /unit/src/nxt_conn_accept.c (revision 1457:af93c866b4f0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * A listen socket handler calls an event facility specific io_accept()
12  * method.  The method accept()s a new connection and then calls
13  * nxt_event_conn_accept() to handle the new connection and to prepare
14  * for a next connection to avoid just dropping next accept()ed socket
15  * if no more connections allowed.  If there are no available connections
16  * an idle connection would be closed.  If there are no idle connections
17  * then new connections will not be accept()ed for 1 second.
18  */
19 
20 
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22     nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26     nxt_listen_event_t *lev);
27 static void nxt_conn_accept_close_idle(nxt_task_t *task,
28     nxt_listen_event_t *lev);
29 static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
34     void *data);
35 
36 
37 nxt_listen_event_t *
38 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
39 {
40     nxt_listen_event_t  *lev;
41     nxt_event_engine_t  *engine;
42 
43     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
44 
45     if (nxt_fast_path(lev != NULL)) {
46         lev->socket.fd = ls->socket;
47 
48         engine = task->thread->engine;
49         lev->batch = engine->batch;
50         lev->count = 1;
51 
52         lev->socket.read_work_queue = &engine->accept_work_queue;
53         lev->socket.read_handler = nxt_conn_listen_handler;
54         lev->socket.error_handler = nxt_conn_listen_event_error;
55         lev->socket.log = &nxt_main_log;
56 
57         lev->accept = engine->event.io->accept;
58 
59         lev->listen = ls;
60         lev->work_queue = &engine->read_work_queue;
61 
62         lev->timer.work_queue = &engine->fast_work_queue;
63         lev->timer.handler = nxt_conn_listen_timer_handler;
64         lev->timer.log = &nxt_main_log;
65 
66         lev->task.thread = task->thread;
67         lev->task.log = &nxt_main_log;
68         lev->task.ident = nxt_task_next_ident();
69         lev->socket.task = &lev->task;
70         lev->timer.task = &lev->task;
71 
72         if (nxt_conn_accept_alloc(task, lev) != NULL) {
73             nxt_fd_event_enable_accept(engine, &lev->socket);
74 
75             nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
76         }
77 
78         return lev;
79     }
80 
81     return NULL;
82 }
83 
84 
85 static nxt_conn_t *
86 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
87 {
88     nxt_mp_t            *mp;
89     nxt_conn_t          *c;
90     nxt_event_engine_t  *engine;
91 
92     engine = task->thread->engine;
93 
94     if (engine->connections < engine->max_connections) {
95 
96         mp = nxt_mp_create(1024, 128, 256, 32);
97 
98         if (nxt_fast_path(mp != NULL)) {
99             c = nxt_conn_create(mp, lev->socket.task);
100             if (nxt_slow_path(c == NULL)) {
101                 goto fail;
102             }
103 
104             lev->next = c;
105             c->socket.read_work_queue = lev->socket.read_work_queue;
106             c->socket.write_ready = 1;
107 
108             c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
109             if (nxt_fast_path(c->remote != NULL)) {
110                 return c;
111             }
112         }
113 
114     fail:
115 
116         nxt_mp_destroy(mp);
117     }
118 
119     return NULL;
120 }
121 
122 
123 static void
124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125 {
126     nxt_listen_event_t  *lev;
127 
128     lev = obj;
129     lev->ready = lev->batch;
130 
131     lev->accept(task, lev, data);
132 }
133 
134 
135 void
136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137 {
138     socklen_t           socklen;
139     nxt_conn_t          *c;
140     nxt_socket_t        s;
141     struct sockaddr     *sa;
142     nxt_listen_event_t  *lev;
143 
144     lev = obj;
145     c = lev->next;
146 
147     lev->ready--;
148     lev->socket.read_ready = (lev->ready != 0);
149 
150     sa = &c->remote->u.sockaddr;
151     socklen = c->remote->socklen;
152     /*
153      * The returned socklen is ignored here, because sockaddr_in and
154      * sockaddr_in6 socklens are not changed.  As to unspecified sockaddr_un
155      * it is 3 byte length and already prepared, because old BSDs return zero
156      * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157      * socklen and updates only the sa_family part; other systems copy 3 bytes
158      * and truncate surplus zero part.  Only bound sockaddr_un will be really
159      * truncated here.
160      */
161     s = accept(lev->socket.fd, sa, &socklen);
162 
163     if (s == -1) {
164         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165         return;
166     }
167 
168     c->socket.fd = s;
169 
170 #if (NXT_LINUX)
171     /*
172      * Linux does not inherit non-blocking mode
173      * from listen socket for accept()ed socket.
174      */
175     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176         nxt_socket_close(task, s);
177     }
178 
179 #endif
180 
181     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182 
183     nxt_conn_accept(task, lev, c);
184 }
185 
186 
187 void
188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189 {
190     nxt_conn_t  *next;
191 
192     nxt_sockaddr_text(c->remote);
193 
194     nxt_debug(task, "client: %*s",
195               (size_t) c->remote->address_length,
196               nxt_sockaddr_address(c->remote));
197 
198     nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
199 
200     c->listen = lev;
201     lev->count++;
202     c->socket.data = NULL;
203 
204     c->read_work_queue = lev->work_queue;
205     c->write_work_queue = lev->work_queue;
206 
207     if (lev->listen->read_after_accept) {
208 
209         //c->socket.read_ready = 1;
210 //        lev->listen->handler(task, c, lev);
211         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
212                            &c->task, c, lev);
213 
214     } else {
215         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
216                            &c->task, c, lev);
217     }
218 
219     next = nxt_conn_accept_next(task, lev);
220 
221     if (next != NULL && lev->socket.read_ready) {
222         nxt_work_queue_add(lev->socket.read_work_queue,
223                            lev->accept, task, lev, next);
224     }
225 }
226 
227 
228 static nxt_conn_t *
229 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
230 {
231     nxt_conn_t  *c;
232 
233     lev->next = NULL;
234 
235     c = nxt_conn_accept_alloc(task, lev);
236 
237     if (nxt_slow_path(c == NULL)) {
238         nxt_conn_accept_close_idle(task, lev);
239     }
240 
241     return c;
242 }
243 
244 
245 static void
246 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
247 {
248     nxt_event_engine_t  *engine;
249 
250     engine = task->thread->engine;
251 
252     nxt_work_queue_add(&engine->close_work_queue,
253                        nxt_conn_accept_close_idle_handler, task, NULL, NULL);
254 
255     nxt_timer_add(engine, &lev->timer, 100);
256 
257     nxt_fd_event_disable_read(engine, &lev->socket);
258 
259     nxt_alert(task, "new connections are not accepted within 100ms");
260 }
261 
262 
263 static void
264 nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data)
265 {
266     nxt_uint_t          times;
267     nxt_conn_t          *c;
268     nxt_queue_t         *idle;
269     nxt_queue_link_t    *link, *next;
270     nxt_event_engine_t  *engine;
271 
272     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
273         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
274     };
275 
276     times = 10;
277     engine = task->thread->engine;
278     idle = &engine->idle_connections;
279 
280     for (link = nxt_queue_last(idle);
281          link != nxt_queue_head(idle);
282          link = next)
283     {
284         next = nxt_queue_next(link);
285 
286         c = nxt_queue_link_data(link, nxt_conn_t, link);
287 
288         nxt_debug(c->socket.task, "idle connection: %d rdy:%d",
289                   c->socket.fd, c->socket.read_ready);
290 
291         if (!c->socket.read_ready) {
292             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
293                              task->log, "no available connections, "
294                              "close idle connection");
295 
296             c->read_state->close_handler(c->socket.task, c, c->socket.data);
297 
298             times--;
299 
300             if (times == 0) {
301                 break;
302             }
303         }
304     }
305 }
306 
307 
308 void
309 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
310     const char *accept_syscall, nxt_err_t err)
311 {
312     static nxt_log_moderation_t  nxt_accept_log_moderation = {
313         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
314     };
315 
316     lev->socket.read_ready = 0;
317 
318     switch (err) {
319 
320     case NXT_EAGAIN:
321         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
322         return;
323 
324     case ECONNABORTED:
325         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
326                          task->log, "%s(%d) failed %E",
327                          accept_syscall, lev->socket.fd, err);
328         return;
329 
330     case EMFILE:
331     case ENFILE:
332     case ENOBUFS:
333     case ENOMEM:
334         nxt_alert(task, "%s(%d) failed %E",
335                   accept_syscall, lev->socket.fd, err);
336 
337         nxt_conn_accept_close_idle(task, lev);
338         return;
339 
340     default:
341         nxt_alert(task, "%s(%d) failed %E",
342                   accept_syscall, lev->socket.fd, err);
343         return;
344     }
345 }
346 
347 
348 static void
349 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
350 {
351     nxt_conn_t          *c;
352     nxt_timer_t         *timer;
353     nxt_listen_event_t  *lev;
354 
355     timer = obj;
356 
357     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
358     c = lev->next;
359 
360     if (c == NULL) {
361         c = nxt_conn_accept_next(task, lev);
362 
363         if (c == NULL) {
364             return;
365         }
366     }
367 
368     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
369 
370     lev->accept(task, lev, c);
371 }
372 
373 
374 static void
375 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
376 {
377     nxt_fd_event_t  *ev;
378 
379     ev = obj;
380 
381     nxt_alert(task, "accept(%d) event error", ev->fd);
382 }
383