xref: /unit/src/nxt_conn_accept.c (revision 338:2c6135a99c27)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A listen socket handler calls an event facility specific io_accept()
 * method.  The method accept()s a new connection and then calls
 * nxt_conn_accept() to handle the new connection and to prepare a
 * connection object for the next accept(), so that the next accept()ed
 * socket does not have to be dropped when no more connections are allowed.
 * If there are no available connections, an idle connection is closed.
 * If there are no idle connections either, new connections are not
 * accept()ed for 1 second.
 */
19 
20 
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22     nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26     nxt_listen_event_t *lev);
27 static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
28     nxt_listen_event_t *lev);
29 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
32     void *data);
33 
34 
35 nxt_listen_event_t *
36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
37 {
38     nxt_listen_event_t  *lev;
39     nxt_event_engine_t  *engine;
40 
41     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
42 
43     if (nxt_fast_path(lev != NULL)) {
44         lev->socket.fd = ls->socket;
45 
46         engine = task->thread->engine;
47         lev->batch = engine->batch;
48 
49         lev->mem_cache = (uint32_t) -1;
50 
51         lev->socket.read_work_queue = &engine->accept_work_queue;
52         lev->socket.read_handler = nxt_conn_listen_handler;
53         lev->socket.error_handler = nxt_conn_listen_event_error;
54         lev->socket.log = &nxt_main_log;
55 
56         lev->accept = engine->event.io->accept;
57 
58         lev->listen = ls;
59         lev->work_queue = &engine->read_work_queue;
60 
61         lev->timer.work_queue = &engine->fast_work_queue;
62         lev->timer.handler = nxt_conn_listen_timer_handler;
63         lev->timer.log = &nxt_main_log;
64 
65         lev->task.thread = task->thread;
66         lev->task.log = &nxt_main_log;
67         lev->task.ident = nxt_task_next_ident();
68         lev->socket.task = &lev->task;
69         lev->timer.task = &lev->task;
70 
71         if (nxt_conn_accept_alloc(task, lev) != NULL) {
72             nxt_fd_event_enable_accept(engine, &lev->socket);
73 
74             nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
75         }
76 
77         return lev;
78     }
79 
80     return NULL;
81 }
82 
83 
84 static nxt_conn_t *
85 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
86 {
87     nxt_mp_t            *mp;
88     nxt_conn_t          *c;
89     nxt_event_engine_t  *engine;
90 
91     engine = task->thread->engine;
92 
93     if (engine->connections < engine->max_connections) {
94 
95         mp = nxt_mp_create(1024, 128, 256, 32);
96 
97         if (nxt_fast_path(mp != NULL)) {
98             c = nxt_conn_create(mp, lev->socket.task);
99             if (nxt_slow_path(c == NULL)) {
100                 goto fail;
101             }
102 
103             lev->next = c;
104             c->socket.read_work_queue = lev->socket.read_work_queue;
105             c->socket.write_ready = 1;
106             c->listen = lev;
107 
108             c->remote = nxt_sockaddr_cache_alloc(engine, lev);
109             if (nxt_fast_path(c->remote != NULL)) {
110                 return c;
111             }
112         }
113 
114     fail:
115 
116         nxt_mp_destroy(mp);
117     }
118 
119     return NULL;
120 }
121 
122 
123 static void
124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
125 {
126     nxt_listen_event_t  *lev;
127 
128     lev = obj;
129     lev->ready = lev->batch;
130 
131     lev->accept(task, lev, data);
132 }
133 
134 
135 void
136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
137 {
138     socklen_t           socklen;
139     nxt_conn_t          *c;
140     nxt_socket_t        s;
141     struct sockaddr     *sa;
142     nxt_listen_event_t  *lev;
143 
144     lev = obj;
145     c = lev->next;
146 
147     lev->ready--;
148     lev->socket.read_ready = (lev->ready != 0);
149 
150     sa = &c->remote->u.sockaddr;
151     socklen = c->remote->socklen;
152     /*
153      * The returned socklen is ignored here, because sockaddr_in and
154      * sockaddr_in6 socklens are not changed.  As to unspecified sockaddr_un
155      * it is 3 byte length and already prepared, because old BSDs return zero
156      * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
157      * socklen and updates only the sa_family part; other systems copy 3 bytes
158      * and truncate surplus zero part.  Only bound sockaddr_un will be really
159      * truncated here.
160      */
161     s = accept(lev->socket.fd, sa, &socklen);
162 
163     if (s == -1) {
164         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
165         return;
166     }
167 
168     c->socket.fd = s;
169 
170 #if (NXT_LINUX)
171     /*
172      * Linux does not inherit non-blocking mode
173      * from listen socket for accept()ed socket.
174      */
175     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
176         nxt_socket_close(task, s);
177     }
178 
179 #endif
180 
181     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
182 
183     nxt_conn_accept(task, lev, c);
184 }
185 
186 
187 void
188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
189 {
190     nxt_conn_t  *next;
191 
192     nxt_sockaddr_text(c->remote);
193 
194     nxt_debug(task, "client: %*s",
195               c->remote->address_length, nxt_sockaddr_address(c->remote));
196 
197     nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
198 
199     c->read_work_queue = lev->work_queue;
200     c->write_work_queue = lev->work_queue;
201 
202     if (lev->listen->read_after_accept) {
203 
204         //c->socket.read_ready = 1;
205 //        lev->listen->handler(task, c, lev->socket.data);
206         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
207                            &c->task, c, lev->socket.data);
208 
209     } else {
210         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
211                            &c->task, c, lev->socket.data);
212     }
213 
214     next = nxt_conn_accept_next(task, lev);
215 
216     if (next != NULL && lev->socket.read_ready) {
217         nxt_work_queue_add(lev->socket.read_work_queue,
218                            lev->accept, task, lev, next);
219     }
220 }
221 
222 
223 static nxt_conn_t *
224 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
225 {
226     nxt_conn_t  *c;
227 
228     lev->next = NULL;
229 
230     do {
231         c = nxt_conn_accept_alloc(task, lev);
232 
233         if (nxt_fast_path(c != NULL)) {
234             return c;
235         }
236 
237     } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
238 
239     nxt_log(task, NXT_LOG_CRIT, "no available connections, "
240                   "new connections are not accepted within 1s");
241 
242     return NULL;
243 }
244 
245 
246 static nxt_int_t
247 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
248 {
249     nxt_conn_t          *c;
250     nxt_queue_t         *idle;
251     nxt_queue_link_t    *link;
252     nxt_event_engine_t  *engine;
253 
254     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
255         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
256     };
257 
258     engine = task->thread->engine;
259 
260     idle = &engine->idle_connections;
261 
262     for (link = nxt_queue_last(idle);
263          link != nxt_queue_head(idle);
264          link = nxt_queue_next(link))
265     {
266         c = nxt_queue_link_data(link, nxt_conn_t, link);
267 
268         if (!c->socket.read_ready) {
269             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
270                              task->log, "no available connections, "
271                              "close idle connection");
272             nxt_queue_remove(link);
273             nxt_conn_close(engine, c);
274 
275             return NXT_OK;
276         }
277     }
278 
279     nxt_timer_add(engine, &lev->timer, 1000);
280 
281     nxt_fd_event_disable_read(engine, &lev->socket);
282 
283     return NXT_DECLINED;
284 }
285 
286 
287 void
288 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
289     const char *accept_syscall, nxt_err_t err)
290 {
291     static nxt_log_moderation_t  nxt_accept_log_moderation = {
292         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
293     };
294 
295     lev->socket.read_ready = 0;
296 
297     switch (err) {
298 
299     case NXT_EAGAIN:
300         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
301         return;
302 
303     case ECONNABORTED:
304         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
305                          task->log, "%s(%d) failed %E",
306                          accept_syscall, lev->socket.fd, err);
307         return;
308 
309     case EMFILE:
310     case ENFILE:
311     case ENOBUFS:
312     case ENOMEM:
313         if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
314             nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
315                     "new connections are not accepted within 1s",
316                     accept_syscall, lev->socket.fd, err);
317         }
318 
319         return;
320 
321     default:
322         nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
323                 accept_syscall, lev->socket.fd, err);
324         return;
325     }
326 }
327 
328 
329 static void
330 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
331 {
332     nxt_conn_t          *c;
333     nxt_timer_t         *timer;
334     nxt_listen_event_t  *lev;
335 
336     timer = obj;
337 
338     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
339     c = lev->next;
340 
341     if (c == NULL) {
342         c = nxt_conn_accept_next(task, lev);
343 
344         if (c == NULL) {
345             return;
346         }
347     }
348 
349     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
350 
351     lev->accept(task, lev, c);
352 }
353 
354 
355 static void
356 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
357 {
358     nxt_fd_event_t  *ev;
359 
360     ev = obj;
361 
362     nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
363 }
364