xref: /unit/src/nxt_router.c (revision 584:28e8e1877e62)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_http.h>
11 
12 
/*
 * Application settings extracted from a configuration value.  The fields
 * are the targets of the nxt_router_app_conf, nxt_router_app_limits_conf
 * and nxt_router_app_processes_conf maps defined later in this file.
 */
typedef struct {
    nxt_str_t         type;             /* "type" (nxt_router_app_conf) */
    uint32_t          processes;        /* "processes" mapped as INT32 */
    uint32_t          max_processes;    /* "max" (processes map) */
    uint32_t          spare_processes;  /* "spare" (processes map) */
    nxt_msec_t        timeout;          /* "timeout" (limits map) */
    nxt_msec_t        res_timeout;      /* "reschedule_timeout" (limits map) */
    nxt_msec_t        idle_timeout;     /* "idle_timeout" (processes map) */
    uint32_t          requests;         /* "requests" (limits map) */
    nxt_conf_value_t  *limits_value;    /* "limits" mapped as PTR */
    nxt_conf_value_t  *processes_value; /* "processes" mapped as PTR */
} nxt_router_app_conf_t;
25 
26 
/* Listener settings; target of the nxt_router_listener_conf map. */
typedef struct {
    nxt_str_t  application;   /* "application" option of the listener */
} nxt_router_listener_conf_t;
30 
31 
/*
 * Bookkeeping for a request message, consumed by nxt_router_msg_cancel():
 * the buffer chain, the mmap tracking record handed to
 * nxt_port_mmap_tracking_cancel(), and the completion handler that is put
 * back on every buffer of the chain when the message is cancelled.
 */
typedef struct nxt_msg_info_s {
    nxt_buf_t                 *buf;
    nxt_port_mmap_tracking_t  tracking;
    nxt_work_handler_t        completion_handler;
} nxt_msg_info_t;
37 
38 
/* Declared early because the two link structures point at each other. */
typedef struct nxt_req_app_link_s nxt_req_app_link_t;


/*
 * Connection-side half of a request.  It is cross-linked with the
 * application-side half (nxt_req_app_link_t) through "ra" / "rc" and is
 * torn down by nxt_router_rc_unlink().
 */
typedef struct {
    uint32_t                 stream;   /* copied into ra->stream on init */
    nxt_app_t                *app;     /* reference dropped on unlink */
    nxt_port_t               *app_port; /* taken over from ra on release */
    nxt_app_parse_ctx_t      *ap;
    nxt_msg_info_t           msg_info; /* taken over from ra on release */
    nxt_req_app_link_t       *ra;      /* NULL when not linked */

    nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
} nxt_req_conn_link_t;
52 
53 
/*
 * Application-side half of a request.  The object is reference counted
 * through use_count and is released by nxt_router_ra_release() on the
 * engine stored in work.data once the counter drops to zero.
 */
struct nxt_req_app_link_s {
    uint32_t             stream;      /* same value as rc->stream */
    nxt_atomic_t         use_count;   /* starts at 1 in nxt_router_ra_init() */
    nxt_port_t           *app_port;
    nxt_port_t           *reply_port; /* port of the initializing engine */
    nxt_app_parse_ctx_t  *ap;
    nxt_msg_info_t       msg_info;
    nxt_req_conn_link_t  *rc;         /* NULL once unlinked */

    nxt_nsec_t           res_time;    /* set in nxt_router_ra_pending() */

    nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
    nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
    nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */

    nxt_mp_t             *mem_pool;   /* non-NULL if allocated by ra_create */
    nxt_work_t           work;        /* used to post to the owning engine */

    int                  err_code;    /* set by nxt_router_ra_error() */
    const char           *err_str;
};
75 
76 
/*
 * Pairs a socket configuration with the temporary configuration it belongs
 * to.  NOTE(review): presumably the RPC callback data of the listen socket
 * request -- confirm against nxt_router_listen_socket_rpc_create().
 */
typedef struct {
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
} nxt_socket_rpc_t;
81 
82 
/*
 * Pairs an application with the temporary configuration it belongs to.
 * NOTE(review): presumably the RPC callback data of the prefork request --
 * confirm against nxt_router_app_rpc_create().
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
} nxt_app_rpc_t;
87 
88 
/*
 * State shared between nxt_router_port_select() and
 * nxt_router_port_post_select(), both of which take a pointer to it.
 * NOTE(review): the meaning of the individual fields is defined by those
 * two functions, which are not part of this excerpt.
 */
struct nxt_port_select_state_s {
    nxt_app_t           *app;
    nxt_req_app_link_t  *ra;

    nxt_port_t          *failed_port;
    int                 failed_port_use_delta;

    uint8_t             start_process;    /* 1 bit */
    nxt_req_app_link_t  *shared_ra;
    nxt_port_t          *port;
};

typedef struct nxt_port_select_state_s nxt_port_select_state_t;
102 
/* Forward declarations: port selection and application process start-up. */

static void nxt_router_port_select(nxt_task_t *task,
    nxt_port_select_state_t *state);

static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
    nxt_port_select_state_t *state);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
110 
111 nxt_inline void
112 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
113 {
114     nxt_atomic_fetch_add(&ra->use_count, 1);
115 }
116 
117 nxt_inline void
118 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
119 {
120 #if (NXT_DEBUG)
121     int  c;
122 
123     c = nxt_atomic_fetch_add(&ra->use_count, -1);
124 
125     nxt_assert(c > 1);
126 #else
127     (void) nxt_atomic_fetch_add(&ra->use_count, -1);
128 #endif
129 }
130 
/* Forward declaration: reference counting of nxt_req_app_link_t. */
static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);

/* Forward declarations: temporary configuration life cycle. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

/* Forward declarations: configuration parsing, listen sockets and apps. */
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
    nxt_str_t *name);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
163 
/* Forward declarations: per-engine configuration jobs. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Forward declarations: worker threads and application ordering. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

/* Forward declarations: posting jobs to the engines. */
static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);
192 
/* Forward declarations: thread entry point and work handlers. */
static void nxt_router_thread_start(void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);
static void nxt_router_conf_release(nxt_task_t *task,
    nxt_socket_conf_joint_t *joint);

/* Forward declarations: RPC callbacks for a started application process. */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Forward declarations: application shutdown and port acquire/release. */
static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
    nxt_req_app_link_t *ra);
221 
/* Forward declarations: request message preparation, one per language. */
static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
    nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
    nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
    nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
    nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
    nxt_app_wmsg_t *wmsg);

/* Forward declarations: connection cleanup and timer/work handlers. */
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
    void *data);

/* Tentative definition; the initialized definition appears later. */
static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

/* The router object of this process; assigned in nxt_router_start(). */
static nxt_router_t  *nxt_router;
248 
249 
/*
 * Table of the per-language request message builders.
 * NOTE(review): presumably indexed by the application type id, in which
 * case the order of the entries must match that enumeration -- confirm
 * against the lookup site.
 */
static nxt_app_prepare_msg_t  nxt_app_prepare_msg[] = {
    nxt_python_prepare_msg,
    nxt_php_prepare_msg,
    nxt_go_prepare_msg,
    nxt_perl_prepare_msg,
    nxt_ruby_prepare_msg,
};
257 
258 
259 nxt_int_t
260 nxt_router_start(nxt_task_t *task, void *data)
261 {
262     nxt_int_t      ret;
263     nxt_router_t   *router;
264     nxt_runtime_t  *rt;
265 
266     rt = task->thread->runtime;
267 
268     ret = nxt_http_init(task, rt);
269     if (nxt_slow_path(ret != NXT_OK)) {
270         return ret;
271     }
272 
273     router = nxt_zalloc(sizeof(nxt_router_t));
274     if (nxt_slow_path(router == NULL)) {
275         return NXT_ERROR;
276     }
277 
278     nxt_queue_init(&router->engines);
279     nxt_queue_init(&router->sockets);
280     nxt_queue_init(&router->apps);
281 
282     nxt_router = router;
283 
284     return NXT_OK;
285 }
286 
287 
288 static void
289 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
290     void *data)
291 {
292     size_t         size;
293     uint32_t       stream;
294     nxt_mp_t       *mp;
295     nxt_app_t      *app;
296     nxt_buf_t      *b;
297     nxt_port_t     *main_port;
298     nxt_runtime_t  *rt;
299 
300     app = data;
301 
302     rt = task->thread->runtime;
303     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
304 
305     nxt_debug(task, "app '%V' %p start process", &app->name, app);
306 
307     size = app->name.length + 1 + app->conf.length;
308 
309     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
310 
311     if (nxt_slow_path(b == NULL)) {
312         goto failed;
313     }
314 
315     nxt_buf_cpystr(b, &app->name);
316     *b->mem.free++ = '\0';
317     nxt_buf_cpystr(b, &app->conf);
318 
319     stream = nxt_port_rpc_register_handler(task, port,
320                                            nxt_router_app_port_ready,
321                                            nxt_router_app_port_error,
322                                            -1, app);
323 
324     if (nxt_slow_path(stream == 0)) {
325         mp = b->data;
326         nxt_mp_free(mp, b);
327         nxt_mp_release(mp);
328 
329         goto failed;
330     }
331 
332     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
333                           stream, port->id, b);
334 
335     return;
336 
337 failed:
338 
339     nxt_thread_mutex_lock(&app->mutex);
340 
341     app->pending_processes--;
342 
343     nxt_thread_mutex_unlock(&app->mutex);
344 
345     nxt_router_app_use(task, app, -1);
346 }
347 
348 
349 static nxt_int_t
350 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
351 {
352     nxt_int_t      res;
353     nxt_port_t     *router_port;
354     nxt_runtime_t  *rt;
355 
356     rt = task->thread->runtime;
357     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
358 
359     nxt_router_app_use(task, app, 1);
360 
361     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
362                         app);
363 
364     if (res == NXT_OK) {
365         return res;
366     }
367 
368     nxt_thread_mutex_lock(&app->mutex);
369 
370     app->pending_processes--;
371 
372     nxt_thread_mutex_unlock(&app->mutex);
373 
374     nxt_router_app_use(task, app, -1);
375 
376     return NXT_ERROR;
377 }
378 
379 
380 nxt_inline void
381 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
382     nxt_req_conn_link_t *rc)
383 {
384     nxt_event_engine_t  *engine;
385 
386     engine = task->thread->engine;
387 
388     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
389 
390     ra->stream = rc->stream;
391     ra->use_count = 1;
392     ra->rc = rc;
393     rc->ra = ra;
394     ra->reply_port = engine->port;
395     ra->ap = rc->ap;
396 
397     ra->work.handler = NULL;
398     ra->work.task = &engine->task;
399     ra->work.obj = ra;
400     ra->work.data = engine;
401 }
402 
403 
404 nxt_inline nxt_req_app_link_t *
405 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
406 {
407     nxt_mp_t            *mp;
408     nxt_req_app_link_t  *ra;
409 
410     if (ra_src->mem_pool != NULL) {
411         return ra_src;
412     }
413 
414     mp = ra_src->ap->mem_pool;
415 
416     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
417 
418     if (nxt_slow_path(ra == NULL)) {
419 
420         ra_src->rc->ra = NULL;
421         ra_src->rc = NULL;
422 
423         return NULL;
424     }
425 
426     nxt_mp_retain(mp);
427 
428     nxt_router_ra_init(task, ra, ra_src->rc);
429 
430     ra->mem_pool = mp;
431 
432     return ra;
433 }
434 
435 
436 nxt_inline nxt_bool_t
437 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
438     uint32_t stream)
439 {
440     nxt_buf_t   *b, *next;
441     nxt_bool_t  cancelled;
442 
443     if (msg_info->buf == NULL) {
444         return 0;
445     }
446 
447     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
448                                               stream);
449 
450     if (cancelled) {
451         nxt_debug(task, "stream #%uD: cancelled by router", stream);
452     }
453 
454     for (b = msg_info->buf; b != NULL; b = next) {
455         next = b->next;
456 
457         b->completion_handler = msg_info->completion_handler;
458 
459         if (b->is_port_mmap_sent) {
460             b->is_port_mmap_sent = cancelled == 0;
461             b->completion_handler(task, b, b->parent);
462         }
463     }
464 
465     msg_info->buf = NULL;
466 
467     return cancelled;
468 }
469 
470 
471 static void
472 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
473 
474 
475 static void
476 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
477 {
478     nxt_req_app_link_t  *ra;
479 
480     ra = obj;
481 
482     nxt_router_ra_update_peer(task, ra);
483 
484     nxt_router_ra_use(task, ra, -1);
485 }
486 
487 
488 static void
489 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
490 {
491     nxt_event_engine_t   *engine;
492     nxt_req_conn_link_t  *rc;
493 
494     engine = ra->work.data;
495 
496     if (task->thread->engine != engine) {
497         nxt_router_ra_inc_use(ra);
498 
499         ra->work.handler = nxt_router_ra_update_peer_handler;
500         ra->work.task = &engine->task;
501         ra->work.next = NULL;
502 
503         nxt_debug(task, "ra stream #%uD post update peer to %p",
504                   ra->stream, engine);
505 
506         nxt_event_engine_post(engine, &ra->work);
507 
508         return;
509     }
510 
511     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
512 
513     rc = ra->rc;
514 
515     if (rc != NULL && ra->app_port != NULL) {
516         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
517     }
518 
519     nxt_router_ra_use(task, ra, -1);
520 }
521 
522 
/*
 * Final release of a request-app link.  Must run on the owning engine
 * (ra->work.data) with the use counter already at zero; both conditions
 * are asserted.
 *
 * If the link is still attached to its connection-side half (rc), either
 * the stored error is reported to the HTTP request, or the application
 * port and message info are handed over to rc and the application timeout
 * timer is armed.  Whatever port and message remain on "ra" afterwards are
 * released/cancelled, and a pool-allocated "ra" is freed.
 */
static void
nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
{
    nxt_mp_t                *mp;
    nxt_req_conn_link_t     *rc;

    nxt_assert(task->thread->engine == ra->work.data);
    nxt_assert(ra->use_count == 0);

    nxt_debug(task, "ra stream #%uD release", ra->stream);

    rc = ra->rc;

    if (rc != NULL) {
        if (nxt_slow_path(ra->err_code != 0)) {
            nxt_http_request_error(task, rc->ap->request, ra->err_code);

        } else {
            /* Ownership of the port and the message moves to rc. */
            rc->app_port = ra->app_port;
            rc->msg_info = ra->msg_info;

            if (rc->app->timeout != 0) {
                rc->ap->timer.handler = nxt_router_app_timeout;
                nxt_timer_add(task->thread->engine, &rc->ap->timer,
                              rc->app->timeout);
            }

            /*
             * Cleared so that the port release and message cancel below
             * do not touch what rc now owns.
             */
            ra->app_port = NULL;
            ra->msg_info.buf = NULL;
        }

        rc->ra = NULL;
        ra->rc = NULL;
    }

    if (ra->app_port != NULL) {
        nxt_router_app_port_release(task, ra->app_port, 0, 1);

        ra->app_port = NULL;
    }

    /* Returns immediately when msg_info.buf is NULL. */
    nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);

    mp = ra->mem_pool;

    /* Only links created by nxt_router_ra_create() have a pool. */
    if (mp != NULL) {
        nxt_mp_free(mp, ra);
        nxt_mp_release(mp);
    }
}
573 
574 
575 static void
576 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
577 {
578     nxt_req_app_link_t  *ra;
579 
580     ra = obj;
581 
582     nxt_assert(ra->work.data == data);
583 
584     nxt_atomic_fetch_add(&ra->use_count, -1);
585 
586     nxt_router_ra_release(task, ra);
587 }
588 
589 
590 static void
591 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
592 {
593     int                 c;
594     nxt_event_engine_t  *engine;
595 
596     c = nxt_atomic_fetch_add(&ra->use_count, i);
597 
598     if (i < 0 && c == -i) {
599         engine = ra->work.data;
600 
601         if (task->thread->engine == engine) {
602             nxt_router_ra_release(task, ra);
603 
604             return;
605         }
606 
607         nxt_router_ra_inc_use(ra);
608 
609         ra->work.handler = nxt_router_ra_release_handler;
610         ra->work.task = &engine->task;
611         ra->work.next = NULL;
612 
613         nxt_debug(task, "ra stream #%uD post release to %p",
614                   ra->stream, engine);
615 
616         nxt_event_engine_post(engine, &ra->work);
617     }
618 }
619 
620 
621 nxt_inline void
622 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
623 {
624     ra->app_port = NULL;
625     ra->err_code = code;
626     ra->err_str = str;
627 }
628 
629 
630 nxt_inline void
631 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
632 {
633     nxt_queue_insert_tail(&ra->app_port->pending_requests,
634                           &ra->link_port_pending);
635     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
636 
637     nxt_router_ra_inc_use(ra);
638 
639     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
640 
641     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
642 }
643 
644 
645 nxt_inline nxt_bool_t
646 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
647 {
648     if (lnk->next != NULL) {
649         nxt_queue_remove(lnk);
650 
651         lnk->next = NULL;
652 
653         return 1;
654     }
655 
656     return 0;
657 }
658 
659 
/*
 * Detaches the connection-side half of a request from everything it holds:
 * the application port, the in-flight message, the application-side half
 * (ra) together with its queue memberships, the application reference and
 * the parse context.  Each pointer is cleared after it has been released.
 */
nxt_inline void
nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
{
    int                 ra_use_delta;
    nxt_req_app_link_t  *ra;

    if (rc->app_port != NULL) {
        nxt_router_app_port_release(task, rc->app_port, 0, 1);

        rc->app_port = NULL;
    }

    nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);

    ra = rc->ra;

    if (ra != NULL) {
        rc->ra = NULL;
        ra->rc = NULL;

        ra_use_delta = 0;

        /*
         * NOTE(review): rc->app is dereferenced here without a NULL check,
         * although it is checked further below -- confirm that a linked ra
         * implies a non-NULL rc->app.
         */
        nxt_thread_mutex_lock(&rc->app->mutex);

        if (ra->link_app_requests.next == NULL
            && ra->link_port_pending.next == NULL
            && ra->link_app_pending.next == NULL)
        {
            /* Not queued anywhere: no reference to give back. */
            ra = NULL;

        } else {
            /*
             * One reference is dropped per removed requests/port-pending
             * link; removing the app-pending link does not change the
             * delta.
             */
            ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
            ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
            nxt_queue_chk_remove(&ra->link_app_pending);
        }

        nxt_thread_mutex_unlock(&rc->app->mutex);

        /* The counter is adjusted only after the mutex is unlocked. */
        if (ra != NULL) {
            nxt_router_ra_use(task, ra, ra_use_delta);
        }
    }

    if (rc->app != NULL) {
        nxt_router_app_use(task, rc->app, -1);

        rc->app = NULL;
    }

    if (rc->ap != NULL) {
        nxt_app_http_req_done(task, rc->ap);

        rc->ap = NULL;
    }
}
715 
716 
717 void
718 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
719 {
720     nxt_port_new_port_handler(task, msg);
721 
722     if (msg->port_msg.stream == 0) {
723         return;
724     }
725 
726     if (msg->u.new_port == NULL
727         || msg->u.new_port->type != NXT_PROCESS_WORKER)
728     {
729         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
730     }
731 
732     nxt_port_rpc_handler(task, msg);
733 }
734 
735 
736 void
737 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
738 {
739     nxt_int_t               ret;
740     nxt_buf_t               *b;
741     nxt_router_temp_conf_t  *tmcf;
742 
743     tmcf = nxt_router_temp_conf(task);
744     if (nxt_slow_path(tmcf == NULL)) {
745         return;
746     }
747 
748     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
749               nxt_buf_used_size(msg->buf),
750               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
751 
752     tmcf->conf->router = nxt_router;
753     tmcf->stream = msg->port_msg.stream;
754     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
755                                        msg->port_msg.pid,
756                                        msg->port_msg.reply_port);
757 
758     b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
759     if (nxt_slow_path(b == NULL)) {
760         nxt_router_conf_error(task, tmcf);
761 
762         return;
763     }
764 
765     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
766 
767     if (nxt_fast_path(ret == NXT_OK)) {
768         nxt_router_conf_apply(task, tmcf, NULL);
769 
770     } else {
771         nxt_router_conf_error(task, tmcf);
772     }
773 }
774 
775 
776 static void
777 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
778     void *data)
779 {
780     union {
781         nxt_pid_t  removed_pid;
782         void       *data;
783     } u;
784 
785     u.data = data;
786 
787     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
788 }
789 
790 
791 void
792 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
793 {
794     nxt_event_engine_t  *engine;
795 
796     nxt_port_remove_pid_handler(task, msg);
797 
798     if (msg->port_msg.stream == 0) {
799         return;
800     }
801 
802     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
803     {
804         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
805                       msg->u.data);
806     }
807     nxt_queue_loop;
808 
809     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
810 
811     nxt_port_rpc_handler(task, msg);
812 }
813 
814 
815 static nxt_router_temp_conf_t *
816 nxt_router_temp_conf(nxt_task_t *task)
817 {
818     nxt_mp_t                *mp, *tmp;
819     nxt_router_conf_t       *rtcf;
820     nxt_router_temp_conf_t  *tmcf;
821 
822     mp = nxt_mp_create(1024, 128, 256, 32);
823     if (nxt_slow_path(mp == NULL)) {
824         return NULL;
825     }
826 
827     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
828     if (nxt_slow_path(rtcf == NULL)) {
829         goto fail;
830     }
831 
832     rtcf->mem_pool = mp;
833 
834     tmp = nxt_mp_create(1024, 128, 256, 32);
835     if (nxt_slow_path(tmp == NULL)) {
836         goto fail;
837     }
838 
839     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
840     if (nxt_slow_path(tmcf == NULL)) {
841         goto temp_fail;
842     }
843 
844     tmcf->mem_pool = tmp;
845     tmcf->conf = rtcf;
846     tmcf->count = 1;
847     tmcf->engine = task->thread->engine;
848 
849     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
850                                      sizeof(nxt_router_engine_conf_t));
851     if (nxt_slow_path(tmcf->engines == NULL)) {
852         goto temp_fail;
853     }
854 
855     nxt_queue_init(&tmcf->deleting);
856     nxt_queue_init(&tmcf->keeping);
857     nxt_queue_init(&tmcf->updating);
858     nxt_queue_init(&tmcf->pending);
859     nxt_queue_init(&tmcf->creating);
860 
861     nxt_queue_init(&tmcf->apps);
862     nxt_queue_init(&tmcf->previous);
863 
864     return tmcf;
865 
866 temp_fail:
867 
868     nxt_mp_destroy(tmp);
869 
870 fail:
871 
872     nxt_mp_destroy(mp);
873 
874     return NULL;
875 }
876 
877 
878 nxt_inline nxt_bool_t
879 nxt_router_app_can_start(nxt_app_t *app)
880 {
881     return app->processes + app->pending_processes < app->max_processes
882             && app->pending_processes < app->max_pending_processes;
883 }
884 
885 
886 nxt_inline nxt_bool_t
887 nxt_router_app_need_start(nxt_app_t *app)
888 {
889     return app->idle_processes + app->pending_processes
890             < app->spare_processes;
891 }
892 
893 
894 static void
895 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
896 {
897     nxt_int_t                    ret;
898     nxt_app_t                    *app;
899     nxt_router_t                 *router;
900     nxt_runtime_t                *rt;
901     nxt_queue_link_t             *qlk;
902     nxt_socket_conf_t            *skcf;
903     nxt_router_temp_conf_t       *tmcf;
904     const nxt_event_interface_t  *interface;
905 
906     tmcf = obj;
907 
908     qlk = nxt_queue_first(&tmcf->pending);
909 
910     if (qlk != nxt_queue_tail(&tmcf->pending)) {
911         nxt_queue_remove(qlk);
912         nxt_queue_insert_tail(&tmcf->creating, qlk);
913 
914         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
915 
916         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
917 
918         return;
919     }
920 
921     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
922 
923         if (nxt_router_app_need_start(app)) {
924             nxt_router_app_rpc_create(task, tmcf, app);
925             return;
926         }
927 
928     } nxt_queue_loop;
929 
930     rt = task->thread->runtime;
931 
932     interface = nxt_service_get(rt->services, "engine", NULL);
933 
934     router = tmcf->conf->router;
935 
936     ret = nxt_router_engines_create(task, router, tmcf, interface);
937     if (nxt_slow_path(ret != NXT_OK)) {
938         goto fail;
939     }
940 
941     ret = nxt_router_threads_create(task, rt, tmcf);
942     if (nxt_slow_path(ret != NXT_OK)) {
943         goto fail;
944     }
945 
946     nxt_router_apps_sort(task, router, tmcf);
947 
948     nxt_router_engines_post(router, tmcf);
949 
950     nxt_queue_add(&router->sockets, &tmcf->updating);
951     nxt_queue_add(&router->sockets, &tmcf->creating);
952 
953     nxt_router_conf_ready(task, tmcf);
954 
955     return;
956 
957 fail:
958 
959     nxt_router_conf_error(task, tmcf);
960 
961     return;
962 }
963 
964 
965 static void
966 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
967 {
968     nxt_joint_job_t  *job;
969 
970     job = obj;
971 
972     nxt_router_conf_ready(task, job->tmcf);
973 }
974 
975 
976 static void
977 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
978 {
979     nxt_debug(task, "temp conf count:%D", tmcf->count);
980 
981     if (--tmcf->count == 0) {
982         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
983     }
984 }
985 
986 
/*
 * Rolls back a configuration that could not be applied and reports the
 * failure to the sender with an RPC error reply.
 *
 * Sockets that were being created are closed and their listen structures
 * freed; application references held by the new socket configurations are
 * dropped; the new applications are told to quit; the kept, deleted and
 * previous entries are returned to the router; and the memory pool of the
 * new configuration is destroyed.
 */
static void
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
    nxt_app_t          *app;
    nxt_queue_t        new_socket_confs;
    nxt_socket_t       s;
    nxt_router_t       *router;
    nxt_queue_link_t   *qlk;
    nxt_socket_conf_t  *skcf;

    nxt_alert(task, "failed to apply new conf");

    /* Close the listen sockets created for this configuration. */
    for (qlk = nxt_queue_first(&tmcf->creating);
         qlk != nxt_queue_tail(&tmcf->creating);
         qlk = nxt_queue_next(qlk))
    {
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
        s = skcf->listen->socket;

        if (s != -1) {
            nxt_socket_close(task, s);
        }

        nxt_free(skcf->listen);
    }

    /* Gather every socket configuration introduced by the new conf. */
    nxt_queue_init(&new_socket_confs);
    nxt_queue_add(&new_socket_confs, &tmcf->updating);
    nxt_queue_add(&new_socket_confs, &tmcf->pending);
    nxt_queue_add(&new_socket_confs, &tmcf->creating);

    nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {

        if (skcf->application != NULL) {
            nxt_router_app_use(task, skcf->application, -1);
            skcf->application = NULL;
        }

    } nxt_queue_loop;

    nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {

        nxt_router_app_quit(task, app);

    } nxt_queue_loop;

    router = tmcf->conf->router;

    /* Give the untouched state back to the router. */
    nxt_queue_add(&router->sockets, &tmcf->keeping);
    nxt_queue_add(&router->sockets, &tmcf->deleting);

    nxt_queue_add(&router->apps, &tmcf->previous);

    // TODO: new engines and threads

    /*
     * "tmcf" itself was taken from tmcf->mem_pool in nxt_router_temp_conf()
     * and so remains usable for the reply below.
     * NOTE(review): tmcf->mem_pool is not destroyed in this function --
     * confirm that it is released elsewhere.
     */
    nxt_mp_destroy(tmcf->conf->mem_pool);

    nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
}
1046 
1047 
1048 static void
1049 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1050     nxt_port_msg_type_t type)
1051 {
1052     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1053 }
1054 
1055 
/* Maps the "listeners_threads" option to nxt_router_conf_t.threads. */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};
1063 
1064 
/*
 * Maps the top-level application options into nxt_router_app_conf_t.
 * "processes" is listed twice: once as a 32-bit integer and once as a
 * pointer to the raw value.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },
};
1090 
1091 
/*
 * Maps the limit options ("timeout", "reschedule_timeout", "requests")
 * into nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },

    {
        nxt_string("reschedule_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, res_timeout),
    },

    {
        nxt_string("requests"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, requests),
    },
};
1111 
1112 
1113 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1114     {
1115         nxt_string("spare"),
1116         NXT_CONF_MAP_INT32,
1117         offsetof(nxt_router_app_conf_t, spare_processes),
1118     },
1119 
1120     {
1121         nxt_string("max"),
1122         NXT_CONF_MAP_INT32,
1123         offsetof(nxt_router_app_conf_t, max_processes),
1124     },
1125 
1126     {
1127         nxt_string("idle_timeout"),
1128         NXT_CONF_MAP_MSEC,
1129         offsetof(nxt_router_app_conf_t, idle_timeout),
1130     },
1131 };
1132 
1133 
1134 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1135     {
1136         nxt_string("application"),
1137         NXT_CONF_MAP_STR,
1138         offsetof(nxt_router_listener_conf_t, application),
1139     },
1140 };
1141 
1142 
1143 static nxt_conf_map_t  nxt_router_http_conf[] = {
1144     {
1145         nxt_string("header_buffer_size"),
1146         NXT_CONF_MAP_SIZE,
1147         offsetof(nxt_socket_conf_t, header_buffer_size),
1148     },
1149 
1150     {
1151         nxt_string("large_header_buffer_size"),
1152         NXT_CONF_MAP_SIZE,
1153         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1154     },
1155 
1156     {
1157         nxt_string("large_header_buffers"),
1158         NXT_CONF_MAP_SIZE,
1159         offsetof(nxt_socket_conf_t, large_header_buffers),
1160     },
1161 
1162     {
1163         nxt_string("body_buffer_size"),
1164         NXT_CONF_MAP_SIZE,
1165         offsetof(nxt_socket_conf_t, body_buffer_size),
1166     },
1167 
1168     {
1169         nxt_string("max_body_size"),
1170         NXT_CONF_MAP_SIZE,
1171         offsetof(nxt_socket_conf_t, max_body_size),
1172     },
1173 
1174     {
1175         nxt_string("idle_timeout"),
1176         NXT_CONF_MAP_MSEC,
1177         offsetof(nxt_socket_conf_t, idle_timeout),
1178     },
1179 
1180     {
1181         nxt_string("header_read_timeout"),
1182         NXT_CONF_MAP_MSEC,
1183         offsetof(nxt_socket_conf_t, header_read_timeout),
1184     },
1185 
1186     {
1187         nxt_string("body_read_timeout"),
1188         NXT_CONF_MAP_MSEC,
1189         offsetof(nxt_socket_conf_t, body_read_timeout),
1190     },
1191 
1192     {
1193         nxt_string("send_timeout"),
1194         NXT_CONF_MAP_MSEC,
1195         offsetof(nxt_socket_conf_t, send_timeout),
1196     },
1197 };
1198 
1199 
1200 static nxt_int_t
1201 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1202     u_char *start, u_char *end)
1203 {
1204     u_char                      *p;
1205     size_t                      size;
1206     nxt_mp_t                    *mp;
1207     uint32_t                    next;
1208     nxt_int_t                   ret;
1209     nxt_str_t                   name;
1210     nxt_app_t                   *app, *prev;
1211     nxt_router_t                *router;
1212     nxt_conf_value_t            *conf, *http;
1213     nxt_conf_value_t            *applications, *application;
1214     nxt_conf_value_t            *listeners, *listener;
1215     nxt_socket_conf_t           *skcf;
1216     nxt_event_engine_t          *engine;
1217     nxt_app_lang_module_t       *lang;
1218     nxt_router_app_conf_t       apcf;
1219     nxt_router_listener_conf_t  lscf;
1220 
1221     static nxt_str_t  http_path = nxt_string("/http");
1222     static nxt_str_t  applications_path = nxt_string("/applications");
1223     static nxt_str_t  listeners_path = nxt_string("/listeners");
1224 
1225     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1226     if (conf == NULL) {
1227         nxt_alert(task, "configuration parsing error");
1228         return NXT_ERROR;
1229     }
1230 
1231     mp = tmcf->conf->mem_pool;
1232 
1233     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1234                               nxt_nitems(nxt_router_conf), tmcf->conf);
1235     if (ret != NXT_OK) {
1236         nxt_alert(task, "root map error");
1237         return NXT_ERROR;
1238     }
1239 
1240     if (tmcf->conf->threads == 0) {
1241         tmcf->conf->threads = nxt_ncpu;
1242     }
1243 
1244     applications = nxt_conf_get_path(conf, &applications_path);
1245     if (applications == NULL) {
1246         nxt_alert(task, "no \"applications\" block");
1247         return NXT_ERROR;
1248     }
1249 
1250     router = tmcf->conf->router;
1251 
1252     next = 0;
1253 
1254     for ( ;; ) {
1255         application = nxt_conf_next_object_member(applications, &name, &next);
1256         if (application == NULL) {
1257             break;
1258         }
1259 
1260         nxt_debug(task, "application \"%V\"", &name);
1261 
1262         size = nxt_conf_json_length(application, NULL);
1263 
1264         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1265         if (app == NULL) {
1266             goto fail;
1267         }
1268 
1269         nxt_memzero(app, sizeof(nxt_app_t));
1270 
1271         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1272         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1273 
1274         p = nxt_conf_json_print(app->conf.start, application, NULL);
1275         app->conf.length = p - app->conf.start;
1276 
1277         nxt_assert(app->conf.length <= size);
1278 
1279         nxt_debug(task, "application conf \"%V\"", &app->conf);
1280 
1281         prev = nxt_router_app_find(&router->apps, &name);
1282 
1283         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1284             nxt_free(app);
1285 
1286             nxt_queue_remove(&prev->link);
1287             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1288             continue;
1289         }
1290 
1291         apcf.processes = 1;
1292         apcf.max_processes = 1;
1293         apcf.spare_processes = 0;
1294         apcf.timeout = 0;
1295         apcf.res_timeout = 1000;
1296         apcf.idle_timeout = 15000;
1297         apcf.requests = 0;
1298         apcf.limits_value = NULL;
1299         apcf.processes_value = NULL;
1300 
1301         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1302                                   nxt_nitems(nxt_router_app_conf), &apcf);
1303         if (ret != NXT_OK) {
1304             nxt_alert(task, "application map error");
1305             goto app_fail;
1306         }
1307 
1308         if (apcf.limits_value != NULL) {
1309 
1310             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1311                 nxt_alert(task, "application limits is not object");
1312                 goto app_fail;
1313             }
1314 
1315             ret = nxt_conf_map_object(mp, apcf.limits_value,
1316                                       nxt_router_app_limits_conf,
1317                                       nxt_nitems(nxt_router_app_limits_conf),
1318                                       &apcf);
1319             if (ret != NXT_OK) {
1320                 nxt_alert(task, "application limits map error");
1321                 goto app_fail;
1322             }
1323         }
1324 
1325         if (apcf.processes_value != NULL
1326             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1327         {
1328             ret = nxt_conf_map_object(mp, apcf.processes_value,
1329                                       nxt_router_app_processes_conf,
1330                                       nxt_nitems(nxt_router_app_processes_conf),
1331                                       &apcf);
1332             if (ret != NXT_OK) {
1333                 nxt_alert(task, "application processes map error");
1334                 goto app_fail;
1335             }
1336 
1337         } else {
1338             apcf.max_processes = apcf.processes;
1339             apcf.spare_processes = apcf.processes;
1340         }
1341 
1342         nxt_debug(task, "application type: %V", &apcf.type);
1343         nxt_debug(task, "application processes: %D", apcf.processes);
1344         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1345         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1346         nxt_debug(task, "application requests: %D", apcf.requests);
1347 
1348         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1349 
1350         if (lang == NULL) {
1351             nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1352             goto app_fail;
1353         }
1354 
1355         nxt_debug(task, "application language module: \"%s\"", lang->file);
1356 
1357         ret = nxt_thread_mutex_create(&app->mutex);
1358         if (ret != NXT_OK) {
1359             goto app_fail;
1360         }
1361 
1362         nxt_queue_init(&app->ports);
1363         nxt_queue_init(&app->spare_ports);
1364         nxt_queue_init(&app->idle_ports);
1365         nxt_queue_init(&app->requests);
1366         nxt_queue_init(&app->pending);
1367 
1368         app->name.length = name.length;
1369         nxt_memcpy(app->name.start, name.start, name.length);
1370 
1371         app->type = lang->type;
1372         app->max_processes = apcf.max_processes;
1373         app->spare_processes = apcf.spare_processes;
1374         app->max_pending_processes = apcf.spare_processes
1375                                       ? apcf.spare_processes : 1;
1376         app->timeout = apcf.timeout;
1377         app->res_timeout = apcf.res_timeout * 1000000;
1378         app->idle_timeout = apcf.idle_timeout;
1379         app->live = 1;
1380         app->max_pending_responses = 2;
1381         app->max_requests = apcf.requests;
1382         app->prepare_msg = nxt_app_prepare_msg[lang->type];
1383 
1384         engine = task->thread->engine;
1385 
1386         app->engine = engine;
1387 
1388         app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
1389         app->idle_timer.work_queue = &engine->fast_work_queue;
1390         app->idle_timer.handler = nxt_router_app_idle_timeout;
1391         app->idle_timer.task = &engine->task;
1392         app->idle_timer.log = app->idle_timer.task->log;
1393 
1394         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1395         app->adjust_idle_work.task = &engine->task;
1396         app->adjust_idle_work.obj = app;
1397 
1398         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1399 
1400         nxt_router_app_use(task, app, 1);
1401     }
1402 
1403     http = nxt_conf_get_path(conf, &http_path);
1404 #if 0
1405     if (http == NULL) {
1406         nxt_alert(task, "no \"http\" block");
1407         return NXT_ERROR;
1408     }
1409 #endif
1410 
1411     listeners = nxt_conf_get_path(conf, &listeners_path);
1412     if (listeners == NULL) {
1413         nxt_alert(task, "no \"listeners\" block");
1414         return NXT_ERROR;
1415     }
1416 
1417     next = 0;
1418 
1419     for ( ;; ) {
1420         listener = nxt_conf_next_object_member(listeners, &name, &next);
1421         if (listener == NULL) {
1422             break;
1423         }
1424 
1425         skcf = nxt_router_socket_conf(task, tmcf, &name);
1426         if (skcf == NULL) {
1427             goto fail;
1428         }
1429 
1430         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1431                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1432         if (ret != NXT_OK) {
1433             nxt_alert(task, "listener map error");
1434             goto fail;
1435         }
1436 
1437         nxt_debug(task, "application: %V", &lscf.application);
1438 
1439         // STUB, default values if http block is not defined.
1440         skcf->header_buffer_size = 2048;
1441         skcf->large_header_buffer_size = 8192;
1442         skcf->large_header_buffers = 4;
1443         skcf->body_buffer_size = 16 * 1024;
1444         skcf->max_body_size = 2 * 1024 * 1024;
1445         skcf->idle_timeout = 65000;
1446         skcf->header_read_timeout = 5000;
1447         skcf->body_read_timeout = 5000;
1448         skcf->send_timeout = 5000;
1449 
1450         if (http != NULL) {
1451             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1452                                       nxt_nitems(nxt_router_http_conf), skcf);
1453             if (ret != NXT_OK) {
1454                 nxt_alert(task, "http map error");
1455                 goto fail;
1456             }
1457         }
1458 
1459         skcf->listen->handler = nxt_http_conn_init;
1460         skcf->router_conf = tmcf->conf;
1461         skcf->router_conf->count++;
1462         skcf->application = nxt_router_listener_application(tmcf,
1463                                                             &lscf.application);
1464         nxt_router_app_use(task, skcf->application, 1);
1465     }
1466 
1467     nxt_queue_add(&tmcf->deleting, &router->sockets);
1468     nxt_queue_init(&router->sockets);
1469 
1470     return NXT_OK;
1471 
1472 app_fail:
1473 
1474     nxt_free(app);
1475 
1476 fail:
1477 
1478     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1479 
1480         nxt_queue_remove(&app->link);
1481         nxt_thread_mutex_destroy(&app->mutex);
1482         nxt_free(app);
1483 
1484     } nxt_queue_loop;
1485 
1486     return NXT_ERROR;
1487 }
1488 
1489 
1490 static nxt_app_t *
1491 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1492 {
1493     nxt_app_t  *app;
1494 
1495     nxt_queue_each(app, queue, nxt_app_t, link) {
1496 
1497         if (nxt_strstr_eq(name, &app->name)) {
1498             return app;
1499         }
1500 
1501     } nxt_queue_loop;
1502 
1503     return NULL;
1504 }
1505 
1506 
1507 static nxt_app_t *
1508 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1509 {
1510     nxt_app_t  *app;
1511 
1512     app = nxt_router_app_find(&tmcf->apps, name);
1513 
1514     if (app == NULL) {
1515         app = nxt_router_app_find(&tmcf->previous, name);
1516     }
1517 
1518     return app;
1519 }
1520 
1521 
1522 static nxt_socket_conf_t *
1523 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1524     nxt_str_t *name)
1525 {
1526     size_t               size;
1527     nxt_int_t            ret;
1528     nxt_bool_t           wildcard;
1529     nxt_sockaddr_t       *sa;
1530     nxt_socket_conf_t    *skcf;
1531     nxt_listen_socket_t  *ls;
1532 
1533     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1534     if (nxt_slow_path(sa == NULL)) {
1535         nxt_alert(task, "invalid listener \"%V\"", name);
1536         return NULL;
1537     }
1538 
1539     sa->type = SOCK_STREAM;
1540 
1541     nxt_debug(task, "router listener: \"%*s\"",
1542               (size_t) sa->length, nxt_sockaddr_start(sa));
1543 
1544     skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
1545     if (nxt_slow_path(skcf == NULL)) {
1546         return NULL;
1547     }
1548 
1549     size = nxt_sockaddr_size(sa);
1550 
1551     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1552 
1553     if (ret != NXT_OK) {
1554 
1555         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1556         if (nxt_slow_path(ls == NULL)) {
1557             return NULL;
1558         }
1559 
1560         skcf->listen = ls;
1561 
1562         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1563         nxt_memcpy(ls->sockaddr, sa, size);
1564 
1565         nxt_listen_socket_remote_size(ls);
1566 
1567         ls->socket = -1;
1568         ls->backlog = NXT_LISTEN_BACKLOG;
1569         ls->flags = NXT_NONBLOCK;
1570         ls->read_after_accept = 1;
1571     }
1572 
1573     switch (sa->u.sockaddr.sa_family) {
1574 #if (NXT_HAVE_UNIX_DOMAIN)
1575     case AF_UNIX:
1576         wildcard = 0;
1577         break;
1578 #endif
1579 #if (NXT_INET6)
1580     case AF_INET6:
1581         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1582         break;
1583 #endif
1584     case AF_INET:
1585     default:
1586         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1587         break;
1588     }
1589 
1590     if (!wildcard) {
1591         skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
1592         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1593             return NULL;
1594         }
1595 
1596         nxt_memcpy(skcf->sockaddr, sa, size);
1597     }
1598 
1599     return skcf;
1600 }
1601 
1602 
1603 static nxt_int_t
1604 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1605     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1606 {
1607     nxt_router_t       *router;
1608     nxt_queue_link_t   *qlk;
1609     nxt_socket_conf_t  *skcf;
1610 
1611     router = tmcf->conf->router;
1612 
1613     for (qlk = nxt_queue_first(&router->sockets);
1614          qlk != nxt_queue_tail(&router->sockets);
1615          qlk = nxt_queue_next(qlk))
1616     {
1617         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1618 
1619         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1620             nskcf->listen = skcf->listen;
1621 
1622             nxt_queue_remove(qlk);
1623             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1624 
1625             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1626 
1627             return NXT_OK;
1628         }
1629     }
1630 
1631     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1632 
1633     return NXT_DECLINED;
1634 }
1635 
1636 
1637 static void
1638 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1639     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1640 {
1641     size_t            size;
1642     uint32_t          stream;
1643     nxt_buf_t         *b;
1644     nxt_port_t        *main_port, *router_port;
1645     nxt_runtime_t     *rt;
1646     nxt_socket_rpc_t  *rpc;
1647 
1648     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1649     if (rpc == NULL) {
1650         goto fail;
1651     }
1652 
1653     rpc->socket_conf = skcf;
1654     rpc->temp_conf = tmcf;
1655 
1656     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1657 
1658     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1659     if (b == NULL) {
1660         goto fail;
1661     }
1662 
1663     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1664 
1665     rt = task->thread->runtime;
1666     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1667     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1668 
1669     stream = nxt_port_rpc_register_handler(task, router_port,
1670                                            nxt_router_listen_socket_ready,
1671                                            nxt_router_listen_socket_error,
1672                                            main_port->pid, rpc);
1673     if (stream == 0) {
1674         goto fail;
1675     }
1676 
1677     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1678                           stream, router_port->id, b);
1679 
1680     return;
1681 
1682 fail:
1683 
1684     nxt_router_conf_error(task, tmcf);
1685 }
1686 
1687 
1688 static void
1689 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1690     void *data)
1691 {
1692     nxt_int_t         ret;
1693     nxt_socket_t      s;
1694     nxt_socket_rpc_t  *rpc;
1695 
1696     rpc = data;
1697 
1698     s = msg->fd;
1699 
1700     ret = nxt_socket_nonblocking(task, s);
1701     if (nxt_slow_path(ret != NXT_OK)) {
1702         goto fail;
1703     }
1704 
1705     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1706 
1707     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1708     if (nxt_slow_path(ret != NXT_OK)) {
1709         goto fail;
1710     }
1711 
1712     rpc->socket_conf->listen->socket = s;
1713 
1714     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1715                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1716 
1717     return;
1718 
1719 fail:
1720 
1721     nxt_socket_close(task, s);
1722 
1723     nxt_router_conf_error(task, rpc->temp_conf);
1724 }
1725 
1726 
1727 static void
1728 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1729     void *data)
1730 {
1731     u_char                  *p;
1732     size_t                  size;
1733     uint8_t                 error;
1734     nxt_buf_t               *in, *out;
1735     nxt_sockaddr_t          *sa;
1736     nxt_socket_rpc_t        *rpc;
1737     nxt_router_temp_conf_t  *tmcf;
1738 
1739     static nxt_str_t  socket_errors[] = {
1740         nxt_string("ListenerSystem"),
1741         nxt_string("ListenerNoIPv6"),
1742         nxt_string("ListenerPort"),
1743         nxt_string("ListenerInUse"),
1744         nxt_string("ListenerNoAddress"),
1745         nxt_string("ListenerNoAccess"),
1746         nxt_string("ListenerPath"),
1747     };
1748 
1749     rpc = data;
1750     sa = rpc->socket_conf->listen->sockaddr;
1751     tmcf = rpc->temp_conf;
1752 
1753     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1754 
1755     if (nxt_slow_path(in == NULL)) {
1756         return;
1757     }
1758 
1759     p = in->mem.pos;
1760 
1761     error = *p++;
1762 
1763     size = sizeof("listen socket error: ") - 1
1764            + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
1765            + sa->length + socket_errors[error].length + (in->mem.free - p);
1766 
1767     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1768     if (nxt_slow_path(out == NULL)) {
1769         return;
1770     }
1771 
1772     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1773                         "listen socket error: "
1774                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1775                         (size_t) sa->length, nxt_sockaddr_start(sa),
1776                         &socket_errors[error], in->mem.free - p, p);
1777 
1778     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1779 
1780     nxt_router_conf_error(task, tmcf);
1781 }
1782 
1783 
1784 static void
1785 nxt_router_app_rpc_create(nxt_task_t *task,
1786     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
1787 {
1788     size_t         size;
1789     uint32_t       stream;
1790     nxt_buf_t      *b;
1791     nxt_port_t     *main_port, *router_port;
1792     nxt_runtime_t  *rt;
1793     nxt_app_rpc_t  *rpc;
1794 
1795     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
1796     if (rpc == NULL) {
1797         goto fail;
1798     }
1799 
1800     rpc->app = app;
1801     rpc->temp_conf = tmcf;
1802 
1803     nxt_debug(task, "app '%V' prefork", &app->name);
1804 
1805     size = app->name.length + 1 + app->conf.length;
1806 
1807     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1808     if (nxt_slow_path(b == NULL)) {
1809         goto fail;
1810     }
1811 
1812     nxt_buf_cpystr(b, &app->name);
1813     *b->mem.free++ = '\0';
1814     nxt_buf_cpystr(b, &app->conf);
1815 
1816     rt = task->thread->runtime;
1817     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1818     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1819 
1820     stream = nxt_port_rpc_register_handler(task, router_port,
1821                                            nxt_router_app_prefork_ready,
1822                                            nxt_router_app_prefork_error,
1823                                            -1, rpc);
1824     if (nxt_slow_path(stream == 0)) {
1825         goto fail;
1826     }
1827 
1828     app->pending_processes++;
1829 
1830     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
1831                           stream, router_port->id, b);
1832 
1833     return;
1834 
1835 fail:
1836 
1837     nxt_router_conf_error(task, tmcf);
1838 }
1839 
1840 
1841 static void
1842 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1843     void *data)
1844 {
1845     nxt_app_t           *app;
1846     nxt_port_t          *port;
1847     nxt_app_rpc_t       *rpc;
1848     nxt_event_engine_t  *engine;
1849 
1850     rpc = data;
1851     app = rpc->app;
1852 
1853     port = msg->u.new_port;
1854     port->app = app;
1855 
1856     nxt_router_app_use(task, app, 1);
1857 
1858     app->pending_processes--;
1859     app->processes++;
1860     app->idle_processes++;
1861 
1862     engine = task->thread->engine;
1863 
1864     nxt_queue_insert_tail(&app->ports, &port->app_link);
1865     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
1866 
1867     port->idle_start = 0;
1868 
1869     nxt_port_inc_use(port);
1870 
1871     nxt_work_queue_add(&engine->fast_work_queue,
1872                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1873 }
1874 
1875 
1876 static void
1877 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1878     void *data)
1879 {
1880     nxt_app_t               *app;
1881     nxt_app_rpc_t           *rpc;
1882     nxt_router_temp_conf_t  *tmcf;
1883 
1884     rpc = data;
1885     app = rpc->app;
1886     tmcf = rpc->temp_conf;
1887 
1888     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
1889             &app->name);
1890 
1891     app->pending_processes--;
1892 
1893     nxt_router_conf_error(task, tmcf);
1894 }
1895 
1896 
1897 static nxt_int_t
1898 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1899     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1900 {
1901     nxt_int_t                 ret;
1902     nxt_uint_t                n, threads;
1903     nxt_queue_link_t          *qlk;
1904     nxt_router_engine_conf_t  *recf;
1905 
1906     threads = tmcf->conf->threads;
1907 
1908     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
1909                                      sizeof(nxt_router_engine_conf_t));
1910     if (nxt_slow_path(tmcf->engines == NULL)) {
1911         return NXT_ERROR;
1912     }
1913 
1914     n = 0;
1915 
1916     for (qlk = nxt_queue_first(&router->engines);
1917          qlk != nxt_queue_tail(&router->engines);
1918          qlk = nxt_queue_next(qlk))
1919     {
1920         recf = nxt_array_zero_add(tmcf->engines);
1921         if (nxt_slow_path(recf == NULL)) {
1922             return NXT_ERROR;
1923         }
1924 
1925         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
1926 
1927         if (n < threads) {
1928             recf->action = NXT_ROUTER_ENGINE_KEEP;
1929             ret = nxt_router_engine_conf_update(tmcf, recf);
1930 
1931         } else {
1932             recf->action = NXT_ROUTER_ENGINE_DELETE;
1933             ret = nxt_router_engine_conf_delete(tmcf, recf);
1934         }
1935 
1936         if (nxt_slow_path(ret != NXT_OK)) {
1937             return ret;
1938         }
1939 
1940         n++;
1941     }
1942 
1943     tmcf->new_threads = n;
1944 
1945     while (n < threads) {
1946         recf = nxt_array_zero_add(tmcf->engines);
1947         if (nxt_slow_path(recf == NULL)) {
1948             return NXT_ERROR;
1949         }
1950 
1951         recf->action = NXT_ROUTER_ENGINE_ADD;
1952 
1953         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
1954         if (nxt_slow_path(recf->engine == NULL)) {
1955             return NXT_ERROR;
1956         }
1957 
1958         ret = nxt_router_engine_conf_create(tmcf, recf);
1959         if (nxt_slow_path(ret != NXT_OK)) {
1960             return ret;
1961         }
1962 
1963         n++;
1964     }
1965 
1966     return NXT_OK;
1967 }
1968 
1969 
1970 static nxt_int_t
1971 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1972     nxt_router_engine_conf_t *recf)
1973 {
1974     nxt_int_t  ret;
1975 
1976     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1977                                           nxt_router_listen_socket_create);
1978     if (nxt_slow_path(ret != NXT_OK)) {
1979         return ret;
1980     }
1981 
1982     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1983                                           nxt_router_listen_socket_create);
1984     if (nxt_slow_path(ret != NXT_OK)) {
1985         return ret;
1986     }
1987 
1988     return ret;
1989 }
1990 
1991 
1992 static nxt_int_t
1993 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1994     nxt_router_engine_conf_t *recf)
1995 {
1996     nxt_int_t  ret;
1997 
1998     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1999                                           nxt_router_listen_socket_create);
2000     if (nxt_slow_path(ret != NXT_OK)) {
2001         return ret;
2002     }
2003 
2004     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2005                                           nxt_router_listen_socket_update);
2006     if (nxt_slow_path(ret != NXT_OK)) {
2007         return ret;
2008     }
2009 
2010     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2011     if (nxt_slow_path(ret != NXT_OK)) {
2012         return ret;
2013     }
2014 
2015     return ret;
2016 }
2017 
2018 
2019 static nxt_int_t
2020 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2021     nxt_router_engine_conf_t *recf)
2022 {
2023     nxt_int_t  ret;
2024 
2025     ret = nxt_router_engine_quit(tmcf, recf);
2026     if (nxt_slow_path(ret != NXT_OK)) {
2027         return ret;
2028     }
2029 
2030     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2031     if (nxt_slow_path(ret != NXT_OK)) {
2032         return ret;
2033     }
2034 
2035     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2036 }
2037 
2038 
2039 static nxt_int_t
2040 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2041     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2042     nxt_work_handler_t handler)
2043 {
2044     nxt_joint_job_t          *job;
2045     nxt_queue_link_t         *qlk;
2046     nxt_socket_conf_t        *skcf;
2047     nxt_socket_conf_joint_t  *joint;
2048 
2049     for (qlk = nxt_queue_first(sockets);
2050          qlk != nxt_queue_tail(sockets);
2051          qlk = nxt_queue_next(qlk))
2052     {
2053         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2054         if (nxt_slow_path(job == NULL)) {
2055             return NXT_ERROR;
2056         }
2057 
2058         job->work.next = recf->jobs;
2059         recf->jobs = &job->work;
2060 
2061         job->task = tmcf->engine->task;
2062         job->work.handler = handler;
2063         job->work.task = &job->task;
2064         job->work.obj = job;
2065         job->tmcf = tmcf;
2066 
2067         tmcf->count++;
2068 
2069         joint = nxt_mp_alloc(tmcf->conf->mem_pool,
2070                              sizeof(nxt_socket_conf_joint_t));
2071         if (nxt_slow_path(joint == NULL)) {
2072             return NXT_ERROR;
2073         }
2074 
2075         job->work.data = joint;
2076 
2077         joint->count = 1;
2078 
2079         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2080         skcf->count++;
2081         joint->socket_conf = skcf;
2082 
2083         joint->engine = recf->engine;
2084     }
2085 
2086     return NXT_OK;
2087 }
2088 
2089 
2090 static nxt_int_t
2091 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2092     nxt_router_engine_conf_t *recf)
2093 {
2094     nxt_joint_job_t  *job;
2095 
2096     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2097     if (nxt_slow_path(job == NULL)) {
2098         return NXT_ERROR;
2099     }
2100 
2101     job->work.next = recf->jobs;
2102     recf->jobs = &job->work;
2103 
2104     job->task = tmcf->engine->task;
2105     job->work.handler = nxt_router_worker_thread_quit;
2106     job->work.task = &job->task;
2107     job->work.obj = NULL;
2108     job->work.data = NULL;
2109     job->tmcf = NULL;
2110 
2111     return NXT_OK;
2112 }
2113 
2114 
2115 static nxt_int_t
2116 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2117     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2118 {
2119     nxt_joint_job_t   *job;
2120     nxt_queue_link_t  *qlk;
2121 
2122     for (qlk = nxt_queue_first(sockets);
2123          qlk != nxt_queue_tail(sockets);
2124          qlk = nxt_queue_next(qlk))
2125     {
2126         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2127         if (nxt_slow_path(job == NULL)) {
2128             return NXT_ERROR;
2129         }
2130 
2131         job->work.next = recf->jobs;
2132         recf->jobs = &job->work;
2133 
2134         job->task = tmcf->engine->task;
2135         job->work.handler = nxt_router_listen_socket_delete;
2136         job->work.task = &job->task;
2137         job->work.obj = job;
2138         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2139         job->tmcf = tmcf;
2140 
2141         tmcf->count++;
2142     }
2143 
2144     return NXT_OK;
2145 }
2146 
2147 
2148 static nxt_int_t
2149 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2150     nxt_router_temp_conf_t *tmcf)
2151 {
2152     nxt_int_t                 ret;
2153     nxt_uint_t                i, threads;
2154     nxt_router_engine_conf_t  *recf;
2155 
2156     recf = tmcf->engines->elts;
2157     threads = tmcf->conf->threads;
2158 
2159     for (i = tmcf->new_threads; i < threads; i++) {
2160         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2161         if (nxt_slow_path(ret != NXT_OK)) {
2162             return ret;
2163         }
2164     }
2165 
2166     return NXT_OK;
2167 }
2168 
2169 
2170 static nxt_int_t
2171 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2172     nxt_event_engine_t *engine)
2173 {
2174     nxt_int_t            ret;
2175     nxt_thread_link_t    *link;
2176     nxt_thread_handle_t  handle;
2177 
2178     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2179 
2180     if (nxt_slow_path(link == NULL)) {
2181         return NXT_ERROR;
2182     }
2183 
2184     link->start = nxt_router_thread_start;
2185     link->engine = engine;
2186     link->work.handler = nxt_router_thread_exit_handler;
2187     link->work.task = task;
2188     link->work.data = link;
2189 
2190     nxt_queue_insert_tail(&rt->engines, &engine->link);
2191 
2192     ret = nxt_thread_create(&handle, link);
2193 
2194     if (nxt_slow_path(ret != NXT_OK)) {
2195         nxt_queue_remove(&engine->link);
2196     }
2197 
2198     return ret;
2199 }
2200 
2201 
2202 static void
2203 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2204     nxt_router_temp_conf_t *tmcf)
2205 {
2206     nxt_app_t  *app;
2207 
2208     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2209 
2210         nxt_router_app_quit(task, app);
2211 
2212     } nxt_queue_loop;
2213 
2214     nxt_queue_add(&router->apps, &tmcf->previous);
2215     nxt_queue_add(&router->apps, &tmcf->apps);
2216 }
2217 
2218 
2219 static void
2220 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2221 {
2222     nxt_uint_t                n;
2223     nxt_event_engine_t        *engine;
2224     nxt_router_engine_conf_t  *recf;
2225 
2226     recf = tmcf->engines->elts;
2227 
2228     for (n = tmcf->engines->nelts; n != 0; n--) {
2229         engine = recf->engine;
2230 
2231         switch (recf->action) {
2232 
2233         case NXT_ROUTER_ENGINE_KEEP:
2234             break;
2235 
2236         case NXT_ROUTER_ENGINE_ADD:
2237             nxt_queue_insert_tail(&router->engines, &engine->link0);
2238             break;
2239 
2240         case NXT_ROUTER_ENGINE_DELETE:
2241             nxt_queue_remove(&engine->link0);
2242             break;
2243         }
2244 
2245         nxt_router_engine_post(engine, recf->jobs);
2246 
2247         recf++;
2248     }
2249 }
2250 
2251 
2252 static void
2253 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2254 {
2255     nxt_work_t  *work, *next;
2256 
2257     for (work = jobs; work != NULL; work = next) {
2258         next = work->next;
2259         work->next = NULL;
2260 
2261         nxt_event_engine_post(engine, work);
2262     }
2263 }
2264 
2265 
2266 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2267     .mmap = nxt_port_mmap_handler,
2268     .data = nxt_port_rpc_handler,
2269 };
2270 
2271 
2272 static void
2273 nxt_router_thread_start(void *data)
2274 {
2275     nxt_int_t           ret;
2276     nxt_port_t          *port;
2277     nxt_task_t          *task;
2278     nxt_thread_t        *thread;
2279     nxt_thread_link_t   *link;
2280     nxt_event_engine_t  *engine;
2281 
2282     link = data;
2283     engine = link->engine;
2284     task = &engine->task;
2285 
2286     thread = nxt_thread();
2287 
2288     nxt_event_engine_thread_adopt(engine);
2289 
2290     /* STUB */
2291     thread->runtime = engine->task.thread->runtime;
2292 
2293     engine->task.thread = thread;
2294     engine->task.log = thread->log;
2295     thread->engine = engine;
2296     thread->task = &engine->task;
2297 #if 0
2298     thread->fiber = &engine->fibers->fiber;
2299 #endif
2300 
2301     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2302     if (nxt_slow_path(engine->mem_pool == NULL)) {
2303         return;
2304     }
2305 
2306     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2307                         NXT_PROCESS_ROUTER);
2308     if (nxt_slow_path(port == NULL)) {
2309         return;
2310     }
2311 
2312     ret = nxt_port_socket_init(task, port, 0);
2313     if (nxt_slow_path(ret != NXT_OK)) {
2314         nxt_port_use(task, port, -1);
2315         return;
2316     }
2317 
2318     engine->port = port;
2319 
2320     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2321 
2322     nxt_event_engine_start(engine);
2323 }
2324 
2325 
2326 static void
2327 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2328 {
2329     nxt_joint_job_t          *job;
2330     nxt_socket_conf_t        *skcf;
2331     nxt_listen_event_t       *lev;
2332     nxt_listen_socket_t      *ls;
2333     nxt_thread_spinlock_t    *lock;
2334     nxt_socket_conf_joint_t  *joint;
2335 
2336     job = obj;
2337     joint = data;
2338 
2339     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2340 
2341     skcf = joint->socket_conf;
2342     ls = skcf->listen;
2343 
2344     lev = nxt_listen_event(task, ls);
2345     if (nxt_slow_path(lev == NULL)) {
2346         nxt_router_listen_socket_release(task, skcf);
2347         return;
2348     }
2349 
2350     lev->socket.data = joint;
2351 
2352     lock = &skcf->router_conf->router->lock;
2353 
2354     nxt_thread_spin_lock(lock);
2355     ls->count++;
2356     nxt_thread_spin_unlock(lock);
2357 
2358     job->work.next = NULL;
2359     job->work.handler = nxt_router_conf_wait;
2360 
2361     nxt_event_engine_post(job->tmcf->engine, &job->work);
2362 }
2363 
2364 
2365 nxt_inline nxt_listen_event_t *
2366 nxt_router_listen_event(nxt_queue_t *listen_connections,
2367     nxt_socket_conf_t *skcf)
2368 {
2369     nxt_socket_t        fd;
2370     nxt_queue_link_t    *qlk;
2371     nxt_listen_event_t  *lev;
2372 
2373     fd = skcf->listen->socket;
2374 
2375     for (qlk = nxt_queue_first(listen_connections);
2376          qlk != nxt_queue_tail(listen_connections);
2377          qlk = nxt_queue_next(qlk))
2378     {
2379         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2380 
2381         if (fd == lev->socket.fd) {
2382             return lev;
2383         }
2384     }
2385 
2386     return NULL;
2387 }
2388 
2389 
2390 static void
2391 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2392 {
2393     nxt_joint_job_t          *job;
2394     nxt_event_engine_t       *engine;
2395     nxt_listen_event_t       *lev;
2396     nxt_socket_conf_joint_t  *joint, *old;
2397 
2398     job = obj;
2399     joint = data;
2400 
2401     engine = task->thread->engine;
2402 
2403     nxt_queue_insert_tail(&engine->joints, &joint->link);
2404 
2405     lev = nxt_router_listen_event(&engine->listen_connections,
2406                                   joint->socket_conf);
2407 
2408     old = lev->socket.data;
2409     lev->socket.data = joint;
2410     lev->listen = joint->socket_conf->listen;
2411 
2412     job->work.next = NULL;
2413     job->work.handler = nxt_router_conf_wait;
2414 
2415     nxt_event_engine_post(job->tmcf->engine, &job->work);
2416 
2417     /*
2418      * The task is allocated from configuration temporary
2419      * memory pool so it can be freed after engine post operation.
2420      */
2421 
2422     nxt_router_conf_release(&engine->task, old);
2423 }
2424 
2425 
2426 static void
2427 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2428 {
2429     nxt_joint_job_t     *job;
2430     nxt_socket_conf_t   *skcf;
2431     nxt_listen_event_t  *lev;
2432     nxt_event_engine_t  *engine;
2433 
2434     job = obj;
2435     skcf = data;
2436 
2437     engine = task->thread->engine;
2438 
2439     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2440 
2441     nxt_fd_event_delete(engine, &lev->socket);
2442 
2443     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2444               lev->socket.fd);
2445 
2446     lev->timer.handler = nxt_router_listen_socket_close;
2447     lev->timer.work_queue = &engine->fast_work_queue;
2448 
2449     nxt_timer_add(engine, &lev->timer, 0);
2450 
2451     job->work.next = NULL;
2452     job->work.handler = nxt_router_conf_wait;
2453 
2454     nxt_event_engine_post(job->tmcf->engine, &job->work);
2455 }
2456 
2457 
2458 static void
2459 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2460 {
2461     nxt_event_engine_t  *engine;
2462 
2463     nxt_debug(task, "router worker thread quit");
2464 
2465     engine = task->thread->engine;
2466 
2467     engine->shutdown = 1;
2468 
2469     if (nxt_queue_is_empty(&engine->joints)) {
2470         nxt_thread_exit(task->thread);
2471     }
2472 }
2473 
2474 
2475 static void
2476 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2477 {
2478     nxt_timer_t              *timer;
2479     nxt_listen_event_t       *lev;
2480     nxt_socket_conf_joint_t  *joint;
2481 
2482     timer = obj;
2483     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2484     joint = lev->socket.data;
2485 
2486     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2487               lev->socket.fd);
2488 
2489     nxt_queue_remove(&lev->link);
2490 
2491     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2492     task = &task->thread->engine->task;
2493 
2494     nxt_router_listen_socket_release(task, joint->socket_conf);
2495 
2496     nxt_free(lev);
2497 
2498     nxt_router_conf_release(task, joint);
2499 }
2500 
2501 
2502 static void
2503 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2504 {
2505     nxt_listen_socket_t    *ls;
2506     nxt_thread_spinlock_t  *lock;
2507 
2508     ls = skcf->listen;
2509     lock = &skcf->router_conf->router->lock;
2510 
2511     nxt_thread_spin_lock(lock);
2512 
2513     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2514               task->thread->engine, ls->count);
2515 
2516     if (--ls->count != 0) {
2517         ls = NULL;
2518     }
2519 
2520     nxt_thread_spin_unlock(lock);
2521 
2522     if (ls != NULL) {
2523         nxt_socket_close(task, ls->socket);
2524         nxt_free(ls);
2525     }
2526 }
2527 
2528 
2529 static void
2530 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2531 {
2532     nxt_app_t              *app;
2533     nxt_socket_conf_t      *skcf;
2534     nxt_router_conf_t      *rtcf;
2535     nxt_event_engine_t     *engine;
2536     nxt_thread_spinlock_t  *lock;
2537 
2538     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2539 
2540     if (--joint->count != 0) {
2541         return;
2542     }
2543 
2544     nxt_queue_remove(&joint->link);
2545 
2546     /*
2547      * The joint content can not be safely used after the critical
2548      * section protected by the spinlock because its memory pool may
2549      * be already destroyed by another thread.
2550      */
2551     engine = joint->engine;
2552 
2553     skcf = joint->socket_conf;
2554     app = skcf->application;
2555     rtcf = skcf->router_conf;
2556     lock = &rtcf->router->lock;
2557 
2558     nxt_thread_spin_lock(lock);
2559 
2560     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2561               rtcf, rtcf->count);
2562 
2563     if (--skcf->count != 0) {
2564         rtcf = NULL;
2565         app = NULL;
2566 
2567     } else {
2568         nxt_queue_remove(&skcf->link);
2569 
2570         if (--rtcf->count != 0) {
2571             rtcf = NULL;
2572         }
2573     }
2574 
2575     nxt_thread_spin_unlock(lock);
2576 
2577     if (app != NULL) {
2578         nxt_router_app_use(task, app, -1);
2579     }
2580 
2581     /* TODO remove engine->port */
2582     /* TODO excude from connected ports */
2583 
2584     if (rtcf != NULL) {
2585         nxt_debug(task, "old router conf is destroyed");
2586 
2587         nxt_mp_thread_adopt(rtcf->mem_pool);
2588 
2589         nxt_mp_destroy(rtcf->mem_pool);
2590     }
2591 
2592     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2593         nxt_thread_exit(task->thread);
2594     }
2595 }
2596 
2597 
2598 static void
2599 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
2600 {
2601     nxt_port_t           *port;
2602     nxt_thread_link_t    *link;
2603     nxt_event_engine_t   *engine;
2604     nxt_thread_handle_t  handle;
2605 
2606     handle = (nxt_thread_handle_t) obj;
2607     link = data;
2608 
2609     nxt_thread_wait(handle);
2610 
2611     engine = link->engine;
2612 
2613     nxt_queue_remove(&engine->link);
2614 
2615     port = engine->port;
2616 
2617     // TODO notify all apps
2618 
2619     port->engine = task->thread->engine;
2620     nxt_mp_thread_adopt(port->mem_pool);
2621     nxt_port_use(task, port, -1);
2622 
2623     nxt_mp_thread_adopt(engine->mem_pool);
2624     nxt_mp_destroy(engine->mem_pool);
2625 
2626     nxt_event_engine_free(engine);
2627 
2628     nxt_free(link);
2629 }
2630 
2631 
2632 static void
2633 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2634     void *data)
2635 {
2636     size_t               dump_size;
2637     nxt_int_t            ret;
2638     nxt_buf_t            *b, *last;
2639     nxt_http_request_t   *r;
2640     nxt_req_conn_link_t  *rc;
2641     nxt_app_parse_ctx_t  *ar;
2642 
2643     b = msg->buf;
2644     rc = data;
2645 
2646     dump_size = nxt_buf_used_size(b);
2647 
2648     if (dump_size > 300) {
2649         dump_size = 300;
2650     }
2651 
2652     nxt_debug(task, "%srouter app data (%uz): %*s",
2653               msg->port_msg.last ? "last " : "", msg->size, dump_size,
2654               b->mem.pos);
2655 
2656     if (msg->size == 0) {
2657         b = NULL;
2658     }
2659 
2660     ar = rc->ap;
2661     if (nxt_slow_path(ar == NULL)) {
2662         return;
2663     }
2664 
2665     if (msg->port_msg.last != 0) {
2666         nxt_debug(task, "router data create last buf");
2667 
2668         last = nxt_http_request_last_buffer(task, ar->request);
2669         if (nxt_slow_path(last == NULL)) {
2670             nxt_app_http_req_done(task, ar);
2671             nxt_router_rc_unlink(task, rc);
2672             return;
2673         }
2674 
2675         nxt_buf_chain_add(&b, last);
2676 
2677         nxt_router_rc_unlink(task, rc);
2678 
2679     } else {
2680         if (rc->app->timeout != 0) {
2681             ar->timer.handler = nxt_router_app_timeout;
2682             nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
2683         }
2684     }
2685 
2686     if (b == NULL) {
2687         return;
2688     }
2689 
2690     if (msg->buf == b) {
2691         /* Disable instant buffer completion/re-using by port. */
2692         msg->buf = NULL;
2693     }
2694 
2695     r = ar->request;
2696 
2697     if (r->header_sent) {
2698         nxt_buf_chain_add(&r->out, b);
2699         nxt_http_request_send_body(task, r, NULL);
2700 
2701     } else {
2702         ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
2703         if (nxt_slow_path(ret != NXT_DONE)) {
2704             goto fail;
2705         }
2706 
2707         r->resp.fields = ar->resp_parser.fields;
2708 
2709         ret = nxt_http_fields_process(r->resp.fields,
2710                                       &nxt_response_fields_hash, r);
2711         if (nxt_slow_path(ret != NXT_OK)) {
2712             goto fail;
2713         }
2714 
2715         if (nxt_buf_mem_used_size(&b->mem) == 0) {
2716             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2717                                b->completion_handler, task, b, b->parent);
2718 
2719             b = b->next;
2720         }
2721 
2722         if (b != NULL) {
2723             nxt_buf_chain_add(&r->out, b);
2724         }
2725 
2726         r->state = &nxt_http_request_send_state;
2727 
2728         nxt_http_request_header_send(task, r);
2729     }
2730 
2731     return;
2732 
2733 fail:
2734 
2735     nxt_app_http_req_done(task, ar);
2736     nxt_router_rc_unlink(task, rc);
2737 
2738     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
2739 }
2740 
2741 
2742 static const nxt_http_request_state_t  nxt_http_request_send_state
2743     nxt_aligned(64) =
2744 {
2745     .ready_handler = nxt_http_request_send_body,
2746     .error_handler = nxt_http_request_close_handler,
2747 };
2748 
2749 
2750 static void
2751 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
2752 {
2753     nxt_buf_t           *out;
2754     nxt_http_request_t  *r;
2755 
2756     r = obj;
2757 
2758     out = r->out;
2759 
2760     if (out != NULL) {
2761         r->out = NULL;
2762         nxt_http_request_send(task, r, out);
2763     }
2764 }
2765 
2766 
2767 static void
2768 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2769     void *data)
2770 {
2771     nxt_int_t            res;
2772     nxt_port_t           *port;
2773     nxt_bool_t           cancelled;
2774     nxt_req_app_link_t   *ra;
2775     nxt_req_conn_link_t  *rc;
2776 
2777     rc = data;
2778 
2779     ra = rc->ra;
2780 
2781     if (ra != NULL) {
2782         cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
2783 
2784         if (cancelled) {
2785             nxt_router_ra_inc_use(ra);
2786 
2787             res = nxt_router_app_port(task, rc->app, ra);
2788 
2789             if (res == NXT_OK) {
2790                 port = ra->app_port;
2791 
2792                 if (nxt_slow_path(port == NULL)) {
2793                     nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra");
2794                     return;
2795                 }
2796 
2797                 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
2798                                          port->pid);
2799 
2800                 nxt_router_app_prepare_request(task, ra);
2801             }
2802 
2803             msg->port_msg.last = 0;
2804 
2805             return;
2806         }
2807     }
2808 
2809     nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE);
2810 
2811     nxt_router_rc_unlink(task, rc);
2812 }
2813 
2814 
2815 static void
2816 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2817     void *data)
2818 {
2819     nxt_app_t   *app;
2820     nxt_port_t  *port;
2821 
2822     app = data;
2823     port = msg->u.new_port;
2824 
2825     nxt_assert(app != NULL);
2826     nxt_assert(port != NULL);
2827 
2828     port->app = app;
2829 
2830     nxt_thread_mutex_lock(&app->mutex);
2831 
2832     nxt_assert(app->pending_processes != 0);
2833 
2834     app->pending_processes--;
2835     app->processes++;
2836 
2837     nxt_thread_mutex_unlock(&app->mutex);
2838 
2839     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
2840               &app->name, port->pid, app->processes, app->pending_processes);
2841 
2842     nxt_router_app_port_release(task, port, 0, 0);
2843 }
2844 
2845 
2846 static void
2847 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2848     void *data)
2849 {
2850     nxt_app_t           *app;
2851     nxt_queue_link_t    *lnk;
2852     nxt_req_app_link_t  *ra;
2853 
2854     app = data;
2855 
2856     nxt_assert(app != NULL);
2857 
2858     nxt_debug(task, "app '%V' %p start error", &app->name, app);
2859 
2860     nxt_thread_mutex_lock(&app->mutex);
2861 
2862     nxt_assert(app->pending_processes != 0);
2863 
2864     app->pending_processes--;
2865 
2866     if (!nxt_queue_is_empty(&app->requests)) {
2867         lnk = nxt_queue_last(&app->requests);
2868         nxt_queue_remove(lnk);
2869         lnk->next = NULL;
2870 
2871         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2872 
2873     } else {
2874         ra = NULL;
2875     }
2876 
2877     nxt_thread_mutex_unlock(&app->mutex);
2878 
2879     if (ra != NULL) {
2880         nxt_debug(task, "app '%V' %p abort next stream #%uD",
2881                   &app->name, app, ra->stream);
2882 
2883         nxt_router_ra_error(ra, 500, "Failed to start application process");
2884         nxt_router_ra_use(task, ra, -1);
2885     }
2886 
2887     nxt_router_app_use(task, app, -1);
2888 }
2889 
2890 
2891 void
2892 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2893 {
2894     int  c;
2895 
2896     c = nxt_atomic_fetch_add(&app->use_count, i);
2897 
2898     if (i < 0 && c == -i) {
2899 
2900         nxt_assert(app->live == 0);
2901         nxt_assert(app->processes == 0);
2902         nxt_assert(app->idle_processes == 0);
2903         nxt_assert(app->pending_processes == 0);
2904         nxt_assert(nxt_queue_is_empty(&app->requests));
2905         nxt_assert(nxt_queue_is_empty(&app->ports));
2906         nxt_assert(nxt_queue_is_empty(&app->spare_ports));
2907         nxt_assert(nxt_queue_is_empty(&app->idle_ports));
2908 
2909         nxt_thread_mutex_destroy(&app->mutex);
2910         nxt_free(app);
2911     }
2912 }
2913 
2914 
2915 nxt_inline nxt_bool_t
2916 nxt_router_app_first_port_busy(nxt_app_t *app)
2917 {
2918     nxt_port_t        *port;
2919     nxt_queue_link_t  *lnk;
2920 
2921     lnk = nxt_queue_first(&app->ports);
2922     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2923 
2924     return port->app_pending_responses > 0;
2925 }
2926 
2927 
2928 nxt_inline nxt_port_t *
2929 nxt_router_pop_first_port(nxt_app_t *app)
2930 {
2931     nxt_port_t        *port;
2932     nxt_queue_link_t  *lnk;
2933 
2934     lnk = nxt_queue_first(&app->ports);
2935     nxt_queue_remove(lnk);
2936 
2937     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2938 
2939     port->app_pending_responses++;
2940 
2941     if (nxt_queue_chk_remove(&port->idle_link)) {
2942         app->idle_processes--;
2943 
2944         if (port->idle_start == 0) {
2945             nxt_assert(app->idle_processes < app->spare_processes);
2946 
2947         } else {
2948             nxt_assert(app->idle_processes >= app->spare_processes);
2949 
2950             port->idle_start = 0;
2951         }
2952     }
2953 
2954     if ((app->max_pending_responses == 0
2955             || port->app_pending_responses < app->max_pending_responses)
2956         && (app->max_requests == 0
2957             || port->app_responses + port->app_pending_responses
2958                 < app->max_requests))
2959     {
2960         nxt_queue_insert_tail(&app->ports, lnk);
2961 
2962         nxt_port_inc_use(port);
2963 
2964     } else {
2965         lnk->next = NULL;
2966     }
2967 
2968     return port;
2969 }
2970 
2971 
2972 nxt_inline nxt_port_t *
2973 nxt_router_app_get_port_for_quit(nxt_app_t *app)
2974 {
2975     nxt_port_t  *port;
2976 
2977     port = NULL;
2978 
2979     nxt_thread_mutex_lock(&app->mutex);
2980 
2981     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2982 
2983         if (port->app_pending_responses > 0) {
2984             port = NULL;
2985 
2986             continue;
2987         }
2988 
2989         /* Caller is responsible to decrease port use count. */
2990         nxt_queue_chk_remove(&port->app_link);
2991 
2992         if (nxt_queue_chk_remove(&port->idle_link)) {
2993             app->idle_processes--;
2994         }
2995 
2996         /* Caller is responsible to decrease app use count. */
2997         port->app = NULL;
2998         app->processes--;
2999 
3000         break;
3001 
3002     } nxt_queue_loop;
3003 
3004     nxt_thread_mutex_unlock(&app->mutex);
3005 
3006     return port;
3007 }
3008 
3009 
3010 static void
3011 nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
3012 {
3013     nxt_port_t  *port;
3014 
3015     nxt_queue_remove(&app->link);
3016 
3017     app->live = 0;
3018 
3019     for ( ;; ) {
3020         port = nxt_router_app_get_port_for_quit(app);
3021         if (port == NULL) {
3022             break;
3023         }
3024 
3025         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
3026 
3027         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
3028 
3029         nxt_port_use(task, port, -1);
3030         nxt_router_app_use(task, app, -1);
3031     }
3032 
3033     if (nxt_timer_is_in_tree(&app->idle_timer)) {
3034         nxt_assert(app->engine == task->thread->engine);
3035 
3036         app->idle_timer.handler = nxt_router_app_release_handler;
3037         nxt_timer_add(app->engine, &app->idle_timer, 0);
3038 
3039     } else {
3040         nxt_router_app_use(task, app, -1);
3041     }
3042 }
3043 
3044 
3045 static void
3046 nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
3047 {
3048     nxt_req_app_link_t  *ra;
3049 
3050     ra = data;
3051 
3052 #if (NXT_DEBUG)
3053     {
3054     nxt_app_t  *app;
3055 
3056     app = obj;
3057 
3058     nxt_assert(app != NULL);
3059     nxt_assert(ra != NULL);
3060     nxt_assert(ra->app_port != NULL);
3061 
3062     nxt_debug(task, "app '%V' %p process next stream #%uD",
3063               &app->name, app, ra->stream);
3064     }
3065 #endif
3066 
3067     nxt_router_app_prepare_request(task, ra);
3068 }
3069 
3070 
3071 static void
3072 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
3073     uint32_t request_failed, uint32_t got_response)
3074 {
3075     nxt_app_t                *app;
3076     nxt_bool_t               port_unchained;
3077     nxt_bool_t               send_quit, cancelled, adjust_idle_timer;
3078     nxt_queue_link_t         *lnk;
3079     nxt_req_app_link_t       *ra, *pending_ra, *re_ra;
3080     nxt_port_select_state_t  state;
3081 
3082     nxt_assert(port != NULL);
3083     nxt_assert(port->app != NULL);
3084 
3085     ra = NULL;
3086 
3087     app = port->app;
3088 
3089     nxt_thread_mutex_lock(&app->mutex);
3090 
3091     port->app_pending_responses -= request_failed + got_response;
3092     port->app_responses += got_response;
3093 
3094     if (nxt_slow_path(app->live == 0)) {
3095         goto app_dead;
3096     }
3097 
3098     if (port->pair[1] != -1
3099         && (app->max_pending_responses == 0
3100             || port->app_pending_responses < app->max_pending_responses)
3101         && (app->max_requests == 0
3102             || port->app_responses + port->app_pending_responses
3103                 < app->max_requests))
3104     {
3105         if (port->app_link.next == NULL) {
3106             if (port->app_pending_responses > 0) {
3107                 nxt_queue_insert_tail(&app->ports, &port->app_link);
3108 
3109             } else {
3110                 nxt_queue_insert_head(&app->ports, &port->app_link);
3111             }
3112 
3113             nxt_port_inc_use(port);
3114 
3115         } else {
3116             if (port->app_pending_responses == 0
3117                 && nxt_queue_first(&app->ports) != &port->app_link)
3118             {
3119                 nxt_queue_remove(&port->app_link);
3120                 nxt_queue_insert_head(&app->ports, &port->app_link);
3121             }
3122         }
3123     }
3124 
3125     if (!nxt_queue_is_empty(&app->ports)
3126         && !nxt_queue_is_empty(&app->requests))
3127     {
3128         lnk = nxt_queue_first(&app->requests);
3129         nxt_queue_remove(lnk);
3130         lnk->next = NULL;
3131 
3132         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
3133 
3134         ra->app_port = nxt_router_pop_first_port(app);
3135 
3136         if (ra->app_port->app_pending_responses > 1) {
3137             nxt_router_ra_pending(task, app, ra);
3138         }
3139     }
3140 
3141 app_dead:
3142 
3143     /* Pop first pending request for this port. */
3144     if ((request_failed > 0 || got_response > 0)
3145         && !nxt_queue_is_empty(&port->pending_requests))
3146     {
3147         lnk = nxt_queue_first(&port->pending_requests);
3148         nxt_queue_remove(lnk);
3149         lnk->next = NULL;
3150 
3151         pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
3152                                          link_port_pending);
3153 
3154         nxt_assert(pending_ra->link_app_pending.next != NULL);
3155 
3156         nxt_queue_remove(&pending_ra->link_app_pending);
3157         pending_ra->link_app_pending.next = NULL;
3158 
3159     } else {
3160         pending_ra = NULL;
3161     }
3162 
3163     /* Try to cancel and re-schedule first stalled request for this app. */
3164     if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
3165         lnk = nxt_queue_first(&app->pending);
3166 
3167         re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
3168 
3169         if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
3170 
3171             nxt_debug(task, "app '%V' stalled request #%uD detected",
3172                       &app->name, re_ra->stream);
3173 
3174             cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
3175                                               re_ra->stream);
3176 
3177             if (cancelled) {
3178                 nxt_router_ra_inc_use(re_ra);
3179 
3180                 state.ra = re_ra;
3181                 state.app = app;
3182 
3183                 nxt_router_port_select(task, &state);
3184 
3185                 goto re_ra_cancelled;
3186             }
3187         }
3188     }
3189 
3190     re_ra = NULL;
3191 
3192 re_ra_cancelled:
3193 
3194     send_quit = (app->live == 0 && port->app_pending_responses == 0)
3195                 || (app->max_requests > 0 && port->app_pending_responses == 0
3196                     && port->app_responses >= app->max_requests);
3197 
3198     if (send_quit) {
3199         port_unchained = nxt_queue_chk_remove(&port->app_link);
3200 
3201         port->app = NULL;
3202         app->processes--;
3203 
3204     } else {
3205         port_unchained = 0;
3206     }
3207 
3208     adjust_idle_timer = 0;
3209 
3210     if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
3211         nxt_assert(port->idle_link.next == NULL);
3212 
3213         if (app->idle_processes == app->spare_processes
3214             && app->adjust_idle_work.data == NULL)
3215         {
3216             adjust_idle_timer = 1;
3217             app->adjust_idle_work.data = app;
3218             app->adjust_idle_work.next = NULL;
3219         }
3220 
3221         if (app->idle_processes < app->spare_processes) {
3222             nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3223 
3224         } else {
3225             nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
3226 
3227             port->idle_start = task->thread->engine->timers.now;
3228         }
3229 
3230         app->idle_processes++;
3231     }
3232 
3233     nxt_thread_mutex_unlock(&app->mutex);
3234 
3235     if (adjust_idle_timer) {
3236         nxt_router_app_use(task, app, 1);
3237         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
3238     }
3239 
3240     if (pending_ra != NULL) {
3241         nxt_router_ra_use(task, pending_ra, -1);
3242     }
3243 
3244     if (re_ra != NULL) {
3245         if (nxt_router_port_post_select(task, &state) == NXT_OK) {
3246             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3247                                nxt_router_app_process_request,
3248                                &task->thread->engine->task, app, re_ra);
3249         }
3250     }
3251 
3252     if (ra != NULL) {
3253         nxt_router_ra_use(task, ra, -1);
3254 
3255         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3256                            nxt_router_app_process_request,
3257                            &task->thread->engine->task, app, ra);
3258 
3259         goto adjust_use;
3260     }
3261 
3262     /* ? */
3263     if (port->pair[1] == -1) {
3264         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
3265                   &app->name, app, port, port->pid);
3266 
3267         goto adjust_use;
3268     }
3269 
3270     if (send_quit) {
3271         nxt_debug(task, "app '%V' %p send QUIT to port",
3272                   &app->name, app);
3273 
3274         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
3275                               -1, 0, 0, NULL);
3276 
3277         if (port_unchained) {
3278             nxt_port_use(task, port, -1);
3279         }
3280 
3281         nxt_router_app_use(task, app, -1);
3282 
3283         goto adjust_use;
3284     }
3285 
3286     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
3287               &app->name, app);
3288 
3289 adjust_use:
3290 
3291     if (request_failed > 0 || got_response > 0) {
3292         nxt_port_use(task, port, -1);
3293     }
3294 }
3295 
3296 
3297 void
3298 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
3299 {
3300     nxt_app_t         *app;
3301     nxt_bool_t        unchain, start_process;
3302     nxt_port_t        *idle_port;
3303     nxt_queue_link_t  *idle_lnk;
3304 
3305     app = port->app;
3306 
3307     nxt_assert(app != NULL);
3308 
3309     nxt_thread_mutex_lock(&app->mutex);
3310 
3311     unchain = nxt_queue_chk_remove(&port->app_link);
3312 
3313     if (nxt_queue_chk_remove(&port->idle_link)) {
3314         app->idle_processes--;
3315 
3316         if (port->idle_start == 0
3317             && app->idle_processes >= app->spare_processes)
3318         {
3319             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
3320 
3321             idle_lnk = nxt_queue_last(&app->idle_ports);
3322             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
3323             nxt_queue_remove(idle_lnk);
3324 
3325             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
3326 
3327             idle_port->idle_start = 0;
3328         }
3329     }
3330 
3331     app->processes--;
3332 
3333     start_process = app->live != 0
3334                     && !task->thread->engine->shutdown
3335                     && nxt_router_app_can_start(app)
3336                     && (!nxt_queue_is_empty(&app->requests)
3337                         || nxt_router_app_need_start(app));
3338 
3339     if (start_process) {
3340         app->pending_processes++;
3341     }
3342 
3343     nxt_thread_mutex_unlock(&app->mutex);
3344 
3345     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
3346 
3347     if (unchain) {
3348         nxt_port_use(task, port, -1);
3349     }
3350 
3351     if (start_process) {
3352         nxt_router_start_app_process(task, app);
3353     }
3354 }
3355 
3356 
3357 static void
3358 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
3359 {
3360     nxt_app_t           *app;
3361     nxt_bool_t          queued;
3362     nxt_port_t          *port;
3363     nxt_msec_t          timeout, threshold;
3364     nxt_queue_link_t    *lnk;
3365     nxt_event_engine_t  *engine;
3366 
3367     app = obj;
3368     queued = (data == app);
3369 
3370     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
3371               &app->name, queued);
3372 
3373     engine = task->thread->engine;
3374 
3375     nxt_assert(app->engine == engine);
3376 
3377     threshold = engine->timers.now + app->idle_timer.precision;
3378     timeout = 0;
3379 
3380     nxt_thread_mutex_lock(&app->mutex);
3381 
3382     if (queued) {
3383         app->adjust_idle_work.data = NULL;
3384     }
3385 
3386     while (app->idle_processes > app->spare_processes) {
3387 
3388         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
3389 
3390         lnk = nxt_queue_first(&app->idle_ports);
3391         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
3392 
3393         timeout = port->idle_start + app->idle_timeout;
3394 
3395         if (timeout > threshold) {
3396             break;
3397         }
3398 
3399         nxt_queue_remove(lnk);
3400         lnk->next = NULL;
3401 
3402         nxt_queue_chk_remove(&port->app_link);
3403 
3404         app->idle_processes--;
3405         app->processes--;
3406         port->app = NULL;
3407 
3408         nxt_thread_mutex_unlock(&app->mutex);
3409 
3410         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
3411                   &app->name, port->pid);
3412 
3413         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
3414 
3415         nxt_port_use(task, port, -1);
3416         nxt_router_app_use(task, app, -1);
3417 
3418         nxt_thread_mutex_lock(&app->mutex);
3419     }
3420 
3421     nxt_thread_mutex_unlock(&app->mutex);
3422 
3423     if (timeout > threshold) {
3424         nxt_timer_add(engine, &app->idle_timer, timeout - threshold);
3425 
3426     } else {
3427         nxt_timer_disable(engine, &app->idle_timer);
3428     }
3429 
3430     if (queued) {
3431         nxt_router_app_use(task, app, -1);
3432     }
3433 }
3434 
3435 
3436 static void
3437 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
3438 {
3439     nxt_app_t    *app;
3440     nxt_timer_t  *timer;
3441 
3442     timer = obj;
3443     app = nxt_container_of(timer, nxt_app_t, idle_timer);
3444 
3445     nxt_router_adjust_idle_timer(task, app, NULL);
3446 }
3447 
3448 
3449 static void
3450 nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data)
3451 {
3452     nxt_app_t    *app;
3453     nxt_timer_t  *timer;
3454 
3455     timer = obj;
3456     app = nxt_container_of(timer, nxt_app_t, idle_timer);
3457 
3458     nxt_router_app_use(task, app, -1);
3459 }
3460 
3461 
3462 static void
3463 nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
3464 {
3465     nxt_app_t           *app;
3466     nxt_bool_t          can_start_process;
3467     nxt_req_app_link_t  *ra;
3468 
3469     ra = state->ra;
3470     app = state->app;
3471 
3472     state->failed_port_use_delta = 0;
3473 
3474     if (nxt_queue_chk_remove(&ra->link_app_requests))
3475     {
3476         nxt_router_ra_dec_use(ra);
3477     }
3478 
3479     if (nxt_queue_chk_remove(&ra->link_port_pending))
3480     {
3481         nxt_assert(ra->link_app_pending.next != NULL);
3482 
3483         nxt_queue_remove(&ra->link_app_pending);
3484         ra->link_app_pending.next = NULL;
3485 
3486         nxt_router_ra_dec_use(ra);
3487     }
3488 
3489     state->failed_port = ra->app_port;
3490 
3491     if (ra->app_port != NULL) {
3492         state->failed_port_use_delta--;
3493 
3494         state->failed_port->app_pending_responses--;
3495 
3496         if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
3497             state->failed_port_use_delta--;
3498         }
3499 
3500         ra->app_port = NULL;
3501     }
3502 
3503     can_start_process = nxt_router_app_can_start(app);
3504 
3505     state->port = NULL;
3506     state->start_process = 0;
3507 
3508     if (nxt_queue_is_empty(&app->ports)
3509         || (can_start_process && nxt_router_app_first_port_busy(app)) )
3510     {
3511         ra = nxt_router_ra_create(task, ra);
3512 
3513         if (nxt_slow_path(ra == NULL)) {
3514             goto fail;
3515         }
3516 
3517         if (nxt_slow_path(state->failed_port != NULL)) {
3518             nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
3519 
3520         } else {
3521             nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
3522         }
3523 
3524         nxt_router_ra_inc_use(ra);
3525 
3526         nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
3527 
3528         if (can_start_process) {
3529             app->pending_processes++;
3530             state->start_process = 1;
3531         }
3532 
3533     } else {
3534         state->port = nxt_router_pop_first_port(app);
3535 
3536         if (state->port->app_pending_responses > 1) {
3537             ra = nxt_router_ra_create(task, ra);
3538 
3539             if (nxt_slow_path(ra == NULL)) {
3540                 goto fail;
3541             }
3542 
3543             ra->app_port = state->port;
3544 
3545             nxt_router_ra_pending(task, app, ra);
3546         }
3547 
3548         if (can_start_process && nxt_router_app_need_start(app)) {
3549             app->pending_processes++;
3550             state->start_process = 1;
3551         }
3552     }
3553 
3554 fail:
3555 
3556     state->shared_ra = ra;
3557 }
3558 
3559 
3560 static nxt_int_t
3561 nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
3562 {
3563     nxt_int_t           res;
3564     nxt_app_t           *app;
3565     nxt_req_app_link_t  *ra;
3566 
3567     ra = state->shared_ra;
3568     app = state->app;
3569 
3570     if (state->failed_port_use_delta != 0) {
3571         nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
3572     }
3573 
3574     if (nxt_slow_path(ra == NULL)) {
3575         if (state->port != NULL) {
3576             nxt_port_use(task, state->port, -1);
3577         }
3578 
3579         nxt_router_ra_error(state->ra, 500,
3580                             "Failed to allocate shared req<->app link");
3581         nxt_router_ra_use(task, state->ra, -1);
3582 
3583         return NXT_ERROR;
3584     }
3585 
3586     if (state->port != NULL) {
3587         nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
3588 
3589         ra->app_port = state->port;
3590 
3591         if (state->start_process) {
3592             nxt_router_start_app_process(task, app);
3593         }
3594 
3595         return NXT_OK;
3596     }
3597 
3598     if (!state->start_process) {
3599         nxt_debug(task, "app '%V' %p too many running or pending processes",
3600                   &app->name, app);
3601 
3602         return NXT_AGAIN;
3603     }
3604 
3605     res = nxt_router_start_app_process(task, app);
3606 
3607     if (nxt_slow_path(res != NXT_OK)) {
3608         nxt_router_ra_error(ra, 500, "Failed to start app process");
3609         nxt_router_ra_use(task, ra, -1);
3610 
3611         return NXT_ERROR;
3612     }
3613 
3614     return NXT_AGAIN;
3615 }
3616 
3617 
3618 static nxt_int_t
3619 nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
3620 {
3621     nxt_port_select_state_t  state;
3622 
3623     state.ra = ra;
3624     state.app = app;
3625 
3626     nxt_thread_mutex_lock(&app->mutex);
3627 
3628     nxt_router_port_select(task, &state);
3629 
3630     nxt_thread_mutex_unlock(&app->mutex);
3631 
3632     return nxt_router_port_post_select(task, &state);
3633 }
3634 
3635 
3636 void
3637 nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
3638 {
3639     nxt_int_t            res;
3640     nxt_app_t            *app;
3641     nxt_port_t           *port;
3642     nxt_event_engine_t   *engine;
3643     nxt_http_request_t   *r;
3644     nxt_req_app_link_t   ra_local, *ra;
3645     nxt_req_conn_link_t  *rc;
3646 
3647     r = ar->request;
3648     app = r->socket_conf->application;
3649 
3650     if (app == NULL) {
3651         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3652         return;
3653     }
3654 
3655     engine = task->thread->engine;
3656 
3657     rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3658                                           nxt_router_response_ready_handler,
3659                                           nxt_router_response_error_handler,
3660                                           sizeof(nxt_req_conn_link_t));
3661 
3662     if (nxt_slow_path(rc == NULL)) {
3663         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3664         return;
3665     }
3666 
3667     rc->stream = nxt_port_rpc_ex_stream(rc);
3668     rc->app = app;
3669 
3670     nxt_router_app_use(task, app, 1);
3671 
3672     rc->ap = ar;
3673 
3674     ra = &ra_local;
3675     nxt_router_ra_init(task, ra, rc);
3676 
3677     res = nxt_router_app_port(task, app, ra);
3678 
3679     if (res != NXT_OK) {
3680         return;
3681     }
3682 
3683     ra = rc->ra;
3684     port = ra->app_port;
3685 
3686     nxt_assert(port != NULL);
3687 
3688     nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3689 
3690     nxt_router_app_prepare_request(task, ra);
3691 }
3692 
3693 
3694 static void
3695 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
3696 {
3697 }
3698 
3699 
3700 static void
3701 nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
3702 {
3703     uint32_t             request_failed;
3704     nxt_buf_t            *b;
3705     nxt_int_t            res;
3706     nxt_port_t           *port, *c_port, *reply_port;
3707     nxt_app_wmsg_t       wmsg;
3708     nxt_app_parse_ctx_t  *ap;
3709 
3710     nxt_assert(ra->app_port != NULL);
3711 
3712     port = ra->app_port;
3713     reply_port = ra->reply_port;
3714     ap = ra->ap;
3715 
3716     request_failed = 1;
3717 
3718     c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
3719                                              reply_port->id);
3720     if (nxt_slow_path(c_port != reply_port)) {
3721         res = nxt_port_send_port(task, port, reply_port, 0);
3722 
3723         if (nxt_slow_path(res != NXT_OK)) {
3724             nxt_router_ra_error(ra, 500,
3725                                 "Failed to send reply port to application");
3726             goto release_port;
3727         }
3728 
3729         nxt_process_connected_port_add(port->process, reply_port);
3730     }
3731 
3732     wmsg.port = port;
3733     wmsg.write = NULL;
3734     wmsg.buf = &wmsg.write;
3735     wmsg.stream = ra->stream;
3736 
3737     res = port->app->prepare_msg(task, &ap->r, &wmsg);
3738 
3739     if (nxt_slow_path(res != NXT_OK)) {
3740         nxt_router_ra_error(ra, 500,
3741                             "Failed to prepare message for application");
3742         goto release_port;
3743     }
3744 
3745     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
3746                     nxt_buf_used_size(wmsg.write),
3747                     wmsg.port->socket.fd);
3748 
3749     request_failed = 0;
3750 
3751     ra->msg_info.buf = wmsg.write;
3752     ra->msg_info.completion_handler = wmsg.write->completion_handler;
3753 
3754     for (b = wmsg.write; b != NULL; b = b->next) {
3755         b->completion_handler = nxt_router_dummy_buf_completion;
3756     }
3757 
3758     res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
3759                                      ra->stream);
3760     if (nxt_slow_path(res != NXT_OK)) {
3761         nxt_router_ra_error(ra, 500,
3762                             "Failed to get tracking area");
3763         goto release_port;
3764     }
3765 
3766     res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA,
3767                                  -1, ra->stream, reply_port->id, wmsg.write,
3768                                  &ra->msg_info.tracking);
3769 
3770     if (nxt_slow_path(res != NXT_OK)) {
3771         nxt_router_ra_error(ra, 500,
3772                             "Failed to send message to application");
3773         goto release_port;
3774     }
3775 
3776 release_port:
3777 
3778     nxt_router_app_port_release(task, port, request_failed, 0);
3779 
3780     nxt_router_ra_update_peer(task, ra);
3781 }
3782 
3783 
3784 static nxt_int_t
3785 nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3786     nxt_app_wmsg_t *wmsg)
3787 {
3788     nxt_int_t                 rc;
3789     nxt_buf_t                 *b;
3790     nxt_http_field_t          *field;
3791     nxt_app_request_header_t  *h;
3792 
3793     static const nxt_str_t prefix = nxt_string("HTTP_");
3794     static const nxt_str_t eof = nxt_null_string;
3795 
3796     h = &r->header;
3797 
3798 #define RC(S)                                                                 \
3799     do {                                                                      \
3800         rc = (S);                                                             \
3801         if (nxt_slow_path(rc != NXT_OK)) {                                    \
3802             goto fail;                                                        \
3803         }                                                                     \
3804     } while(0)
3805 
3806 #define NXT_WRITE(N)                                                          \
3807     RC(nxt_app_msg_write_str(task, wmsg, N))
3808 
3809     /* TODO error handle, async mmap buffer assignment */
3810 
3811     NXT_WRITE(&h->method);
3812     NXT_WRITE(&h->target);
3813 
3814     if (h->path.start == h->target.start) {
3815         NXT_WRITE(&eof);
3816 
3817     } else {
3818         NXT_WRITE(&h->path);
3819     }
3820 
3821     if (h->query.start != NULL) {
3822         RC(nxt_app_msg_write_size(task, wmsg,
3823                                   h->query.start - h->target.start + 1));
3824     } else {
3825         RC(nxt_app_msg_write_size(task, wmsg, 0));
3826     }
3827 
3828     NXT_WRITE(&h->version);
3829 
3830     NXT_WRITE(&r->remote);
3831     NXT_WRITE(&r->local);
3832 
3833     NXT_WRITE(&h->host);
3834     NXT_WRITE(&h->content_type);
3835     NXT_WRITE(&h->content_length);
3836 
3837     nxt_list_each(field, h->fields) {
3838         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name,
3839                                              field->name_length));
3840         RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
3841 
3842     } nxt_list_loop;
3843 
3844     /* end-of-headers mark */
3845     NXT_WRITE(&eof);
3846 
3847     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3848 
3849     for (b = r->body.buf; b != NULL; b = b->next) {
3850         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3851                                  nxt_buf_mem_used_size(&b->mem)));
3852     }
3853 
3854 #undef NXT_WRITE
3855 #undef RC
3856 
3857     return NXT_OK;
3858 
3859 fail:
3860 
3861     return NXT_ERROR;
3862 }
3863 
3864 
3865 static nxt_int_t
3866 nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3867     nxt_app_wmsg_t *wmsg)
3868 {
3869     nxt_int_t                 rc;
3870     nxt_buf_t                 *b;
3871     nxt_bool_t                method_is_post;
3872     nxt_http_field_t          *field;
3873     nxt_app_request_header_t  *h;
3874 
3875     static const nxt_str_t prefix = nxt_string("HTTP_");
3876     static const nxt_str_t eof = nxt_null_string;
3877 
3878     h = &r->header;
3879 
3880 #define RC(S)                                                                 \
3881     do {                                                                      \
3882         rc = (S);                                                             \
3883         if (nxt_slow_path(rc != NXT_OK)) {                                    \
3884             goto fail;                                                        \
3885         }                                                                     \
3886     } while(0)
3887 
3888 #define NXT_WRITE(N)                                                          \
3889     RC(nxt_app_msg_write_str(task, wmsg, N))
3890 
3891     /* TODO error handle, async mmap buffer assignment */
3892 
3893     NXT_WRITE(&h->method);
3894     NXT_WRITE(&h->target);
3895 
3896     if (h->path.start == h->target.start) {
3897         NXT_WRITE(&eof);
3898 
3899     } else {
3900         NXT_WRITE(&h->path);
3901     }
3902 
3903     if (h->query.start != NULL) {
3904         RC(nxt_app_msg_write_size(task, wmsg,
3905                                   h->query.start - h->target.start + 1));
3906     } else {
3907         RC(nxt_app_msg_write_size(task, wmsg, 0));
3908     }
3909 
3910     NXT_WRITE(&h->version);
3911 
3912     // PHP_SELF
3913     // SCRIPT_NAME
3914     // SCRIPT_FILENAME
3915     // DOCUMENT_ROOT
3916 
3917     NXT_WRITE(&r->remote);
3918     NXT_WRITE(&r->local);
3919 
3920     NXT_WRITE(&h->host);
3921     NXT_WRITE(&h->cookie);
3922     NXT_WRITE(&h->content_type);
3923     NXT_WRITE(&h->content_length);
3924 
3925     RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
3926     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3927 
3928     method_is_post = h->method.length == 4
3929                      && h->method.start[0] == 'P'
3930                      && h->method.start[1] == 'O'
3931                      && h->method.start[2] == 'S'
3932                      && h->method.start[3] == 'T';
3933 
3934     if (method_is_post) {
3935         for (b = r->body.buf; b != NULL; b = b->next) {
3936             RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3937                                      nxt_buf_mem_used_size(&b->mem)));
3938         }
3939     }
3940 
3941     nxt_list_each(field, h->fields) {
3942         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name,
3943                                              field->name_length));
3944         RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
3945 
3946     } nxt_list_loop;
3947 
3948     /* end-of-headers mark */
3949     NXT_WRITE(&eof);
3950 
3951     if (!method_is_post) {
3952         for (b = r->body.buf; b != NULL; b = b->next) {
3953             RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3954                                      nxt_buf_mem_used_size(&b->mem)));
3955         }
3956     }
3957 
3958 #undef NXT_WRITE
3959 #undef RC
3960 
3961     return NXT_OK;
3962 
3963 fail:
3964 
3965     return NXT_ERROR;
3966 }
3967 
3968 
3969 static nxt_int_t
3970 nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
3971 {
3972     nxt_int_t                 rc;
3973     nxt_buf_t                 *b;
3974     nxt_http_field_t          *field;
3975     nxt_app_request_header_t  *h;
3976 
3977     static const nxt_str_t eof = nxt_null_string;
3978 
3979     h = &r->header;
3980 
3981 #define RC(S)                                                                 \
3982     do {                                                                      \
3983         rc = (S);                                                             \
3984         if (nxt_slow_path(rc != NXT_OK)) {                                    \
3985             goto fail;                                                        \
3986         }                                                                     \
3987     } while(0)
3988 
3989 #define NXT_WRITE(N)                                                          \
3990     RC(nxt_app_msg_write_str(task, wmsg, N))
3991 
3992     /* TODO error handle, async mmap buffer assignment */
3993 
3994     NXT_WRITE(&h->method);
3995     NXT_WRITE(&h->target);
3996 
3997     if (h->path.start == h->target.start) {
3998         NXT_WRITE(&eof);
3999 
4000     } else {
4001         NXT_WRITE(&h->path);
4002     }
4003 
4004     if (h->query.start != NULL) {
4005         RC(nxt_app_msg_write_size(task, wmsg,
4006                                   h->query.start - h->target.start + 1));
4007     } else {
4008         RC(nxt_app_msg_write_size(task, wmsg, 0));
4009     }
4010 
4011     NXT_WRITE(&h->version);
4012     NXT_WRITE(&r->remote);
4013 
4014     NXT_WRITE(&h->host);
4015     NXT_WRITE(&h->cookie);
4016     NXT_WRITE(&h->content_type);
4017     NXT_WRITE(&h->content_length);
4018 
4019     RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
4020 
4021     nxt_list_each(field, h->fields) {
4022         RC(nxt_app_msg_write(task, wmsg, field->name, field->name_length));
4023         RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
4024 
4025     } nxt_list_loop;
4026 
4027     /* end-of-headers mark */
4028     NXT_WRITE(&eof);
4029 
4030     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
4031 
4032     for (b = r->body.buf; b != NULL; b = b->next) {
4033         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
4034                                  nxt_buf_mem_used_size(&b->mem)));
4035     }
4036 
4037 #undef NXT_WRITE
4038 #undef RC
4039 
4040     return NXT_OK;
4041 
4042 fail:
4043 
4044     return NXT_ERROR;
4045 }
4046 
4047 
4048 static nxt_int_t
4049 nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
4050     nxt_app_wmsg_t *wmsg)
4051 {
4052     nxt_int_t                 rc;
4053     nxt_str_t                 str;
4054     nxt_buf_t                 *b;
4055     nxt_http_field_t          *field;
4056     nxt_app_request_header_t  *h;
4057 
4058     static const nxt_str_t prefix = nxt_string("HTTP_");
4059     static const nxt_str_t eof = nxt_null_string;
4060 
4061     h = &r->header;
4062 
4063 #define RC(S)                                                                 \
4064     do {                                                                      \
4065         rc = (S);                                                             \
4066         if (nxt_slow_path(rc != NXT_OK)) {                                    \
4067             goto fail;                                                        \
4068         }                                                                     \
4069     } while(0)
4070 
4071 #define NXT_WRITE(N)                                                          \
4072     RC(nxt_app_msg_write_str(task, wmsg, N))
4073 
4074     /* TODO error handle, async mmap buffer assignment */
4075 
4076     NXT_WRITE(&h->method);
4077     NXT_WRITE(&h->target);
4078 
4079     if (h->query.length) {
4080         str.start = h->target.start;
4081         str.length = (h->target.length - h->query.length) - 1;
4082 
4083         RC(nxt_app_msg_write_str(task, wmsg, &str));
4084 
4085     } else {
4086         NXT_WRITE(&eof);
4087     }
4088 
4089     if (h->query.start != NULL) {
4090         RC(nxt_app_msg_write_size(task, wmsg,
4091                                   h->query.start - h->target.start + 1));
4092     } else {
4093         RC(nxt_app_msg_write_size(task, wmsg, 0));
4094     }
4095 
4096     NXT_WRITE(&h->version);
4097 
4098     NXT_WRITE(&r->remote);
4099     NXT_WRITE(&r->local);
4100 
4101     NXT_WRITE(&h->host);
4102     NXT_WRITE(&h->content_type);
4103     NXT_WRITE(&h->content_length);
4104 
4105     nxt_list_each(field, h->fields) {
4106         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix,
4107                                              field->name, field->name_length));
4108         RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
4109     } nxt_list_loop;
4110 
4111     /* end-of-headers mark */
4112     NXT_WRITE(&eof);
4113 
4114     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
4115 
4116     for (b = r->body.buf; b != NULL; b = b->next) {
4117 
4118         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
4119                                  nxt_buf_mem_used_size(&b->mem)));
4120     }
4121 
4122 #undef NXT_WRITE
4123 #undef RC
4124 
4125     return NXT_OK;
4126 
4127 fail:
4128 
4129     return NXT_ERROR;
4130 }
4131 
4132 
4133 static nxt_int_t
4134 nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
4135     nxt_app_wmsg_t *wmsg)
4136 {
4137     nxt_int_t                 rc;
4138     nxt_str_t                 str;
4139     nxt_buf_t                 *b;
4140     nxt_http_field_t          *field;
4141     nxt_app_request_header_t  *h;
4142 
4143     static const nxt_str_t prefix = nxt_string("HTTP_");
4144     static const nxt_str_t eof = nxt_null_string;
4145 
4146     h = &r->header;
4147 
4148 #define RC(S)                                                                 \
4149     do {                                                                      \
4150         rc = (S);                                                             \
4151         if (nxt_slow_path(rc != NXT_OK)) {                                    \
4152             goto fail;                                                        \
4153         }                                                                     \
4154     } while(0)
4155 
4156 #define NXT_WRITE(N)                                                          \
4157     RC(nxt_app_msg_write_str(task, wmsg, N))
4158 
4159     /* TODO error handle, async mmap buffer assignment */
4160 
4161     NXT_WRITE(&h->method);
4162     NXT_WRITE(&h->target);
4163 
4164     if (h->query.length) {
4165         str.start = h->target.start;
4166         str.length = (h->target.length - h->query.length) - 1;
4167 
4168         RC(nxt_app_msg_write_str(task, wmsg, &str));
4169 
4170     } else {
4171         NXT_WRITE(&eof);
4172     }
4173 
4174     if (h->query.start != NULL) {
4175         RC(nxt_app_msg_write_size(task, wmsg,
4176                                   h->query.start - h->target.start + 1));
4177     } else {
4178         RC(nxt_app_msg_write_size(task, wmsg, 0));
4179     }
4180 
4181     NXT_WRITE(&h->version);
4182 
4183     NXT_WRITE(&r->remote);
4184     NXT_WRITE(&r->local);
4185 
4186     NXT_WRITE(&h->host);
4187     NXT_WRITE(&h->content_type);
4188     NXT_WRITE(&h->content_length);
4189 
4190     nxt_list_each(field, h->fields) {
4191         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix,
4192                                              field->name, field->name_length));
4193         RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
4194     } nxt_list_loop;
4195 
4196     /* end-of-headers mark */
4197     NXT_WRITE(&eof);
4198 
4199     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
4200 
4201     for (b = r->body.buf; b != NULL; b = b->next) {
4202 
4203         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
4204                                  nxt_buf_mem_used_size(&b->mem)));
4205     }
4206 
4207 #undef NXT_WRITE
4208 #undef RC
4209 
4210     return NXT_OK;
4211 
4212 fail:
4213 
4214     return NXT_ERROR;
4215 }
4216 
4217 
4218 const nxt_conn_state_t  nxt_router_conn_close_state
4219     nxt_aligned(64) =
4220 {
4221     .ready_handler = nxt_router_conn_free,
4222 };
4223 
4224 
4225 static void
4226 nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
4227 {
4228     nxt_socket_conf_joint_t  *joint;
4229 
4230     joint = obj;
4231 
4232     nxt_router_conf_release(task, joint);
4233 }
4234 
4235 
4236 static void
4237 nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
4238 {
4239     nxt_conn_t               *c;
4240     nxt_event_engine_t       *engine;
4241     nxt_socket_conf_joint_t  *joint;
4242 
4243     c = obj;
4244 
4245     nxt_debug(task, "router conn close done");
4246 
4247     nxt_queue_remove(&c->link);
4248 
4249     engine = task->thread->engine;
4250 
4251     nxt_sockaddr_cache_free(engine, c);
4252 
4253     joint = c->joint;
4254 
4255     nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
4256                    &engine->task, joint, NULL);
4257 
4258     nxt_conn_free(task, c);
4259 }
4260 
4261 
4262 static void
4263 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
4264 {
4265     nxt_timer_t          *timer;
4266     nxt_app_parse_ctx_t  *ar;
4267 
4268     timer = obj;
4269 
4270     nxt_debug(task, "router app timeout");
4271 
4272     ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
4273 
4274     if (!ar->request->header_sent) {
4275         nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
4276     }
4277 }
4278