xref: /unit/src/nxt_conn_accept.c (revision 2185:2227bdbb3c89)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A listen socket handler calls an event facility specific io_accept()
 * method.  The method accept()s a new connection and then calls
 * nxt_conn_accept() to handle the new connection and to preallocate
 * a connection object for the next accept(), so that the next accept()ed
 * socket does not have to be dropped when no more connections are allowed.
 * If there are no available connections, closing of idle connections is
 * scheduled and new connections are not accept()ed for 100 milliseconds.
 */
19 
20 
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22     nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26     nxt_listen_event_t *lev);
27 static void nxt_conn_accept_close_idle(nxt_task_t *task,
28     nxt_listen_event_t *lev);
29 static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
32     void *data);
33 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
34     void *data);
35 
36 
37 nxt_listen_event_t *
nxt_listen_event(nxt_task_t * task,nxt_listen_socket_t * ls)38 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
39 {
40     nxt_listen_event_t  *lev;
41     nxt_event_engine_t  *engine;
42 
43     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
44 
45     if (nxt_fast_path(lev != NULL)) {
46         lev->socket.fd = ls->socket;
47 
48         engine = task->thread->engine;
49         lev->batch = engine->batch;
50         lev->count = 1;
51 
52         lev->socket.read_work_queue = &engine->accept_work_queue;
53         lev->socket.read_handler = nxt_conn_listen_handler;
54         lev->socket.error_handler = nxt_conn_listen_event_error;
55         lev->socket.log = &nxt_main_log;
56 
57         lev->accept = engine->event.io->accept;
58 
59         lev->listen = ls;
60         lev->work_queue = &engine->read_work_queue;
61 
62         lev->timer.work_queue = &engine->fast_work_queue;
63         lev->timer.handler = nxt_conn_listen_timer_handler;
64         lev->timer.log = &nxt_main_log;
65 
66         lev->task.thread = task->thread;
67         lev->task.log = &nxt_main_log;
68         lev->task.ident = nxt_task_next_ident();
69         lev->socket.task = &lev->task;
70         lev->timer.task = &lev->task;
71 
72         if (nxt_conn_accept_alloc(task, lev) != NULL) {
73             nxt_fd_event_enable_accept(engine, &lev->socket);
74 
75             nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
76         }
77 
78         return lev;
79     }
80 
81     return NULL;
82 }
83 
84 
85 static nxt_conn_t *
nxt_conn_accept_alloc(nxt_task_t * task,nxt_listen_event_t * lev)86 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
87 {
88     nxt_mp_t            *mp;
89     nxt_conn_t          *c;
90     nxt_event_engine_t  *engine;
91 
92     engine = task->thread->engine;
93 
94     if (engine->connections < engine->max_connections) {
95 
96         mp = nxt_mp_create(1024, 128, 256, 32);
97 
98         if (nxt_fast_path(mp != NULL)) {
99             c = nxt_conn_create(mp, lev->socket.task);
100             if (nxt_slow_path(c == NULL)) {
101                 nxt_mp_destroy(mp);
102 
103                 return NULL;
104             }
105 
106             c->socket.read_work_queue = lev->socket.read_work_queue;
107             c->socket.write_ready = 1;
108 
109             c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
110             if (nxt_fast_path(c->remote != NULL)) {
111                 lev->next = c;
112                 return c;
113             }
114 
115             nxt_conn_free(task, c);
116         }
117     }
118 
119     return NULL;
120 }
121 
122 
123 static void
nxt_conn_listen_handler(nxt_task_t * task,void * obj,void * data)124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125 {
126     nxt_listen_event_t  *lev;
127 
128     lev = obj;
129     lev->ready = lev->batch;
130 
131     lev->accept(task, lev, data);
132 }
133 
134 
135 void
nxt_conn_io_accept(nxt_task_t * task,void * obj,void * data)136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137 {
138     socklen_t           socklen;
139     nxt_conn_t          *c;
140     nxt_socket_t        s;
141     struct sockaddr     *sa;
142     nxt_listen_event_t  *lev;
143 
144     lev = obj;
145     c = lev->next;
146 
147     lev->ready--;
148     lev->socket.read_ready = (lev->ready != 0);
149 
150     sa = &c->remote->u.sockaddr;
151     socklen = c->remote->socklen;
152     /*
153      * The returned socklen is ignored here, because sockaddr_in and
154      * sockaddr_in6 socklens are not changed.  As to unspecified sockaddr_un
155      * it is 3 byte length and already prepared, because old BSDs return zero
156      * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157      * socklen and updates only the sa_family part; other systems copy 3 bytes
158      * and truncate surplus zero part.  Only bound sockaddr_un will be really
159      * truncated here.
160      */
161     s = accept(lev->socket.fd, sa, &socklen);
162 
163     if (s == -1) {
164         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165         return;
166     }
167 
168     c->socket.fd = s;
169 
170 #if (NXT_LINUX)
171     /*
172      * Linux does not inherit non-blocking mode
173      * from listen socket for accept()ed socket.
174      */
175     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176         nxt_socket_close(task, s);
177     }
178 
179 #endif
180 
181     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182 
183     nxt_conn_accept(task, lev, c);
184 }
185 
186 
187 void
nxt_conn_accept(nxt_task_t * task,nxt_listen_event_t * lev,nxt_conn_t * c)188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189 {
190     nxt_conn_t          *next;
191     nxt_event_engine_t  *engine;
192 
193     nxt_sockaddr_text(c->remote);
194 
195     nxt_debug(task, "client: %*s",
196               (size_t) c->remote->address_length,
197               nxt_sockaddr_address(c->remote));
198 
199     engine = task->thread->engine;
200 
201     engine->accepted_conns_cnt++;
202 
203     nxt_conn_idle(engine, c);
204 
205     c->listen = lev;
206     lev->count++;
207     lev->next = NULL;
208     c->socket.data = NULL;
209 
210     c->read_work_queue = lev->work_queue;
211     c->write_work_queue = lev->work_queue;
212 
213     if (lev->listen->read_after_accept) {
214 
215         //c->socket.read_ready = 1;
216 //        lev->listen->handler(task, c, lev);
217         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
218                            &c->task, c, lev);
219 
220     } else {
221         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
222                            &c->task, c, lev);
223     }
224 
225     next = nxt_conn_accept_next(task, lev);
226 
227     if (next != NULL && lev->socket.read_ready) {
228         nxt_work_queue_add(lev->socket.read_work_queue,
229                            lev->accept, task, lev, next);
230     }
231 }
232 
233 
234 static nxt_conn_t *
nxt_conn_accept_next(nxt_task_t * task,nxt_listen_event_t * lev)235 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
236 {
237     nxt_conn_t  *c;
238 
239     c = lev->next;
240 
241     if (c == NULL) {
242         c = nxt_conn_accept_alloc(task, lev);
243 
244         if (nxt_slow_path(c == NULL)) {
245             nxt_conn_accept_close_idle(task, lev);
246         }
247     }
248 
249     return c;
250 }
251 
252 
253 static void
nxt_conn_accept_close_idle(nxt_task_t * task,nxt_listen_event_t * lev)254 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
255 {
256     nxt_event_engine_t  *engine;
257 
258     engine = task->thread->engine;
259 
260     nxt_work_queue_add(&engine->close_work_queue,
261                        nxt_conn_accept_close_idle_handler, task, NULL, NULL);
262 
263     nxt_timer_add(engine, &lev->timer, 100);
264 
265     nxt_fd_event_disable_read(engine, &lev->socket);
266 
267     nxt_alert(task, "new connections are not accepted within 100ms");
268 }
269 
270 
271 static void
nxt_conn_accept_close_idle_handler(nxt_task_t * task,void * obj,void * data)272 nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data)
273 {
274     nxt_uint_t          times;
275     nxt_conn_t          *c;
276     nxt_queue_t         *idle;
277     nxt_queue_link_t    *link, *next;
278     nxt_event_engine_t  *engine;
279 
280     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
281         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
282     };
283 
284     times = 10;
285     engine = task->thread->engine;
286     idle = &engine->idle_connections;
287 
288     for (link = nxt_queue_last(idle);
289          link != nxt_queue_head(idle);
290          link = next)
291     {
292         next = nxt_queue_next(link);
293 
294         c = nxt_queue_link_data(link, nxt_conn_t, link);
295 
296         nxt_debug(c->socket.task, "idle connection: %d rdy:%d",
297                   c->socket.fd, c->socket.read_ready);
298 
299         if (!c->socket.read_ready) {
300             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
301                              task->log, "no available connections, "
302                              "close idle connection");
303 
304             c->read_state->close_handler(c->socket.task, c, c->socket.data);
305 
306             times--;
307 
308             if (times == 0) {
309                 break;
310             }
311         }
312     }
313 }
314 
315 
316 void
nxt_conn_accept_error(nxt_task_t * task,nxt_listen_event_t * lev,const char * accept_syscall,nxt_err_t err)317 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
318     const char *accept_syscall, nxt_err_t err)
319 {
320     static nxt_log_moderation_t  nxt_accept_log_moderation = {
321         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
322     };
323 
324     lev->socket.read_ready = 0;
325 
326     switch (err) {
327 
328     case NXT_EAGAIN:
329         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
330         return;
331 
332     case ECONNABORTED:
333         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
334                          task->log, "%s(%d) failed %E",
335                          accept_syscall, lev->socket.fd, err);
336         return;
337 
338     case EMFILE:
339     case ENFILE:
340     case ENOBUFS:
341     case ENOMEM:
342         nxt_alert(task, "%s(%d) failed %E",
343                   accept_syscall, lev->socket.fd, err);
344 
345         nxt_conn_accept_close_idle(task, lev);
346         return;
347 
348     default:
349         nxt_alert(task, "%s(%d) failed %E",
350                   accept_syscall, lev->socket.fd, err);
351         return;
352     }
353 }
354 
355 
356 static void
nxt_conn_listen_timer_handler(nxt_task_t * task,void * obj,void * data)357 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
358 {
359     nxt_conn_t          *c;
360     nxt_timer_t         *timer;
361     nxt_listen_event_t  *lev;
362 
363     timer = obj;
364 
365     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
366 
367     c = nxt_conn_accept_next(task, lev);
368     if (c == NULL) {
369         return;
370     }
371 
372     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
373 
374     lev->accept(task, lev, c);
375 }
376 
377 
378 static void
nxt_conn_listen_event_error(nxt_task_t * task,void * obj,void * data)379 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
380 {
381     nxt_fd_event_t  *ev;
382 
383     ev = obj;
384 
385     nxt_alert(task, "accept(%d) event error", ev->fd);
386 }
387