xref: /unit/src/nxt_conn_accept.c (revision 1458:ad693d003313)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A listen socket handler calls an event facility specific io_accept()
 * method.  The method accept()s a new connection and then calls
 * nxt_conn_accept() to handle the new connection and to prepare a spare
 * connection for the next accept(), so that the next accept()ed socket
 * is not simply dropped when no more connections are allowed.  If no spare
 * connection can be allocated, idle connections are scheduled to be closed
 * and new connections are not accept()ed until a 100ms timer fires.
 */
19 
20 
/*
 * Allocates a spare connection for the next accept() and stores it
 * in lev->next; returns NULL if no connection can be allocated.
 */
static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
    nxt_listen_event_t *lev);
/* Listen socket read handler: resets lev->ready and calls lev->accept. */
static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
    void *data);
/* Returns the spare connection, allocating a new one if lev->next is NULL. */
static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
    nxt_listen_event_t *lev);
/*
 * Schedules closing of idle connections, arms the listen event timer
 * and disables read events on the listen socket.
 */
static void nxt_conn_accept_close_idle(nxt_task_t *task,
    nxt_listen_event_t *lev);
/* Work queue handler that closes idle connections not ready for reading. */
static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj,
    void *data);
/* Listen socket error handler: logs an alert with the socket descriptor. */
static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
    void *data);
/* Timer handler that re-enables accepting once a spare connection exists. */
static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
    void *data);
35 
36 
37 nxt_listen_event_t *
38 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
39 {
40     nxt_listen_event_t  *lev;
41     nxt_event_engine_t  *engine;
42 
43     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
44 
45     if (nxt_fast_path(lev != NULL)) {
46         lev->socket.fd = ls->socket;
47 
48         engine = task->thread->engine;
49         lev->batch = engine->batch;
50         lev->count = 1;
51 
52         lev->socket.read_work_queue = &engine->accept_work_queue;
53         lev->socket.read_handler = nxt_conn_listen_handler;
54         lev->socket.error_handler = nxt_conn_listen_event_error;
55         lev->socket.log = &nxt_main_log;
56 
57         lev->accept = engine->event.io->accept;
58 
59         lev->listen = ls;
60         lev->work_queue = &engine->read_work_queue;
61 
62         lev->timer.work_queue = &engine->fast_work_queue;
63         lev->timer.handler = nxt_conn_listen_timer_handler;
64         lev->timer.log = &nxt_main_log;
65 
66         lev->task.thread = task->thread;
67         lev->task.log = &nxt_main_log;
68         lev->task.ident = nxt_task_next_ident();
69         lev->socket.task = &lev->task;
70         lev->timer.task = &lev->task;
71 
72         if (nxt_conn_accept_alloc(task, lev) != NULL) {
73             nxt_fd_event_enable_accept(engine, &lev->socket);
74 
75             nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
76         }
77 
78         return lev;
79     }
80 
81     return NULL;
82 }
83 
84 
85 static nxt_conn_t *
86 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
87 {
88     nxt_mp_t            *mp;
89     nxt_conn_t          *c;
90     nxt_event_engine_t  *engine;
91 
92     engine = task->thread->engine;
93 
94     if (engine->connections < engine->max_connections) {
95 
96         mp = nxt_mp_create(1024, 128, 256, 32);
97 
98         if (nxt_fast_path(mp != NULL)) {
99             c = nxt_conn_create(mp, lev->socket.task);
100             if (nxt_slow_path(c == NULL)) {
101                 goto fail;
102             }
103 
104             c->socket.read_work_queue = lev->socket.read_work_queue;
105             c->socket.write_ready = 1;
106 
107             c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
108             if (nxt_fast_path(c->remote != NULL)) {
109                 lev->next = c;
110                 return c;
111             }
112         }
113 
114     fail:
115 
116         nxt_mp_destroy(mp);
117     }
118 
119     return NULL;
120 }
121 
122 
123 static void
124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125 {
126     nxt_listen_event_t  *lev;
127 
128     lev = obj;
129     lev->ready = lev->batch;
130 
131     lev->accept(task, lev, data);
132 }
133 
134 
135 void
136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137 {
138     socklen_t           socklen;
139     nxt_conn_t          *c;
140     nxt_socket_t        s;
141     struct sockaddr     *sa;
142     nxt_listen_event_t  *lev;
143 
144     lev = obj;
145     c = lev->next;
146 
147     lev->ready--;
148     lev->socket.read_ready = (lev->ready != 0);
149 
150     sa = &c->remote->u.sockaddr;
151     socklen = c->remote->socklen;
152     /*
153      * The returned socklen is ignored here, because sockaddr_in and
154      * sockaddr_in6 socklens are not changed.  As to unspecified sockaddr_un
155      * it is 3 byte length and already prepared, because old BSDs return zero
156      * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157      * socklen and updates only the sa_family part; other systems copy 3 bytes
158      * and truncate surplus zero part.  Only bound sockaddr_un will be really
159      * truncated here.
160      */
161     s = accept(lev->socket.fd, sa, &socklen);
162 
163     if (s == -1) {
164         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165         return;
166     }
167 
168     c->socket.fd = s;
169 
170 #if (NXT_LINUX)
171     /*
172      * Linux does not inherit non-blocking mode
173      * from listen socket for accept()ed socket.
174      */
175     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176         nxt_socket_close(task, s);
177     }
178 
179 #endif
180 
181     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182 
183     nxt_conn_accept(task, lev, c);
184 }
185 
186 
187 void
188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189 {
190     nxt_conn_t  *next;
191 
192     nxt_sockaddr_text(c->remote);
193 
194     nxt_debug(task, "client: %*s",
195               (size_t) c->remote->address_length,
196               nxt_sockaddr_address(c->remote));
197 
198     nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
199 
200     c->listen = lev;
201     lev->count++;
202     lev->next = NULL;
203     c->socket.data = NULL;
204 
205     c->read_work_queue = lev->work_queue;
206     c->write_work_queue = lev->work_queue;
207 
208     if (lev->listen->read_after_accept) {
209 
210         //c->socket.read_ready = 1;
211 //        lev->listen->handler(task, c, lev);
212         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
213                            &c->task, c, lev);
214 
215     } else {
216         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
217                            &c->task, c, lev);
218     }
219 
220     next = nxt_conn_accept_next(task, lev);
221 
222     if (next != NULL && lev->socket.read_ready) {
223         nxt_work_queue_add(lev->socket.read_work_queue,
224                            lev->accept, task, lev, next);
225     }
226 }
227 
228 
229 static nxt_conn_t *
230 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
231 {
232     nxt_conn_t  *c;
233 
234     c = lev->next;
235 
236     if (c == NULL) {
237         c = nxt_conn_accept_alloc(task, lev);
238 
239         if (nxt_slow_path(c == NULL)) {
240             nxt_conn_accept_close_idle(task, lev);
241         }
242     }
243 
244     return c;
245 }
246 
247 
248 static void
249 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
250 {
251     nxt_event_engine_t  *engine;
252 
253     engine = task->thread->engine;
254 
255     nxt_work_queue_add(&engine->close_work_queue,
256                        nxt_conn_accept_close_idle_handler, task, NULL, NULL);
257 
258     nxt_timer_add(engine, &lev->timer, 100);
259 
260     nxt_fd_event_disable_read(engine, &lev->socket);
261 
262     nxt_alert(task, "new connections are not accepted within 100ms");
263 }
264 
265 
266 static void
267 nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data)
268 {
269     nxt_uint_t          times;
270     nxt_conn_t          *c;
271     nxt_queue_t         *idle;
272     nxt_queue_link_t    *link, *next;
273     nxt_event_engine_t  *engine;
274 
275     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
276         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
277     };
278 
279     times = 10;
280     engine = task->thread->engine;
281     idle = &engine->idle_connections;
282 
283     for (link = nxt_queue_last(idle);
284          link != nxt_queue_head(idle);
285          link = next)
286     {
287         next = nxt_queue_next(link);
288 
289         c = nxt_queue_link_data(link, nxt_conn_t, link);
290 
291         nxt_debug(c->socket.task, "idle connection: %d rdy:%d",
292                   c->socket.fd, c->socket.read_ready);
293 
294         if (!c->socket.read_ready) {
295             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
296                              task->log, "no available connections, "
297                              "close idle connection");
298 
299             c->read_state->close_handler(c->socket.task, c, c->socket.data);
300 
301             times--;
302 
303             if (times == 0) {
304                 break;
305             }
306         }
307     }
308 }
309 
310 
311 void
312 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
313     const char *accept_syscall, nxt_err_t err)
314 {
315     static nxt_log_moderation_t  nxt_accept_log_moderation = {
316         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
317     };
318 
319     lev->socket.read_ready = 0;
320 
321     switch (err) {
322 
323     case NXT_EAGAIN:
324         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
325         return;
326 
327     case ECONNABORTED:
328         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
329                          task->log, "%s(%d) failed %E",
330                          accept_syscall, lev->socket.fd, err);
331         return;
332 
333     case EMFILE:
334     case ENFILE:
335     case ENOBUFS:
336     case ENOMEM:
337         nxt_alert(task, "%s(%d) failed %E",
338                   accept_syscall, lev->socket.fd, err);
339 
340         nxt_conn_accept_close_idle(task, lev);
341         return;
342 
343     default:
344         nxt_alert(task, "%s(%d) failed %E",
345                   accept_syscall, lev->socket.fd, err);
346         return;
347     }
348 }
349 
350 
351 static void
352 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
353 {
354     nxt_conn_t          *c;
355     nxt_timer_t         *timer;
356     nxt_listen_event_t  *lev;
357 
358     timer = obj;
359 
360     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
361 
362     c = nxt_conn_accept_next(task, lev);
363     if (c == NULL) {
364         return;
365     }
366 
367     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
368 
369     lev->accept(task, lev, c);
370 }
371 
372 
373 static void
374 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
375 {
376     nxt_fd_event_t  *ev;
377 
378     ev = obj;
379 
380     nxt_alert(task, "accept(%d) event error", ev->fd);
381 }
382