xref: /unit/src/nxt_conn_accept.c (revision 359:d4848619451a)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A listen socket handler calls an event facility specific io_accept()
 * method.  The method accept()s a new connection and then calls
 * nxt_conn_accept() to handle the new connection and to prepare the
 * connection object for the next accept() in advance, so that the next
 * accept()ed socket does not have to be dropped when no more connections
 * are allowed.  If there are no available connections, an idle connection
 * is closed.  If there are no idle connections either, new connections
 * are not accept()ed for 1 second.
 */
19 
20 
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22     nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26     nxt_listen_event_t *lev);
27 static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
28     nxt_listen_event_t *lev);
29 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
32     void *data);
33 
34 
35 nxt_listen_event_t *
36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
37 {
38     nxt_listen_event_t  *lev;
39     nxt_event_engine_t  *engine;
40 
41     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
42 
43     if (nxt_fast_path(lev != NULL)) {
44         lev->socket.fd = ls->socket;
45 
46         engine = task->thread->engine;
47         lev->batch = engine->batch;
48 
49         lev->socket.read_work_queue = &engine->accept_work_queue;
50         lev->socket.read_handler = nxt_conn_listen_handler;
51         lev->socket.error_handler = nxt_conn_listen_event_error;
52         lev->socket.log = &nxt_main_log;
53 
54         lev->accept = engine->event.io->accept;
55 
56         lev->listen = ls;
57         lev->work_queue = &engine->read_work_queue;
58 
59         lev->timer.work_queue = &engine->fast_work_queue;
60         lev->timer.handler = nxt_conn_listen_timer_handler;
61         lev->timer.log = &nxt_main_log;
62 
63         lev->task.thread = task->thread;
64         lev->task.log = &nxt_main_log;
65         lev->task.ident = nxt_task_next_ident();
66         lev->socket.task = &lev->task;
67         lev->timer.task = &lev->task;
68 
69         if (nxt_conn_accept_alloc(task, lev) != NULL) {
70             nxt_fd_event_enable_accept(engine, &lev->socket);
71 
72             nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
73         }
74 
75         return lev;
76     }
77 
78     return NULL;
79 }
80 
81 
82 static nxt_conn_t *
83 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
84 {
85     nxt_mp_t            *mp;
86     nxt_conn_t          *c;
87     nxt_event_engine_t  *engine;
88 
89     engine = task->thread->engine;
90 
91     if (engine->connections < engine->max_connections) {
92 
93         mp = nxt_mp_create(1024, 128, 256, 32);
94 
95         if (nxt_fast_path(mp != NULL)) {
96             c = nxt_conn_create(mp, lev->socket.task);
97             if (nxt_slow_path(c == NULL)) {
98                 goto fail;
99             }
100 
101             lev->next = c;
102             c->socket.read_work_queue = lev->socket.read_work_queue;
103             c->socket.write_ready = 1;
104 
105             c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
106             if (nxt_fast_path(c->remote != NULL)) {
107                 return c;
108             }
109         }
110 
111     fail:
112 
113         nxt_mp_destroy(mp);
114     }
115 
116     return NULL;
117 }
118 
119 
120 static void
121 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
122 {
123     nxt_listen_event_t  *lev;
124 
125     lev = obj;
126     lev->ready = lev->batch;
127 
128     lev->accept(task, lev, data);
129 }
130 
131 
132 void
133 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
134 {
135     socklen_t           socklen;
136     nxt_conn_t          *c;
137     nxt_socket_t        s;
138     struct sockaddr     *sa;
139     nxt_listen_event_t  *lev;
140 
141     lev = obj;
142     c = lev->next;
143 
144     lev->ready--;
145     lev->socket.read_ready = (lev->ready != 0);
146 
147     sa = &c->remote->u.sockaddr;
148     socklen = c->remote->socklen;
149     /*
150      * The returned socklen is ignored here, because sockaddr_in and
151      * sockaddr_in6 socklens are not changed.  As to unspecified sockaddr_un
152      * it is 3 byte length and already prepared, because old BSDs return zero
153      * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
154      * socklen and updates only the sa_family part; other systems copy 3 bytes
155      * and truncate surplus zero part.  Only bound sockaddr_un will be really
156      * truncated here.
157      */
158     s = accept(lev->socket.fd, sa, &socklen);
159 
160     if (s == -1) {
161         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
162         return;
163     }
164 
165     c->socket.fd = s;
166 
167 #if (NXT_LINUX)
168     /*
169      * Linux does not inherit non-blocking mode
170      * from listen socket for accept()ed socket.
171      */
172     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
173         nxt_socket_close(task, s);
174     }
175 
176 #endif
177 
178     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
179 
180     nxt_conn_accept(task, lev, c);
181 }
182 
183 
184 void
185 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
186 {
187     nxt_conn_t  *next;
188 
189     nxt_sockaddr_text(c->remote);
190 
191     nxt_debug(task, "client: %*s",
192               c->remote->address_length, nxt_sockaddr_address(c->remote));
193 
194     nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
195 
196     c->read_work_queue = lev->work_queue;
197     c->write_work_queue = lev->work_queue;
198 
199     if (lev->listen->read_after_accept) {
200 
201         //c->socket.read_ready = 1;
202 //        lev->listen->handler(task, c, lev->socket.data);
203         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
204                            &c->task, c, lev->socket.data);
205 
206     } else {
207         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
208                            &c->task, c, lev->socket.data);
209     }
210 
211     next = nxt_conn_accept_next(task, lev);
212 
213     if (next != NULL && lev->socket.read_ready) {
214         nxt_work_queue_add(lev->socket.read_work_queue,
215                            lev->accept, task, lev, next);
216     }
217 }
218 
219 
220 static nxt_conn_t *
221 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
222 {
223     nxt_conn_t  *c;
224 
225     lev->next = NULL;
226 
227     do {
228         c = nxt_conn_accept_alloc(task, lev);
229 
230         if (nxt_fast_path(c != NULL)) {
231             return c;
232         }
233 
234     } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
235 
236     nxt_log(task, NXT_LOG_CRIT, "no available connections, "
237                   "new connections are not accepted within 1s");
238 
239     return NULL;
240 }
241 
242 
243 static nxt_int_t
244 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
245 {
246     nxt_conn_t          *c;
247     nxt_queue_t         *idle;
248     nxt_queue_link_t    *link;
249     nxt_event_engine_t  *engine;
250 
251     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
252         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
253     };
254 
255     engine = task->thread->engine;
256 
257     idle = &engine->idle_connections;
258 
259     for (link = nxt_queue_last(idle);
260          link != nxt_queue_head(idle);
261          link = nxt_queue_next(link))
262     {
263         c = nxt_queue_link_data(link, nxt_conn_t, link);
264 
265         if (!c->socket.read_ready) {
266             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
267                              task->log, "no available connections, "
268                              "close idle connection");
269             nxt_queue_remove(link);
270             nxt_conn_close(engine, c);
271 
272             return NXT_OK;
273         }
274     }
275 
276     nxt_timer_add(engine, &lev->timer, 1000);
277 
278     nxt_fd_event_disable_read(engine, &lev->socket);
279 
280     return NXT_DECLINED;
281 }
282 
283 
284 void
285 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
286     const char *accept_syscall, nxt_err_t err)
287 {
288     static nxt_log_moderation_t  nxt_accept_log_moderation = {
289         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
290     };
291 
292     lev->socket.read_ready = 0;
293 
294     switch (err) {
295 
296     case NXT_EAGAIN:
297         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
298         return;
299 
300     case ECONNABORTED:
301         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
302                          task->log, "%s(%d) failed %E",
303                          accept_syscall, lev->socket.fd, err);
304         return;
305 
306     case EMFILE:
307     case ENFILE:
308     case ENOBUFS:
309     case ENOMEM:
310         if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
311             nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
312                     "new connections are not accepted within 1s",
313                     accept_syscall, lev->socket.fd, err);
314         }
315 
316         return;
317 
318     default:
319         nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
320                 accept_syscall, lev->socket.fd, err);
321         return;
322     }
323 }
324 
325 
326 static void
327 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
328 {
329     nxt_conn_t          *c;
330     nxt_timer_t         *timer;
331     nxt_listen_event_t  *lev;
332 
333     timer = obj;
334 
335     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
336     c = lev->next;
337 
338     if (c == NULL) {
339         c = nxt_conn_accept_next(task, lev);
340 
341         if (c == NULL) {
342             return;
343         }
344     }
345 
346     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
347 
348     lev->accept(task, lev, c);
349 }
350 
351 
352 static void
353 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
354 {
355     nxt_fd_event_t  *ev;
356 
357     ev = obj;
358 
359     nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
360 }
361