xref: /unit/src/nxt_conn_accept.c (revision 65:10688b89aa16)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * A listen socket handler calls an event facility specific io_accept()
12  * method.  The method accept()s a new connection and then calls
13  * nxt_event_conn_accept() to handle the new connection and to prepare
14  * for a next connection to avoid just dropping next accept()ed socket
15  * if no more connections allowed.  If there are no available connections
16  * an idle connection would be closed.  If there are no idle connections
17  * then new connections will not be accept()ed for 1 second.
18  */
19 
20 
21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
22     nxt_listen_event_t *lev);
23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
26     nxt_listen_event_t *lev);
27 static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
28     nxt_listen_event_t *lev);
29 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
32     void *data);
33 
34 
35 nxt_listen_event_t *
36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
37 {
38     nxt_listen_event_t  *lev;
39     nxt_event_engine_t  *engine;
40 
41     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
42 
43     if (nxt_fast_path(lev != NULL)) {
44         lev->socket.fd = ls->socket;
45 
46         engine = task->thread->engine;
47         lev->batch = engine->batch;
48 
49         lev->socket.read_work_queue = &engine->accept_work_queue;
50         lev->socket.read_handler = nxt_conn_listen_handler;
51         lev->socket.error_handler = nxt_conn_listen_event_error;
52         lev->socket.log = &nxt_main_log;
53 
54         lev->accept = engine->event.io->accept;
55 
56         lev->listen = ls;
57         lev->work_queue = &engine->read_work_queue;
58 
59         lev->timer.work_queue = &engine->fast_work_queue;
60         lev->timer.handler = nxt_conn_listen_timer_handler;
61         lev->timer.log = &nxt_main_log;
62 
63         lev->task.thread = task->thread;
64         lev->task.log = &nxt_main_log;
65         lev->task.ident = nxt_task_next_ident();
66         lev->socket.task = &lev->task;
67         lev->timer.task = &lev->task;
68 
69         if (nxt_conn_accept_alloc(task, lev) != NULL) {
70             nxt_fd_event_enable_accept(engine, &lev->socket);
71 
72             nxt_queue_insert_head(&engine->listen_connections, &lev->link);
73         }
74 
75         return lev;
76     }
77 
78     return NULL;
79 }
80 
81 
82 static nxt_conn_t *
83 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
84 {
85     nxt_mp_t             *mp;
86     nxt_conn_t           *c;
87     nxt_sockaddr_t       *sa, *remote;
88     nxt_event_engine_t   *engine;
89     nxt_listen_socket_t  *ls;
90 
91     engine = task->thread->engine;
92 
93     if (engine->connections < engine->max_connections) {
94 
95         mp = nxt_mp_create(1024, 128, 256, 32);
96 
97         if (nxt_fast_path(mp != NULL)) {
98             c = nxt_conn_create(mp, lev->socket.task);
99             if (nxt_slow_path(c == NULL)) {
100                 goto fail;
101             }
102 
103             lev->next = c;
104             c->socket.read_work_queue = lev->socket.read_work_queue;
105             c->socket.write_ready = 1;
106             c->listen = lev;
107 
108             ls = lev->listen;
109 
110             remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
111             if (nxt_slow_path(remote == NULL)) {
112                 goto fail;
113             }
114 
115             c->remote = remote;
116 
117             sa = ls->sockaddr;
118             remote->type = sa->type;
119             /*
120              * Set address family for unspecified Unix domain,
121              * because these sockaddr's are not be passed to accept().
122              */
123             remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family;
124 
125             return c;
126         }
127 
128     fail:
129 
130         nxt_mp_destroy(mp);
131     }
132 
133     return NULL;
134 }
135 
136 
137 static void
138 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
139 {
140     nxt_listen_event_t  *lev;
141 
142     lev = obj;
143     lev->ready = lev->batch;
144 
145     lev->accept(task, lev, data);
146 }
147 
148 
149 void
150 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
151 {
152     socklen_t           len;
153     nxt_conn_t          *c;
154     nxt_socket_t        s;
155     struct sockaddr     *sa;
156     nxt_listen_event_t  *lev;
157 
158     lev = obj;
159     c = lev->next;
160 
161     lev->ready--;
162     lev->socket.read_ready = (lev->ready != 0);
163 
164     len = c->remote->socklen;
165 
166     if (len >= sizeof(struct sockaddr)) {
167         sa = &c->remote->u.sockaddr;
168 
169     } else {
170         sa = NULL;
171         len = 0;
172     }
173 
174     s = accept(lev->socket.fd, sa, &len);
175 
176     if (s == -1) {
177         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
178         return;
179     }
180 
181     c->socket.fd = s;
182 
183 #if (NXT_LINUX)
184     /*
185      * Linux does not inherit non-blocking mode
186      * from listen socket for accept()ed socket.
187      */
188     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
189         nxt_socket_close(task, s);
190     }
191 
192 #endif
193 
194     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
195 
196     nxt_conn_accept(task, lev, c);
197 }
198 
199 
200 void
201 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
202 {
203     nxt_conn_t  *next;
204 
205     nxt_sockaddr_text(c->remote);
206 
207     nxt_debug(task, "client: %*s",
208               c->remote->address_length, nxt_sockaddr_address(c->remote));
209 
210     nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
211 
212     c->read_work_queue = lev->work_queue;
213     c->write_work_queue = lev->work_queue;
214 
215     if (lev->listen->read_after_accept) {
216 
217         //c->socket.read_ready = 1;
218 //        lev->listen->handler(task, c, lev->socket.data);
219         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
220                            task, c, lev->socket.data);
221 
222     } else {
223         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
224                            task, c, lev->socket.data);
225     }
226 
227     next = nxt_conn_accept_next(task, lev);
228 
229     if (next != NULL && lev->socket.read_ready) {
230         nxt_work_queue_add(lev->socket.read_work_queue,
231                            lev->accept, task, lev, next);
232     }
233 }
234 
235 
236 static nxt_conn_t *
237 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
238 {
239     nxt_conn_t  *c;
240 
241     lev->next = NULL;
242 
243     do {
244         c = nxt_conn_accept_alloc(task, lev);
245 
246         if (nxt_fast_path(c != NULL)) {
247             return c;
248         }
249 
250     } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
251 
252     nxt_log(task, NXT_LOG_CRIT, "no available connections, "
253                   "new connections are not accepted within 1s");
254 
255     return NULL;
256 }
257 
258 
259 static nxt_int_t
260 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
261 {
262     nxt_conn_t          *c;
263     nxt_queue_t         *idle;
264     nxt_queue_link_t    *link;
265     nxt_event_engine_t  *engine;
266 
267     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
268         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
269     };
270 
271     engine = task->thread->engine;
272 
273     idle = &engine->idle_connections;
274 
275     for (link = nxt_queue_last(idle);
276          link != nxt_queue_head(idle);
277          link = nxt_queue_next(link))
278     {
279         c = nxt_queue_link_data(link, nxt_conn_t, link);
280 
281         if (!c->socket.read_ready) {
282             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
283                              task->log, "no available connections, "
284                              "close idle connection");
285             nxt_queue_remove(link);
286             nxt_conn_close(engine, c);
287 
288             return NXT_OK;
289         }
290     }
291 
292     nxt_timer_add(engine, &lev->timer, 1000);
293 
294     nxt_fd_event_disable_read(engine, &lev->socket);
295 
296     return NXT_DECLINED;
297 }
298 
299 
300 void
301 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
302     const char *accept_syscall, nxt_err_t err)
303 {
304     static nxt_log_moderation_t  nxt_accept_log_moderation = {
305         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
306     };
307 
308     lev->socket.read_ready = 0;
309 
310     switch (err) {
311 
312     case NXT_EAGAIN:
313         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
314         return;
315 
316     case ECONNABORTED:
317         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
318                          task->log, "%s(%d) failed %E",
319                          accept_syscall, lev->socket.fd, err);
320         return;
321 
322     case EMFILE:
323     case ENFILE:
324     case ENOBUFS:
325     case ENOMEM:
326         if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
327             nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
328                     "new connections are not accepted within 1s",
329                     accept_syscall, lev->socket.fd, err);
330         }
331 
332         return;
333 
334     default:
335         nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
336                 accept_syscall, lev->socket.fd, err);
337         return;
338     }
339 }
340 
341 
342 static void
343 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
344 {
345     nxt_conn_t          *c;
346     nxt_timer_t         *timer;
347     nxt_listen_event_t  *lev;
348 
349     timer = obj;
350 
351     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
352     c = lev->next;
353 
354     if (c == NULL) {
355         c = nxt_conn_accept_next(task, lev);
356 
357         if (c == NULL) {
358             return;
359         }
360     }
361 
362     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
363 
364     lev->accept(task, lev, c);
365 }
366 
367 
368 static void
369 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
370 {
371     nxt_fd_event_t  *ev;
372 
373     ev = obj;
374 
375     nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
376 }
377