xref: /unit/src/nxt_conn_accept.c (revision 337:854a1a440616)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A listen socket handler calls an event facility specific io_accept()
 * method.  The method accept()s a new connection and then calls
 * nxt_conn_accept() to handle the new connection and to prepare
 * for the next connection, so that the next accept()ed socket is not
 * simply dropped if no more connections are allowed.  If there are no
 * available connections, an idle connection is closed.  If there are no
 * idle connections, new connections will not be accept()ed for 1 second.
 */
19 
20 
/*
 * Forward declarations of the file-local helpers.
 */

/* Prepares the spare connection object that the next accept() fills in. */
static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
    nxt_listen_event_t *lev);
/* Read handler installed on the listen socket event. */
static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
    void *data);
/* Replaces the spare connection, closing idle connections if necessary. */
static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
    nxt_listen_event_t *lev);
/* Closes one idle connection or suspends accepting via the listen timer. */
static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
    nxt_listen_event_t *lev);
/* Error handler installed on the listen socket event. */
static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
    void *data);
/* Handler of the listen event timer that resumes accepting. */
static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
    void *data);
33 
34 
35 nxt_listen_event_t *
36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
37 {
38     nxt_listen_event_t  *lev;
39     nxt_event_engine_t  *engine;
40 
41     lev = nxt_zalloc(sizeof(nxt_listen_event_t));
42 
43     if (nxt_fast_path(lev != NULL)) {
44         lev->socket.fd = ls->socket;
45 
46         engine = task->thread->engine;
47         lev->batch = engine->batch;
48 
49         lev->mem_cache = (uint32_t) -1;
50 
51         lev->socket.read_work_queue = &engine->accept_work_queue;
52         lev->socket.read_handler = nxt_conn_listen_handler;
53         lev->socket.error_handler = nxt_conn_listen_event_error;
54         lev->socket.log = &nxt_main_log;
55 
56         lev->accept = engine->event.io->accept;
57 
58         lev->listen = ls;
59         lev->work_queue = &engine->read_work_queue;
60 
61         lev->timer.work_queue = &engine->fast_work_queue;
62         lev->timer.handler = nxt_conn_listen_timer_handler;
63         lev->timer.log = &nxt_main_log;
64 
65         lev->task.thread = task->thread;
66         lev->task.log = &nxt_main_log;
67         lev->task.ident = nxt_task_next_ident();
68         lev->socket.task = &lev->task;
69         lev->timer.task = &lev->task;
70 
71         if (nxt_conn_accept_alloc(task, lev) != NULL) {
72             nxt_fd_event_enable_accept(engine, &lev->socket);
73 
74             nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
75         }
76 
77         return lev;
78     }
79 
80     return NULL;
81 }
82 
83 
84 static nxt_conn_t *
85 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
86 {
87     nxt_mp_t             *mp;
88     nxt_conn_t           *c;
89     nxt_event_engine_t   *engine;
90     nxt_listen_socket_t  *ls;
91 
92     engine = task->thread->engine;
93 
94     if (engine->connections < engine->max_connections) {
95 
96         mp = nxt_mp_create(1024, 128, 256, 32);
97 
98         if (nxt_fast_path(mp != NULL)) {
99             c = nxt_conn_create(mp, lev->socket.task);
100             if (nxt_slow_path(c == NULL)) {
101                 goto fail;
102             }
103 
104             lev->next = c;
105             c->socket.read_work_queue = lev->socket.read_work_queue;
106             c->socket.write_ready = 1;
107             c->listen = lev;
108 
109             ls = lev->listen;
110 
111             c->remote = nxt_sockaddr_cache_alloc(engine, lev);
112             if (nxt_fast_path(c->remote != NULL)) {
113                 return c;
114             }
115         }
116 
117     fail:
118 
119         nxt_mp_destroy(mp);
120     }
121 
122     return NULL;
123 }
124 
125 
126 static void
127 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
128 {
129     nxt_listen_event_t  *lev;
130 
131     lev = obj;
132     lev->ready = lev->batch;
133 
134     lev->accept(task, lev, data);
135 }
136 
137 
138 void
139 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
140 {
141     socklen_t           socklen;
142     nxt_conn_t          *c;
143     nxt_socket_t        s;
144     struct sockaddr     *sa;
145     nxt_listen_event_t  *lev;
146 
147     lev = obj;
148     c = lev->next;
149 
150     lev->ready--;
151     lev->socket.read_ready = (lev->ready != 0);
152 
153     sa = &c->remote->u.sockaddr;
154     socklen = c->remote->socklen;
155     /*
156      * The returned socklen is ignored here, because sockaddr_in and
157      * sockaddr_in6 socklens are not changed.  As to unspecified sockaddr_un
158      * it is 3 byte length and already prepared, because old BSDs return zero
159      * socklen and do not update the sockaddr_un at all; Linux returns 2 byte
160      * socklen and updates only the sa_family part; other systems copy 3 bytes
161      * and truncate surplus zero part.  Only bound sockaddr_un will be really
162      * truncated here.
163      */
164     s = accept(lev->socket.fd, sa, &socklen);
165 
166     if (s == -1) {
167         nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
168         return;
169     }
170 
171     c->socket.fd = s;
172 
173 #if (NXT_LINUX)
174     /*
175      * Linux does not inherit non-blocking mode
176      * from listen socket for accept()ed socket.
177      */
178     if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
179         nxt_socket_close(task, s);
180     }
181 
182 #endif
183 
184     nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
185 
186     nxt_conn_accept(task, lev, c);
187 }
188 
189 
190 void
191 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
192 {
193     nxt_conn_t  *next;
194 
195     nxt_sockaddr_text(c->remote);
196 
197     nxt_debug(task, "client: %*s",
198               c->remote->address_length, nxt_sockaddr_address(c->remote));
199 
200     nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
201 
202     c->read_work_queue = lev->work_queue;
203     c->write_work_queue = lev->work_queue;
204 
205     if (lev->listen->read_after_accept) {
206 
207         //c->socket.read_ready = 1;
208 //        lev->listen->handler(task, c, lev->socket.data);
209         nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
210                            &c->task, c, lev->socket.data);
211 
212     } else {
213         nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
214                            &c->task, c, lev->socket.data);
215     }
216 
217     next = nxt_conn_accept_next(task, lev);
218 
219     if (next != NULL && lev->socket.read_ready) {
220         nxt_work_queue_add(lev->socket.read_work_queue,
221                            lev->accept, task, lev, next);
222     }
223 }
224 
225 
226 static nxt_conn_t *
227 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
228 {
229     nxt_conn_t  *c;
230 
231     lev->next = NULL;
232 
233     do {
234         c = nxt_conn_accept_alloc(task, lev);
235 
236         if (nxt_fast_path(c != NULL)) {
237             return c;
238         }
239 
240     } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
241 
242     nxt_log(task, NXT_LOG_CRIT, "no available connections, "
243                   "new connections are not accepted within 1s");
244 
245     return NULL;
246 }
247 
248 
249 static nxt_int_t
250 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
251 {
252     nxt_conn_t          *c;
253     nxt_queue_t         *idle;
254     nxt_queue_link_t    *link;
255     nxt_event_engine_t  *engine;
256 
257     static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
258         NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
259     };
260 
261     engine = task->thread->engine;
262 
263     idle = &engine->idle_connections;
264 
265     for (link = nxt_queue_last(idle);
266          link != nxt_queue_head(idle);
267          link = nxt_queue_next(link))
268     {
269         c = nxt_queue_link_data(link, nxt_conn_t, link);
270 
271         if (!c->socket.read_ready) {
272             nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
273                              task->log, "no available connections, "
274                              "close idle connection");
275             nxt_queue_remove(link);
276             nxt_conn_close(engine, c);
277 
278             return NXT_OK;
279         }
280     }
281 
282     nxt_timer_add(engine, &lev->timer, 1000);
283 
284     nxt_fd_event_disable_read(engine, &lev->socket);
285 
286     return NXT_DECLINED;
287 }
288 
289 
290 void
291 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
292     const char *accept_syscall, nxt_err_t err)
293 {
294     static nxt_log_moderation_t  nxt_accept_log_moderation = {
295         NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
296     };
297 
298     lev->socket.read_ready = 0;
299 
300     switch (err) {
301 
302     case NXT_EAGAIN:
303         nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
304         return;
305 
306     case ECONNABORTED:
307         nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
308                          task->log, "%s(%d) failed %E",
309                          accept_syscall, lev->socket.fd, err);
310         return;
311 
312     case EMFILE:
313     case ENFILE:
314     case ENOBUFS:
315     case ENOMEM:
316         if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
317             nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
318                     "new connections are not accepted within 1s",
319                     accept_syscall, lev->socket.fd, err);
320         }
321 
322         return;
323 
324     default:
325         nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
326                 accept_syscall, lev->socket.fd, err);
327         return;
328     }
329 }
330 
331 
332 static void
333 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
334 {
335     nxt_conn_t          *c;
336     nxt_timer_t         *timer;
337     nxt_listen_event_t  *lev;
338 
339     timer = obj;
340 
341     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
342     c = lev->next;
343 
344     if (c == NULL) {
345         c = nxt_conn_accept_next(task, lev);
346 
347         if (c == NULL) {
348             return;
349         }
350     }
351 
352     nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
353 
354     lev->accept(task, lev, c);
355 }
356 
357 
358 static void
359 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
360 {
361     nxt_fd_event_t  *ev;
362 
363     ev = obj;
364 
365     nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
366 }
367