xref: /unit/src/nxt_router.c (revision 510:4979fe09d9cd)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_http.h>
11 
12 
13 typedef struct {
14     nxt_str_t         type;
15     uint32_t          processes;
16     uint32_t          max_processes;
17     uint32_t          spare_processes;
18     nxt_msec_t        timeout;
19     nxt_msec_t        res_timeout;
20     nxt_msec_t        idle_timeout;
21     uint32_t          requests;
22     nxt_conf_value_t  *limits_value;
23     nxt_conf_value_t  *processes_value;
24 } nxt_router_app_conf_t;
25 
26 
27 typedef struct {
28     nxt_str_t  application;
29 } nxt_router_listener_conf_t;
30 
31 
32 typedef struct nxt_msg_info_s {
33     nxt_buf_t                 *buf;
34     nxt_port_mmap_tracking_t  tracking;
35     nxt_work_handler_t        completion_handler;
36 } nxt_msg_info_t;
37 
38 
39 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
40 
41 
42 typedef struct {
43     uint32_t                 stream;
44     nxt_app_t                *app;
45     nxt_port_t               *app_port;
46     nxt_app_parse_ctx_t      *ap;
47     nxt_msg_info_t           msg_info;
48     nxt_req_app_link_t       *ra;
49 
50     nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
51 } nxt_req_conn_link_t;
52 
53 
54 struct nxt_req_app_link_s {
55     uint32_t             stream;
56     nxt_atomic_t         use_count;
57     nxt_port_t           *app_port;
58     nxt_port_t           *reply_port;
59     nxt_app_parse_ctx_t  *ap;
60     nxt_msg_info_t       msg_info;
61     nxt_req_conn_link_t  *rc;
62 
63     nxt_nsec_t           res_time;
64 
65     nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
66     nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
67     nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */
68 
69     nxt_mp_t             *mem_pool;
70     nxt_work_t           work;
71 
72     int                  err_code;
73     const char           *err_str;
74 };
75 
76 
77 typedef struct {
78     nxt_socket_conf_t       *socket_conf;
79     nxt_router_temp_conf_t  *temp_conf;
80 } nxt_socket_rpc_t;
81 
82 
83 typedef struct {
84     nxt_app_t               *app;
85     nxt_router_temp_conf_t  *temp_conf;
86 } nxt_app_rpc_t;
87 
88 
89 struct nxt_port_select_state_s {
90     nxt_app_t           *app;
91     nxt_req_app_link_t  *ra;
92 
93     nxt_port_t          *failed_port;
94     int                 failed_port_use_delta;
95 
96     uint8_t             start_process;    /* 1 bit */
97     nxt_req_app_link_t  *shared_ra;
98     nxt_port_t          *port;
99 };
100 
101 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
102 
103 static void nxt_router_port_select(nxt_task_t *task,
104     nxt_port_select_state_t *state);
105 
106 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
107     nxt_port_select_state_t *state);
108 
109 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
110 
111 nxt_inline void
112 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
113 {
114     nxt_atomic_fetch_add(&ra->use_count, 1);
115 }
116 
117 nxt_inline void
118 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
119 {
120     int  c;
121 
122     c = nxt_atomic_fetch_add(&ra->use_count, -1);
123 
124     nxt_assert(c > 1);
125 }
126 
127 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
128 
129 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
130 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
131 static void nxt_router_conf_ready(nxt_task_t *task,
132     nxt_router_temp_conf_t *tmcf);
133 static void nxt_router_conf_error(nxt_task_t *task,
134     nxt_router_temp_conf_t *tmcf);
135 static void nxt_router_conf_send(nxt_task_t *task,
136     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
137 
138 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
139     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
140 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
141 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
142     nxt_str_t *name);
143 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
145 static void nxt_router_listen_socket_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_listen_socket_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static void nxt_router_app_rpc_create(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
151 static void nxt_router_app_prefork_ready(nxt_task_t *task,
152     nxt_port_recv_msg_t *msg, void *data);
153 static void nxt_router_app_prefork_error(nxt_task_t *task,
154     nxt_port_recv_msg_t *msg, void *data);
155 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
156     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
157 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
158     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
159 
160 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
161     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
162     const nxt_event_interface_t *interface);
163 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf);
165 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
166     nxt_router_engine_conf_t *recf);
167 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
168     nxt_router_engine_conf_t *recf);
169 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
170     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
171     nxt_work_handler_t handler);
172 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
173     nxt_router_engine_conf_t *recf);
174 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
175     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
176 
177 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
178     nxt_router_temp_conf_t *tmcf);
179 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
180     nxt_event_engine_t *engine);
181 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
182     nxt_router_temp_conf_t *tmcf);
183 
184 static void nxt_router_engines_post(nxt_router_t *router,
185     nxt_router_temp_conf_t *tmcf);
186 static void nxt_router_engine_post(nxt_event_engine_t *engine,
187     nxt_work_t *jobs);
188 
189 static void nxt_router_thread_start(void *data);
190 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
199     void *data);
200 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
201     void *data);
202 static void nxt_router_listen_socket_release(nxt_task_t *task,
203     nxt_socket_conf_t *skcf);
204 static void nxt_router_conf_release(nxt_task_t *task,
205     nxt_socket_conf_joint_t *joint);
206 
207 static void nxt_router_app_port_ready(nxt_task_t *task,
208     nxt_port_recv_msg_t *msg, void *data);
209 static void nxt_router_app_port_error(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 
212 static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
213 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
214     uint32_t request_failed, uint32_t got_response);
215 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
216     nxt_req_app_link_t *ra);
217 
218 static void nxt_router_app_prepare_request(nxt_task_t *task,
219     nxt_req_app_link_t *ra);
220 static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
221     nxt_app_wmsg_t *wmsg);
222 static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
223     nxt_app_wmsg_t *wmsg);
224 static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
225     nxt_app_wmsg_t *wmsg);
226 static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
227     nxt_app_wmsg_t *wmsg);
228 
229 static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
230 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
231 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
232     void *data);
233 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
234     void *data);
235 static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
236     void *data);
237 
238 static const nxt_http_request_state_t  nxt_http_request_send_state;
239 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
240 
241 static nxt_router_t  *nxt_router;
242 
243 
244 static nxt_app_prepare_msg_t  nxt_app_prepare_msg[] = {
245     nxt_python_prepare_msg,
246     nxt_php_prepare_msg,
247     nxt_go_prepare_msg,
248     nxt_perl_prepare_msg,
249 };
250 
251 
252 nxt_int_t
253 nxt_router_start(nxt_task_t *task, void *data)
254 {
255     nxt_int_t      ret;
256     nxt_router_t   *router;
257     nxt_runtime_t  *rt;
258 
259     rt = task->thread->runtime;
260 
261     ret = nxt_http_init(task, rt);
262     if (nxt_slow_path(ret != NXT_OK)) {
263         return ret;
264     }
265 
266     router = nxt_zalloc(sizeof(nxt_router_t));
267     if (nxt_slow_path(router == NULL)) {
268         return NXT_ERROR;
269     }
270 
271     nxt_queue_init(&router->engines);
272     nxt_queue_init(&router->sockets);
273     nxt_queue_init(&router->apps);
274 
275     nxt_router = router;
276 
277     return NXT_OK;
278 }
279 
280 
281 static void
282 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
283     void *data)
284 {
285     size_t         size;
286     uint32_t       stream;
287     nxt_mp_t       *mp;
288     nxt_app_t      *app;
289     nxt_buf_t      *b;
290     nxt_port_t     *main_port;
291     nxt_runtime_t  *rt;
292 
293     app = data;
294 
295     rt = task->thread->runtime;
296     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
297 
298     nxt_debug(task, "app '%V' %p start process", &app->name, app);
299 
300     size = app->name.length + 1 + app->conf.length;
301 
302     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
303 
304     if (nxt_slow_path(b == NULL)) {
305         goto failed;
306     }
307 
308     nxt_buf_cpystr(b, &app->name);
309     *b->mem.free++ = '\0';
310     nxt_buf_cpystr(b, &app->conf);
311 
312     stream = nxt_port_rpc_register_handler(task, port,
313                                            nxt_router_app_port_ready,
314                                            nxt_router_app_port_error,
315                                            -1, app);
316 
317     if (nxt_slow_path(stream == 0)) {
318         mp = b->data;
319         nxt_mp_free(mp, b);
320         nxt_mp_release(mp);
321 
322         goto failed;
323     }
324 
325     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
326                           stream, port->id, b);
327 
328     return;
329 
330 failed:
331 
332     nxt_thread_mutex_lock(&app->mutex);
333 
334     app->pending_processes--;
335 
336     nxt_thread_mutex_unlock(&app->mutex);
337 
338     nxt_router_app_use(task, app, -1);
339 }
340 
341 
342 static nxt_int_t
343 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
344 {
345     nxt_int_t      res;
346     nxt_port_t     *router_port;
347     nxt_runtime_t  *rt;
348 
349     rt = task->thread->runtime;
350     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
351 
352     nxt_router_app_use(task, app, 1);
353 
354     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
355                         app);
356 
357     if (res == NXT_OK) {
358         return res;
359     }
360 
361     nxt_thread_mutex_lock(&app->mutex);
362 
363     app->pending_processes--;
364 
365     nxt_thread_mutex_unlock(&app->mutex);
366 
367     nxt_router_app_use(task, app, -1);
368 
369     return NXT_ERROR;
370 }
371 
372 
373 nxt_inline void
374 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
375     nxt_req_conn_link_t *rc)
376 {
377     nxt_event_engine_t  *engine;
378 
379     engine = task->thread->engine;
380 
381     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
382 
383     ra->stream = rc->stream;
384     ra->use_count = 1;
385     ra->rc = rc;
386     rc->ra = ra;
387     ra->reply_port = engine->port;
388     ra->ap = rc->ap;
389 
390     ra->work.handler = NULL;
391     ra->work.task = &engine->task;
392     ra->work.obj = ra;
393     ra->work.data = engine;
394 }
395 
396 
397 nxt_inline nxt_req_app_link_t *
398 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
399 {
400     nxt_mp_t            *mp;
401     nxt_req_app_link_t  *ra;
402 
403     if (ra_src->mem_pool != NULL) {
404         return ra_src;
405     }
406 
407     mp = ra_src->ap->mem_pool;
408 
409     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
410 
411     if (nxt_slow_path(ra == NULL)) {
412 
413         ra_src->rc->ra = NULL;
414         ra_src->rc = NULL;
415 
416         return NULL;
417     }
418 
419     nxt_mp_retain(mp);
420 
421     nxt_router_ra_init(task, ra, ra_src->rc);
422 
423     ra->mem_pool = mp;
424 
425     return ra;
426 }
427 
428 
429 nxt_inline nxt_bool_t
430 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
431     uint32_t stream)
432 {
433     nxt_buf_t   *b, *next;
434     nxt_bool_t  cancelled;
435 
436     if (msg_info->buf == NULL) {
437         return 0;
438     }
439 
440     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
441                                               stream);
442 
443     if (cancelled) {
444         nxt_debug(task, "stream #%uD: cancelled by router", stream);
445     }
446 
447     for (b = msg_info->buf; b != NULL; b = next) {
448         next = b->next;
449 
450         b->completion_handler = msg_info->completion_handler;
451 
452         if (b->is_port_mmap_sent) {
453             b->is_port_mmap_sent = cancelled == 0;
454             b->completion_handler(task, b, b->parent);
455         }
456     }
457 
458     msg_info->buf = NULL;
459 
460     return cancelled;
461 }
462 
463 
464 static void
465 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
466 
467 
468 static void
469 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
470 {
471     nxt_req_app_link_t  *ra;
472 
473     ra = obj;
474 
475     nxt_router_ra_update_peer(task, ra);
476 
477     nxt_router_ra_use(task, ra, -1);
478 }
479 
480 
481 static void
482 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
483 {
484     nxt_event_engine_t   *engine;
485     nxt_req_conn_link_t  *rc;
486 
487     engine = ra->work.data;
488 
489     if (task->thread->engine != engine) {
490         nxt_router_ra_inc_use(ra);
491 
492         ra->work.handler = nxt_router_ra_update_peer_handler;
493         ra->work.task = &engine->task;
494         ra->work.next = NULL;
495 
496         nxt_debug(task, "ra stream #%uD post update peer to %p",
497                   ra->stream, engine);
498 
499         nxt_event_engine_post(engine, &ra->work);
500 
501         return;
502     }
503 
504     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
505 
506     rc = ra->rc;
507 
508     if (rc != NULL && ra->app_port != NULL) {
509         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
510     }
511 
512     nxt_router_ra_use(task, ra, -1);
513 }
514 
515 
516 static void
517 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
518 {
519     nxt_mp_t                *mp;
520     nxt_req_conn_link_t     *rc;
521 
522     nxt_assert(task->thread->engine == ra->work.data);
523     nxt_assert(ra->use_count == 0);
524 
525     nxt_debug(task, "ra stream #%uD release", ra->stream);
526 
527     rc = ra->rc;
528 
529     if (rc != NULL) {
530         if (nxt_slow_path(ra->err_code != 0)) {
531             nxt_http_request_error(task, rc->ap->request, ra->err_code);
532 
533         } else {
534             rc->app_port = ra->app_port;
535             rc->msg_info = ra->msg_info;
536 
537             if (rc->app->timeout != 0) {
538                 rc->ap->timer.handler = nxt_router_app_timeout;
539                 nxt_timer_add(task->thread->engine, &rc->ap->timer,
540                               rc->app->timeout);
541             }
542 
543             ra->app_port = NULL;
544             ra->msg_info.buf = NULL;
545         }
546 
547         rc->ra = NULL;
548         ra->rc = NULL;
549     }
550 
551     if (ra->app_port != NULL) {
552         nxt_router_app_port_release(task, ra->app_port, 0, 1);
553 
554         ra->app_port = NULL;
555     }
556 
557     nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
558 
559     mp = ra->mem_pool;
560 
561     if (mp != NULL) {
562         nxt_mp_free(mp, ra);
563         nxt_mp_release(mp);
564     }
565 }
566 
567 
568 static void
569 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
570 {
571     nxt_req_app_link_t  *ra;
572 
573     ra = obj;
574 
575     nxt_assert(ra->work.data == data);
576 
577     nxt_atomic_fetch_add(&ra->use_count, -1);
578 
579     nxt_router_ra_release(task, ra);
580 }
581 
582 
583 static void
584 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
585 {
586     int                 c;
587     nxt_event_engine_t  *engine;
588 
589     c = nxt_atomic_fetch_add(&ra->use_count, i);
590 
591     if (i < 0 && c == -i) {
592         engine = ra->work.data;
593 
594         if (task->thread->engine == engine) {
595             nxt_router_ra_release(task, ra);
596 
597             return;
598         }
599 
600         nxt_router_ra_inc_use(ra);
601 
602         ra->work.handler = nxt_router_ra_release_handler;
603         ra->work.task = &engine->task;
604         ra->work.next = NULL;
605 
606         nxt_debug(task, "ra stream #%uD post release to %p",
607                   ra->stream, engine);
608 
609         nxt_event_engine_post(engine, &ra->work);
610     }
611 }
612 
613 
614 nxt_inline void
615 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
616 {
617     ra->app_port = NULL;
618     ra->err_code = code;
619     ra->err_str = str;
620 }
621 
622 
623 nxt_inline void
624 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
625 {
626     nxt_queue_insert_tail(&ra->app_port->pending_requests,
627                           &ra->link_port_pending);
628     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
629 
630     nxt_router_ra_inc_use(ra);
631 
632     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
633 
634     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
635 }
636 
637 
638 nxt_inline nxt_bool_t
639 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
640 {
641     if (lnk->next != NULL) {
642         nxt_queue_remove(lnk);
643 
644         lnk->next = NULL;
645 
646         return 1;
647     }
648 
649     return 0;
650 }
651 
652 
653 nxt_inline void
654 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
655 {
656     int                 ra_use_delta;
657     nxt_req_app_link_t  *ra;
658 
659     if (rc->app_port != NULL) {
660         nxt_router_app_port_release(task, rc->app_port, 0, 1);
661 
662         rc->app_port = NULL;
663     }
664 
665     nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
666 
667     ra = rc->ra;
668 
669     if (ra != NULL) {
670         rc->ra = NULL;
671         ra->rc = NULL;
672 
673         ra_use_delta = 0;
674 
675         nxt_thread_mutex_lock(&rc->app->mutex);
676 
677         if (ra->link_app_requests.next == NULL
678             && ra->link_port_pending.next == NULL
679             && ra->link_app_pending.next == NULL)
680         {
681             ra = NULL;
682 
683         } else {
684             ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
685             ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
686             nxt_queue_chk_remove(&ra->link_app_pending);
687         }
688 
689         nxt_thread_mutex_unlock(&rc->app->mutex);
690 
691         if (ra != NULL) {
692             nxt_router_ra_use(task, ra, ra_use_delta);
693         }
694     }
695 
696     if (rc->app != NULL) {
697         nxt_router_app_use(task, rc->app, -1);
698 
699         rc->app = NULL;
700     }
701 
702     if (rc->ap != NULL) {
703         nxt_app_http_req_done(task, rc->ap);
704 
705         rc->ap = NULL;
706     }
707 }
708 
709 
710 void
711 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
712 {
713     nxt_port_new_port_handler(task, msg);
714 
715     if (msg->port_msg.stream == 0) {
716         return;
717     }
718 
719     if (msg->u.new_port == NULL
720         || msg->u.new_port->type != NXT_PROCESS_WORKER)
721     {
722         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
723     }
724 
725     nxt_port_rpc_handler(task, msg);
726 }
727 
728 
729 void
730 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
731 {
732     nxt_int_t               ret;
733     nxt_buf_t               *b;
734     nxt_router_temp_conf_t  *tmcf;
735 
736     tmcf = nxt_router_temp_conf(task);
737     if (nxt_slow_path(tmcf == NULL)) {
738         return;
739     }
740 
741     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
742               nxt_buf_used_size(msg->buf),
743               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
744 
745     b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
746 
747     nxt_assert(b != NULL);
748 
749     tmcf->conf->router = nxt_router;
750     tmcf->stream = msg->port_msg.stream;
751     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
752                                        msg->port_msg.pid,
753                                        msg->port_msg.reply_port);
754 
755     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
756 
757     if (nxt_fast_path(ret == NXT_OK)) {
758         nxt_router_conf_apply(task, tmcf, NULL);
759 
760     } else {
761         nxt_router_conf_error(task, tmcf);
762     }
763 }
764 
765 
766 static void
767 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
768     void *data)
769 {
770     union {
771         nxt_pid_t  removed_pid;
772         void       *data;
773     } u;
774 
775     u.data = data;
776 
777     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
778 }
779 
780 
781 void
782 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
783 {
784     nxt_event_engine_t  *engine;
785 
786     nxt_port_remove_pid_handler(task, msg);
787 
788     if (msg->port_msg.stream == 0) {
789         return;
790     }
791 
792     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
793     {
794         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
795                       msg->u.data);
796     }
797     nxt_queue_loop;
798 
799     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
800 
801     nxt_port_rpc_handler(task, msg);
802 }
803 
804 
805 static nxt_router_temp_conf_t *
806 nxt_router_temp_conf(nxt_task_t *task)
807 {
808     nxt_mp_t                *mp, *tmp;
809     nxt_router_conf_t       *rtcf;
810     nxt_router_temp_conf_t  *tmcf;
811 
812     mp = nxt_mp_create(1024, 128, 256, 32);
813     if (nxt_slow_path(mp == NULL)) {
814         return NULL;
815     }
816 
817     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
818     if (nxt_slow_path(rtcf == NULL)) {
819         goto fail;
820     }
821 
822     rtcf->mem_pool = mp;
823 
824     tmp = nxt_mp_create(1024, 128, 256, 32);
825     if (nxt_slow_path(tmp == NULL)) {
826         goto fail;
827     }
828 
829     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
830     if (nxt_slow_path(tmcf == NULL)) {
831         goto temp_fail;
832     }
833 
834     tmcf->mem_pool = tmp;
835     tmcf->conf = rtcf;
836     tmcf->count = 1;
837     tmcf->engine = task->thread->engine;
838 
839     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
840                                      sizeof(nxt_router_engine_conf_t));
841     if (nxt_slow_path(tmcf->engines == NULL)) {
842         goto temp_fail;
843     }
844 
845     nxt_queue_init(&tmcf->deleting);
846     nxt_queue_init(&tmcf->keeping);
847     nxt_queue_init(&tmcf->updating);
848     nxt_queue_init(&tmcf->pending);
849     nxt_queue_init(&tmcf->creating);
850 
851     nxt_queue_init(&tmcf->apps);
852     nxt_queue_init(&tmcf->previous);
853 
854     return tmcf;
855 
856 temp_fail:
857 
858     nxt_mp_destroy(tmp);
859 
860 fail:
861 
862     nxt_mp_destroy(mp);
863 
864     return NULL;
865 }
866 
867 
868 nxt_inline nxt_bool_t
869 nxt_router_app_can_start(nxt_app_t *app)
870 {
871     return app->processes + app->pending_processes < app->max_processes
872             && app->pending_processes < app->max_pending_processes;
873 }
874 
875 
876 nxt_inline nxt_bool_t
877 nxt_router_app_need_start(nxt_app_t *app)
878 {
879     return app->idle_processes + app->pending_processes
880             < app->spare_processes;
881 }
882 
883 
884 static void
885 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
886 {
887     nxt_int_t                    ret;
888     nxt_app_t                    *app;
889     nxt_router_t                 *router;
890     nxt_runtime_t                *rt;
891     nxt_queue_link_t             *qlk;
892     nxt_socket_conf_t            *skcf;
893     nxt_router_temp_conf_t       *tmcf;
894     const nxt_event_interface_t  *interface;
895 
896     tmcf = obj;
897 
898     qlk = nxt_queue_first(&tmcf->pending);
899 
900     if (qlk != nxt_queue_tail(&tmcf->pending)) {
901         nxt_queue_remove(qlk);
902         nxt_queue_insert_tail(&tmcf->creating, qlk);
903 
904         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
905 
906         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
907 
908         return;
909     }
910 
911     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
912 
913         if (nxt_router_app_need_start(app)) {
914             nxt_router_app_rpc_create(task, tmcf, app);
915             return;
916         }
917 
918     } nxt_queue_loop;
919 
920     rt = task->thread->runtime;
921 
922     interface = nxt_service_get(rt->services, "engine", NULL);
923 
924     router = tmcf->conf->router;
925 
926     ret = nxt_router_engines_create(task, router, tmcf, interface);
927     if (nxt_slow_path(ret != NXT_OK)) {
928         goto fail;
929     }
930 
931     ret = nxt_router_threads_create(task, rt, tmcf);
932     if (nxt_slow_path(ret != NXT_OK)) {
933         goto fail;
934     }
935 
936     nxt_router_apps_sort(task, router, tmcf);
937 
938     nxt_router_engines_post(router, tmcf);
939 
940     nxt_queue_add(&router->sockets, &tmcf->updating);
941     nxt_queue_add(&router->sockets, &tmcf->creating);
942 
943     nxt_router_conf_ready(task, tmcf);
944 
945     return;
946 
947 fail:
948 
949     nxt_router_conf_error(task, tmcf);
950 
951     return;
952 }
953 
954 
955 static void
956 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
957 {
958     nxt_joint_job_t  *job;
959 
960     job = obj;
961 
962     nxt_router_conf_ready(task, job->tmcf);
963 }
964 
965 
966 static void
967 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
968 {
969     nxt_debug(task, "temp conf count:%D", tmcf->count);
970 
971     if (--tmcf->count == 0) {
972         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
973     }
974 }
975 
976 
977 static void
978 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
979 {
980     nxt_app_t          *app;
981     nxt_socket_t       s;
982     nxt_router_t       *router;
983     nxt_queue_link_t   *qlk;
984     nxt_socket_conf_t  *skcf;
985 
986     nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
987 
988     for (qlk = nxt_queue_first(&tmcf->creating);
989          qlk != nxt_queue_tail(&tmcf->creating);
990          qlk = nxt_queue_next(qlk))
991     {
992         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
993         s = skcf->listen->socket;
994 
995         if (s != -1) {
996             nxt_socket_close(task, s);
997         }
998 
999         nxt_free(skcf->listen);
1000     }
1001 
1002     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1003 
1004         nxt_router_app_quit(task, app);
1005 
1006     } nxt_queue_loop;
1007 
1008     router = tmcf->conf->router;
1009 
1010     nxt_queue_add(&router->sockets, &tmcf->keeping);
1011     nxt_queue_add(&router->sockets, &tmcf->deleting);
1012 
1013     nxt_queue_add(&router->apps, &tmcf->previous);
1014 
1015     // TODO: new engines and threads
1016 
1017     nxt_mp_destroy(tmcf->conf->mem_pool);
1018 
1019     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1020 }
1021 
1022 
1023 static void
1024 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1025     nxt_port_msg_type_t type)
1026 {
1027     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1028 }
1029 
1030 
1031 static nxt_conf_map_t  nxt_router_conf[] = {
1032     {
1033         nxt_string("listeners_threads"),
1034         NXT_CONF_MAP_INT32,
1035         offsetof(nxt_router_conf_t, threads),
1036     },
1037 };
1038 
1039 
1040 static nxt_conf_map_t  nxt_router_app_conf[] = {
1041     {
1042         nxt_string("type"),
1043         NXT_CONF_MAP_STR,
1044         offsetof(nxt_router_app_conf_t, type),
1045     },
1046 
1047     {
1048         nxt_string("limits"),
1049         NXT_CONF_MAP_PTR,
1050         offsetof(nxt_router_app_conf_t, limits_value),
1051     },
1052 
1053     {
1054         nxt_string("processes"),
1055         NXT_CONF_MAP_INT32,
1056         offsetof(nxt_router_app_conf_t, processes),
1057     },
1058 
1059     {
1060         nxt_string("processes"),
1061         NXT_CONF_MAP_PTR,
1062         offsetof(nxt_router_app_conf_t, processes_value),
1063     },
1064 };
1065 
1066 
1067 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1068     {
1069         nxt_string("timeout"),
1070         NXT_CONF_MAP_MSEC,
1071         offsetof(nxt_router_app_conf_t, timeout),
1072     },
1073 
1074     {
1075         nxt_string("reschedule_timeout"),
1076         NXT_CONF_MAP_MSEC,
1077         offsetof(nxt_router_app_conf_t, res_timeout),
1078     },
1079 
1080     {
1081         nxt_string("requests"),
1082         NXT_CONF_MAP_INT32,
1083         offsetof(nxt_router_app_conf_t, requests),
1084     },
1085 };
1086 
1087 
1088 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1089     {
1090         nxt_string("spare"),
1091         NXT_CONF_MAP_INT32,
1092         offsetof(nxt_router_app_conf_t, spare_processes),
1093     },
1094 
1095     {
1096         nxt_string("max"),
1097         NXT_CONF_MAP_INT32,
1098         offsetof(nxt_router_app_conf_t, max_processes),
1099     },
1100 
1101     {
1102         nxt_string("idle_timeout"),
1103         NXT_CONF_MAP_MSEC,
1104         offsetof(nxt_router_app_conf_t, idle_timeout),
1105     },
1106 };
1107 
1108 
1109 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1110     {
1111         nxt_string("application"),
1112         NXT_CONF_MAP_STR,
1113         offsetof(nxt_router_listener_conf_t, application),
1114     },
1115 };
1116 
1117 
1118 static nxt_conf_map_t  nxt_router_http_conf[] = {
1119     {
1120         nxt_string("header_buffer_size"),
1121         NXT_CONF_MAP_SIZE,
1122         offsetof(nxt_socket_conf_t, header_buffer_size),
1123     },
1124 
1125     {
1126         nxt_string("large_header_buffer_size"),
1127         NXT_CONF_MAP_SIZE,
1128         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1129     },
1130 
1131     {
1132         nxt_string("large_header_buffers"),
1133         NXT_CONF_MAP_SIZE,
1134         offsetof(nxt_socket_conf_t, large_header_buffers),
1135     },
1136 
1137     {
1138         nxt_string("body_buffer_size"),
1139         NXT_CONF_MAP_SIZE,
1140         offsetof(nxt_socket_conf_t, body_buffer_size),
1141     },
1142 
1143     {
1144         nxt_string("max_body_size"),
1145         NXT_CONF_MAP_SIZE,
1146         offsetof(nxt_socket_conf_t, max_body_size),
1147     },
1148 
1149     {
1150         nxt_string("idle_timeout"),
1151         NXT_CONF_MAP_MSEC,
1152         offsetof(nxt_socket_conf_t, idle_timeout),
1153     },
1154 
1155     {
1156         nxt_string("header_read_timeout"),
1157         NXT_CONF_MAP_MSEC,
1158         offsetof(nxt_socket_conf_t, header_read_timeout),
1159     },
1160 
1161     {
1162         nxt_string("body_read_timeout"),
1163         NXT_CONF_MAP_MSEC,
1164         offsetof(nxt_socket_conf_t, body_read_timeout),
1165     },
1166 
1167     {
1168         nxt_string("send_timeout"),
1169         NXT_CONF_MAP_MSEC,
1170         offsetof(nxt_socket_conf_t, send_timeout),
1171     },
1172 };
1173 
1174 
1175 static nxt_int_t
1176 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1177     u_char *start, u_char *end)
1178 {
1179     u_char                      *p;
1180     size_t                      size;
1181     nxt_mp_t                    *mp;
1182     uint32_t                    next;
1183     nxt_int_t                   ret;
1184     nxt_str_t                   name;
1185     nxt_app_t                   *app, *prev;
1186     nxt_router_t                *router;
1187     nxt_conf_value_t            *conf, *http;
1188     nxt_conf_value_t            *applications, *application;
1189     nxt_conf_value_t            *listeners, *listener;
1190     nxt_socket_conf_t           *skcf;
1191     nxt_event_engine_t          *engine;
1192     nxt_app_lang_module_t       *lang;
1193     nxt_router_app_conf_t       apcf;
1194     nxt_router_listener_conf_t  lscf;
1195 
1196     static nxt_str_t  http_path = nxt_string("/http");
1197     static nxt_str_t  applications_path = nxt_string("/applications");
1198     static nxt_str_t  listeners_path = nxt_string("/listeners");
1199 
1200     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1201     if (conf == NULL) {
1202         nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
1203         return NXT_ERROR;
1204     }
1205 
1206     mp = tmcf->conf->mem_pool;
1207 
1208     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1209                               nxt_nitems(nxt_router_conf), tmcf->conf);
1210     if (ret != NXT_OK) {
1211         nxt_log(task, NXT_LOG_CRIT, "root map error");
1212         return NXT_ERROR;
1213     }
1214 
1215     if (tmcf->conf->threads == 0) {
1216         tmcf->conf->threads = nxt_ncpu;
1217     }
1218 
1219     applications = nxt_conf_get_path(conf, &applications_path);
1220     if (applications == NULL) {
1221         nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
1222         return NXT_ERROR;
1223     }
1224 
1225     router = tmcf->conf->router;
1226 
1227     next = 0;
1228 
1229     for ( ;; ) {
1230         application = nxt_conf_next_object_member(applications, &name, &next);
1231         if (application == NULL) {
1232             break;
1233         }
1234 
1235         nxt_debug(task, "application \"%V\"", &name);
1236 
1237         size = nxt_conf_json_length(application, NULL);
1238 
1239         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1240         if (app == NULL) {
1241             goto fail;
1242         }
1243 
1244         nxt_memzero(app, sizeof(nxt_app_t));
1245 
1246         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1247         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1248 
1249         p = nxt_conf_json_print(app->conf.start, application, NULL);
1250         app->conf.length = p - app->conf.start;
1251 
1252         nxt_assert(app->conf.length <= size);
1253 
1254         nxt_debug(task, "application conf \"%V\"", &app->conf);
1255 
1256         prev = nxt_router_app_find(&router->apps, &name);
1257 
1258         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1259             nxt_free(app);
1260 
1261             nxt_queue_remove(&prev->link);
1262             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1263             continue;
1264         }
1265 
1266         apcf.processes = 1;
1267         apcf.max_processes = 1;
1268         apcf.spare_processes = 1;
1269         apcf.timeout = 0;
1270         apcf.res_timeout = 1000;
1271         apcf.idle_timeout = 15000;
1272         apcf.requests = 0;
1273         apcf.limits_value = NULL;
1274         apcf.processes_value = NULL;
1275 
1276         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1277                                   nxt_nitems(nxt_router_app_conf), &apcf);
1278         if (ret != NXT_OK) {
1279             nxt_log(task, NXT_LOG_CRIT, "application map error");
1280             goto app_fail;
1281         }
1282 
1283         if (apcf.limits_value != NULL) {
1284 
1285             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1286                 nxt_log(task, NXT_LOG_CRIT, "application limits is not object");
1287                 goto app_fail;
1288             }
1289 
1290             ret = nxt_conf_map_object(mp, apcf.limits_value,
1291                                       nxt_router_app_limits_conf,
1292                                       nxt_nitems(nxt_router_app_limits_conf),
1293                                       &apcf);
1294             if (ret != NXT_OK) {
1295                 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
1296                 goto app_fail;
1297             }
1298         }
1299 
1300         if (apcf.processes_value != NULL
1301             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1302         {
1303             ret = nxt_conf_map_object(mp, apcf.processes_value,
1304                                       nxt_router_app_processes_conf,
1305                                       nxt_nitems(nxt_router_app_processes_conf),
1306                                       &apcf);
1307             if (ret != NXT_OK) {
1308                 nxt_log(task, NXT_LOG_CRIT, "application processes map error");
1309                 goto app_fail;
1310             }
1311 
1312         } else {
1313             apcf.max_processes = apcf.processes;
1314             apcf.spare_processes = apcf.processes;
1315         }
1316 
1317         nxt_debug(task, "application type: %V", &apcf.type);
1318         nxt_debug(task, "application processes: %D", apcf.processes);
1319         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1320         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1321         nxt_debug(task, "application requests: %D", apcf.requests);
1322 
1323         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1324 
1325         if (lang == NULL) {
1326             nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
1327                     &apcf.type);
1328             goto app_fail;
1329         }
1330 
1331         nxt_debug(task, "application language module: \"%s\"", lang->file);
1332 
1333         ret = nxt_thread_mutex_create(&app->mutex);
1334         if (ret != NXT_OK) {
1335             goto app_fail;
1336         }
1337 
1338         nxt_queue_init(&app->ports);
1339         nxt_queue_init(&app->spare_ports);
1340         nxt_queue_init(&app->idle_ports);
1341         nxt_queue_init(&app->requests);
1342         nxt_queue_init(&app->pending);
1343 
1344         app->name.length = name.length;
1345         nxt_memcpy(app->name.start, name.start, name.length);
1346 
1347         app->type = lang->type;
1348         app->max_processes = apcf.max_processes;
1349         app->spare_processes = apcf.spare_processes;
1350         app->max_pending_processes = apcf.spare_processes
1351                                       ? apcf.spare_processes : 1;
1352         app->timeout = apcf.timeout;
1353         app->res_timeout = apcf.res_timeout * 1000000;
1354         app->idle_timeout = apcf.idle_timeout;
1355         app->live = 1;
1356         app->max_pending_responses = 2;
1357         app->max_requests = apcf.requests;
1358         app->prepare_msg = nxt_app_prepare_msg[lang->type];
1359 
1360         engine = task->thread->engine;
1361 
1362         app->engine = engine;
1363 
1364         app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
1365         app->idle_timer.work_queue = &engine->fast_work_queue;
1366         app->idle_timer.handler = nxt_router_app_idle_timeout;
1367         app->idle_timer.task = &engine->task;
1368         app->idle_timer.log = app->idle_timer.task->log;
1369 
1370         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1371         app->adjust_idle_work.task = &engine->task;
1372         app->adjust_idle_work.obj = app;
1373 
1374         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1375 
1376         nxt_router_app_use(task, app, 1);
1377     }
1378 
1379     http = nxt_conf_get_path(conf, &http_path);
1380 #if 0
1381     if (http == NULL) {
1382         nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
1383         return NXT_ERROR;
1384     }
1385 #endif
1386 
1387     listeners = nxt_conf_get_path(conf, &listeners_path);
1388     if (listeners == NULL) {
1389         nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
1390         return NXT_ERROR;
1391     }
1392 
1393     next = 0;
1394 
1395     for ( ;; ) {
1396         listener = nxt_conf_next_object_member(listeners, &name, &next);
1397         if (listener == NULL) {
1398             break;
1399         }
1400 
1401         skcf = nxt_router_socket_conf(task, tmcf, &name);
1402         if (skcf == NULL) {
1403             goto fail;
1404         }
1405 
1406         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1407                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1408         if (ret != NXT_OK) {
1409             nxt_log(task, NXT_LOG_CRIT, "listener map error");
1410             goto fail;
1411         }
1412 
1413         nxt_debug(task, "application: %V", &lscf.application);
1414 
1415         // STUB, default values if http block is not defined.
1416         skcf->header_buffer_size = 2048;
1417         skcf->large_header_buffer_size = 8192;
1418         skcf->large_header_buffers = 4;
1419         skcf->body_buffer_size = 16 * 1024;
1420         skcf->max_body_size = 2 * 1024 * 1024;
1421         skcf->idle_timeout = 65000;
1422         skcf->header_read_timeout = 5000;
1423         skcf->body_read_timeout = 5000;
1424         skcf->send_timeout = 5000;
1425 
1426         if (http != NULL) {
1427             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1428                                       nxt_nitems(nxt_router_http_conf), skcf);
1429             if (ret != NXT_OK) {
1430                 nxt_log(task, NXT_LOG_CRIT, "http map error");
1431                 goto fail;
1432             }
1433         }
1434 
1435         skcf->listen->handler = nxt_http_conn_init;
1436         skcf->router_conf = tmcf->conf;
1437         skcf->router_conf->count++;
1438         skcf->application = nxt_router_listener_application(tmcf,
1439                                                             &lscf.application);
1440     }
1441 
1442     nxt_queue_add(&tmcf->deleting, &router->sockets);
1443     nxt_queue_init(&router->sockets);
1444 
1445     return NXT_OK;
1446 
1447 app_fail:
1448 
1449     nxt_free(app);
1450 
1451 fail:
1452 
1453     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1454 
1455         nxt_queue_remove(&app->link);
1456         nxt_thread_mutex_destroy(&app->mutex);
1457         nxt_free(app);
1458 
1459     } nxt_queue_loop;
1460 
1461     return NXT_ERROR;
1462 }
1463 
1464 
1465 static nxt_app_t *
1466 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1467 {
1468     nxt_app_t  *app;
1469 
1470     nxt_queue_each(app, queue, nxt_app_t, link) {
1471 
1472         if (nxt_strstr_eq(name, &app->name)) {
1473             return app;
1474         }
1475 
1476     } nxt_queue_loop;
1477 
1478     return NULL;
1479 }
1480 
1481 
1482 static nxt_app_t *
1483 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1484 {
1485     nxt_app_t  *app;
1486 
1487     app = nxt_router_app_find(&tmcf->apps, name);
1488 
1489     if (app == NULL) {
1490         app = nxt_router_app_find(&tmcf->previous, name);
1491     }
1492 
1493     return app;
1494 }
1495 
1496 
1497 static nxt_socket_conf_t *
1498 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1499     nxt_str_t *name)
1500 {
1501     size_t               size;
1502     nxt_int_t            ret;
1503     nxt_bool_t           wildcard;
1504     nxt_sockaddr_t       *sa;
1505     nxt_socket_conf_t    *skcf;
1506     nxt_listen_socket_t  *ls;
1507 
1508     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1509     if (nxt_slow_path(sa == NULL)) {
1510         nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name);
1511         return NULL;
1512     }
1513 
1514     sa->type = SOCK_STREAM;
1515 
1516     nxt_debug(task, "router listener: \"%*s\"",
1517               (size_t) sa->length, nxt_sockaddr_start(sa));
1518 
1519     skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
1520     if (nxt_slow_path(skcf == NULL)) {
1521         return NULL;
1522     }
1523 
1524     size = nxt_sockaddr_size(sa);
1525 
1526     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1527 
1528     if (ret != NXT_OK) {
1529 
1530         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1531         if (nxt_slow_path(ls == NULL)) {
1532             return NULL;
1533         }
1534 
1535         skcf->listen = ls;
1536 
1537         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1538         nxt_memcpy(ls->sockaddr, sa, size);
1539 
1540         nxt_listen_socket_remote_size(ls);
1541 
1542         ls->socket = -1;
1543         ls->backlog = NXT_LISTEN_BACKLOG;
1544         ls->flags = NXT_NONBLOCK;
1545         ls->read_after_accept = 1;
1546     }
1547 
1548     switch (sa->u.sockaddr.sa_family) {
1549 #if (NXT_HAVE_UNIX_DOMAIN)
1550     case AF_UNIX:
1551         wildcard = 0;
1552         break;
1553 #endif
1554 #if (NXT_INET6)
1555     case AF_INET6:
1556         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1557         break;
1558 #endif
1559     case AF_INET:
1560     default:
1561         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1562         break;
1563     }
1564 
1565     if (!wildcard) {
1566         skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
1567         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1568             return NULL;
1569         }
1570 
1571         nxt_memcpy(skcf->sockaddr, sa, size);
1572     }
1573 
1574     return skcf;
1575 }
1576 
1577 
1578 static nxt_int_t
1579 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1580     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1581 {
1582     nxt_router_t       *router;
1583     nxt_queue_link_t   *qlk;
1584     nxt_socket_conf_t  *skcf;
1585 
1586     router = tmcf->conf->router;
1587 
1588     for (qlk = nxt_queue_first(&router->sockets);
1589          qlk != nxt_queue_tail(&router->sockets);
1590          qlk = nxt_queue_next(qlk))
1591     {
1592         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1593 
1594         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1595             nskcf->listen = skcf->listen;
1596 
1597             nxt_queue_remove(qlk);
1598             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1599 
1600             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1601 
1602             return NXT_OK;
1603         }
1604     }
1605 
1606     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1607 
1608     return NXT_DECLINED;
1609 }
1610 
1611 
1612 static void
1613 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1614     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1615 {
1616     size_t            size;
1617     uint32_t          stream;
1618     nxt_buf_t         *b;
1619     nxt_port_t        *main_port, *router_port;
1620     nxt_runtime_t     *rt;
1621     nxt_socket_rpc_t  *rpc;
1622 
1623     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1624     if (rpc == NULL) {
1625         goto fail;
1626     }
1627 
1628     rpc->socket_conf = skcf;
1629     rpc->temp_conf = tmcf;
1630 
1631     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1632 
1633     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1634     if (b == NULL) {
1635         goto fail;
1636     }
1637 
1638     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1639 
1640     rt = task->thread->runtime;
1641     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1642     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1643 
1644     stream = nxt_port_rpc_register_handler(task, router_port,
1645                                            nxt_router_listen_socket_ready,
1646                                            nxt_router_listen_socket_error,
1647                                            main_port->pid, rpc);
1648     if (stream == 0) {
1649         goto fail;
1650     }
1651 
1652     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1653                           stream, router_port->id, b);
1654 
1655     return;
1656 
1657 fail:
1658 
1659     nxt_router_conf_error(task, tmcf);
1660 }
1661 
1662 
1663 static void
1664 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1665     void *data)
1666 {
1667     nxt_int_t         ret;
1668     nxt_socket_t      s;
1669     nxt_socket_rpc_t  *rpc;
1670 
1671     rpc = data;
1672 
1673     s = msg->fd;
1674 
1675     ret = nxt_socket_nonblocking(task, s);
1676     if (nxt_slow_path(ret != NXT_OK)) {
1677         goto fail;
1678     }
1679 
1680     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1681 
1682     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1683     if (nxt_slow_path(ret != NXT_OK)) {
1684         goto fail;
1685     }
1686 
1687     rpc->socket_conf->listen->socket = s;
1688 
1689     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1690                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1691 
1692     return;
1693 
1694 fail:
1695 
1696     nxt_socket_close(task, s);
1697 
1698     nxt_router_conf_error(task, rpc->temp_conf);
1699 }
1700 
1701 
1702 static void
1703 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1704     void *data)
1705 {
1706     u_char                  *p;
1707     size_t                  size;
1708     uint8_t                 error;
1709     nxt_buf_t               *in, *out;
1710     nxt_sockaddr_t          *sa;
1711     nxt_socket_rpc_t        *rpc;
1712     nxt_router_temp_conf_t  *tmcf;
1713 
1714     static nxt_str_t  socket_errors[] = {
1715         nxt_string("ListenerSystem"),
1716         nxt_string("ListenerNoIPv6"),
1717         nxt_string("ListenerPort"),
1718         nxt_string("ListenerInUse"),
1719         nxt_string("ListenerNoAddress"),
1720         nxt_string("ListenerNoAccess"),
1721         nxt_string("ListenerPath"),
1722     };
1723 
1724     rpc = data;
1725     sa = rpc->socket_conf->listen->sockaddr;
1726     tmcf = rpc->temp_conf;
1727 
1728     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1729 
1730     nxt_assert(in != NULL);
1731 
1732     p = in->mem.pos;
1733 
1734     error = *p++;
1735 
1736     size = sizeof("listen socket error: ") - 1
1737            + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
1738            + sa->length + socket_errors[error].length + (in->mem.free - p);
1739 
1740     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1741     if (nxt_slow_path(out == NULL)) {
1742         return;
1743     }
1744 
1745     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1746                         "listen socket error: "
1747                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1748                         (size_t) sa->length, nxt_sockaddr_start(sa),
1749                         &socket_errors[error], in->mem.free - p, p);
1750 
1751     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1752 
1753     nxt_router_conf_error(task, tmcf);
1754 }
1755 
1756 
1757 static void
1758 nxt_router_app_rpc_create(nxt_task_t *task,
1759     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
1760 {
1761     size_t         size;
1762     uint32_t       stream;
1763     nxt_buf_t      *b;
1764     nxt_port_t     *main_port, *router_port;
1765     nxt_runtime_t  *rt;
1766     nxt_app_rpc_t  *rpc;
1767 
1768     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
1769     if (rpc == NULL) {
1770         goto fail;
1771     }
1772 
1773     rpc->app = app;
1774     rpc->temp_conf = tmcf;
1775 
1776     nxt_debug(task, "app '%V' prefork", &app->name);
1777 
1778     size = app->name.length + 1 + app->conf.length;
1779 
1780     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1781     if (nxt_slow_path(b == NULL)) {
1782         goto fail;
1783     }
1784 
1785     nxt_buf_cpystr(b, &app->name);
1786     *b->mem.free++ = '\0';
1787     nxt_buf_cpystr(b, &app->conf);
1788 
1789     rt = task->thread->runtime;
1790     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1791     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1792 
1793     stream = nxt_port_rpc_register_handler(task, router_port,
1794                                            nxt_router_app_prefork_ready,
1795                                            nxt_router_app_prefork_error,
1796                                            -1, rpc);
1797     if (nxt_slow_path(stream == 0)) {
1798         goto fail;
1799     }
1800 
1801     app->pending_processes++;
1802 
1803     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
1804                           stream, router_port->id, b);
1805 
1806     return;
1807 
1808 fail:
1809 
1810     nxt_router_conf_error(task, tmcf);
1811 }
1812 
1813 
1814 static void
1815 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1816     void *data)
1817 {
1818     nxt_app_t           *app;
1819     nxt_port_t          *port;
1820     nxt_app_rpc_t       *rpc;
1821     nxt_event_engine_t  *engine;
1822 
1823     rpc = data;
1824     app = rpc->app;
1825 
1826     port = msg->u.new_port;
1827     port->app = app;
1828 
1829     nxt_router_app_use(task, app, 1);
1830 
1831     app->pending_processes--;
1832     app->processes++;
1833     app->idle_processes++;
1834 
1835     engine = task->thread->engine;
1836 
1837     nxt_queue_insert_tail(&app->ports, &port->app_link);
1838     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
1839 
1840     port->idle_start = 0;
1841 
1842     nxt_port_inc_use(port);
1843 
1844     nxt_work_queue_add(&engine->fast_work_queue,
1845                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1846 }
1847 
1848 
1849 static void
1850 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1851     void *data)
1852 {
1853     nxt_app_t               *app;
1854     nxt_app_rpc_t           *rpc;
1855     nxt_router_temp_conf_t  *tmcf;
1856 
1857     rpc = data;
1858     app = rpc->app;
1859     tmcf = rpc->temp_conf;
1860 
1861     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
1862             &app->name);
1863 
1864     app->pending_processes--;
1865 
1866     nxt_router_conf_error(task, tmcf);
1867 }
1868 
1869 
1870 static nxt_int_t
1871 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1872     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1873 {
1874     nxt_int_t                 ret;
1875     nxt_uint_t                n, threads;
1876     nxt_queue_link_t          *qlk;
1877     nxt_router_engine_conf_t  *recf;
1878 
1879     threads = tmcf->conf->threads;
1880 
1881     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
1882                                      sizeof(nxt_router_engine_conf_t));
1883     if (nxt_slow_path(tmcf->engines == NULL)) {
1884         return NXT_ERROR;
1885     }
1886 
1887     n = 0;
1888 
1889     for (qlk = nxt_queue_first(&router->engines);
1890          qlk != nxt_queue_tail(&router->engines);
1891          qlk = nxt_queue_next(qlk))
1892     {
1893         recf = nxt_array_zero_add(tmcf->engines);
1894         if (nxt_slow_path(recf == NULL)) {
1895             return NXT_ERROR;
1896         }
1897 
1898         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
1899 
1900         if (n < threads) {
1901             recf->action = NXT_ROUTER_ENGINE_KEEP;
1902             ret = nxt_router_engine_conf_update(tmcf, recf);
1903 
1904         } else {
1905             recf->action = NXT_ROUTER_ENGINE_DELETE;
1906             ret = nxt_router_engine_conf_delete(tmcf, recf);
1907         }
1908 
1909         if (nxt_slow_path(ret != NXT_OK)) {
1910             return ret;
1911         }
1912 
1913         n++;
1914     }
1915 
1916     tmcf->new_threads = n;
1917 
1918     while (n < threads) {
1919         recf = nxt_array_zero_add(tmcf->engines);
1920         if (nxt_slow_path(recf == NULL)) {
1921             return NXT_ERROR;
1922         }
1923 
1924         recf->action = NXT_ROUTER_ENGINE_ADD;
1925 
1926         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
1927         if (nxt_slow_path(recf->engine == NULL)) {
1928             return NXT_ERROR;
1929         }
1930 
1931         ret = nxt_router_engine_conf_create(tmcf, recf);
1932         if (nxt_slow_path(ret != NXT_OK)) {
1933             return ret;
1934         }
1935 
1936         n++;
1937     }
1938 
1939     return NXT_OK;
1940 }
1941 
1942 
1943 static nxt_int_t
1944 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1945     nxt_router_engine_conf_t *recf)
1946 {
1947     nxt_int_t  ret;
1948 
1949     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1950                                           nxt_router_listen_socket_create);
1951     if (nxt_slow_path(ret != NXT_OK)) {
1952         return ret;
1953     }
1954 
1955     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1956                                           nxt_router_listen_socket_create);
1957     if (nxt_slow_path(ret != NXT_OK)) {
1958         return ret;
1959     }
1960 
1961     return ret;
1962 }
1963 
1964 
1965 static nxt_int_t
1966 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1967     nxt_router_engine_conf_t *recf)
1968 {
1969     nxt_int_t  ret;
1970 
1971     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1972                                           nxt_router_listen_socket_create);
1973     if (nxt_slow_path(ret != NXT_OK)) {
1974         return ret;
1975     }
1976 
1977     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1978                                           nxt_router_listen_socket_update);
1979     if (nxt_slow_path(ret != NXT_OK)) {
1980         return ret;
1981     }
1982 
1983     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1984     if (nxt_slow_path(ret != NXT_OK)) {
1985         return ret;
1986     }
1987 
1988     return ret;
1989 }
1990 
1991 
1992 static nxt_int_t
1993 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1994     nxt_router_engine_conf_t *recf)
1995 {
1996     nxt_int_t  ret;
1997 
1998     ret = nxt_router_engine_quit(tmcf, recf);
1999     if (nxt_slow_path(ret != NXT_OK)) {
2000         return ret;
2001     }
2002 
2003     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2004     if (nxt_slow_path(ret != NXT_OK)) {
2005         return ret;
2006     }
2007 
2008     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2009 }
2010 
2011 
2012 static nxt_int_t
2013 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2014     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2015     nxt_work_handler_t handler)
2016 {
2017     nxt_joint_job_t          *job;
2018     nxt_queue_link_t         *qlk;
2019     nxt_socket_conf_t        *skcf;
2020     nxt_socket_conf_joint_t  *joint;
2021 
2022     for (qlk = nxt_queue_first(sockets);
2023          qlk != nxt_queue_tail(sockets);
2024          qlk = nxt_queue_next(qlk))
2025     {
2026         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2027         if (nxt_slow_path(job == NULL)) {
2028             return NXT_ERROR;
2029         }
2030 
2031         job->work.next = recf->jobs;
2032         recf->jobs = &job->work;
2033 
2034         job->task = tmcf->engine->task;
2035         job->work.handler = handler;
2036         job->work.task = &job->task;
2037         job->work.obj = job;
2038         job->tmcf = tmcf;
2039 
2040         tmcf->count++;
2041 
2042         joint = nxt_mp_alloc(tmcf->conf->mem_pool,
2043                              sizeof(nxt_socket_conf_joint_t));
2044         if (nxt_slow_path(joint == NULL)) {
2045             return NXT_ERROR;
2046         }
2047 
2048         job->work.data = joint;
2049 
2050         joint->count = 1;
2051 
2052         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2053         skcf->count++;
2054         joint->socket_conf = skcf;
2055 
2056         joint->engine = recf->engine;
2057     }
2058 
2059     return NXT_OK;
2060 }
2061 
2062 
2063 static nxt_int_t
2064 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2065     nxt_router_engine_conf_t *recf)
2066 {
2067     nxt_joint_job_t  *job;
2068 
2069     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2070     if (nxt_slow_path(job == NULL)) {
2071         return NXT_ERROR;
2072     }
2073 
2074     job->work.next = recf->jobs;
2075     recf->jobs = &job->work;
2076 
2077     job->task = tmcf->engine->task;
2078     job->work.handler = nxt_router_worker_thread_quit;
2079     job->work.task = &job->task;
2080     job->work.obj = NULL;
2081     job->work.data = NULL;
2082     job->tmcf = NULL;
2083 
2084     return NXT_OK;
2085 }
2086 
2087 
2088 static nxt_int_t
2089 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2090     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2091 {
2092     nxt_joint_job_t   *job;
2093     nxt_queue_link_t  *qlk;
2094 
2095     for (qlk = nxt_queue_first(sockets);
2096          qlk != nxt_queue_tail(sockets);
2097          qlk = nxt_queue_next(qlk))
2098     {
2099         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2100         if (nxt_slow_path(job == NULL)) {
2101             return NXT_ERROR;
2102         }
2103 
2104         job->work.next = recf->jobs;
2105         recf->jobs = &job->work;
2106 
2107         job->task = tmcf->engine->task;
2108         job->work.handler = nxt_router_listen_socket_delete;
2109         job->work.task = &job->task;
2110         job->work.obj = job;
2111         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2112         job->tmcf = tmcf;
2113 
2114         tmcf->count++;
2115     }
2116 
2117     return NXT_OK;
2118 }
2119 
2120 
2121 static nxt_int_t
2122 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2123     nxt_router_temp_conf_t *tmcf)
2124 {
2125     nxt_int_t                 ret;
2126     nxt_uint_t                i, threads;
2127     nxt_router_engine_conf_t  *recf;
2128 
2129     recf = tmcf->engines->elts;
2130     threads = tmcf->conf->threads;
2131 
2132     for (i = tmcf->new_threads; i < threads; i++) {
2133         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2134         if (nxt_slow_path(ret != NXT_OK)) {
2135             return ret;
2136         }
2137     }
2138 
2139     return NXT_OK;
2140 }
2141 
2142 
2143 static nxt_int_t
2144 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2145     nxt_event_engine_t *engine)
2146 {
2147     nxt_int_t            ret;
2148     nxt_thread_link_t    *link;
2149     nxt_thread_handle_t  handle;
2150 
2151     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2152 
2153     if (nxt_slow_path(link == NULL)) {
2154         return NXT_ERROR;
2155     }
2156 
2157     link->start = nxt_router_thread_start;
2158     link->engine = engine;
2159     link->work.handler = nxt_router_thread_exit_handler;
2160     link->work.task = task;
2161     link->work.data = link;
2162 
2163     nxt_queue_insert_tail(&rt->engines, &engine->link);
2164 
2165     ret = nxt_thread_create(&handle, link);
2166 
2167     if (nxt_slow_path(ret != NXT_OK)) {
2168         nxt_queue_remove(&engine->link);
2169     }
2170 
2171     return ret;
2172 }
2173 
2174 
2175 static void
2176 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2177     nxt_router_temp_conf_t *tmcf)
2178 {
2179     nxt_app_t  *app;
2180 
2181     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2182 
2183         nxt_router_app_quit(task, app);
2184 
2185     } nxt_queue_loop;
2186 
2187     nxt_queue_add(&router->apps, &tmcf->previous);
2188     nxt_queue_add(&router->apps, &tmcf->apps);
2189 }
2190 
2191 
2192 static void
2193 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2194 {
2195     nxt_uint_t                n;
2196     nxt_event_engine_t        *engine;
2197     nxt_router_engine_conf_t  *recf;
2198 
2199     recf = tmcf->engines->elts;
2200 
2201     for (n = tmcf->engines->nelts; n != 0; n--) {
2202         engine = recf->engine;
2203 
2204         switch (recf->action) {
2205 
2206         case NXT_ROUTER_ENGINE_KEEP:
2207             break;
2208 
2209         case NXT_ROUTER_ENGINE_ADD:
2210             nxt_queue_insert_tail(&router->engines, &engine->link0);
2211             break;
2212 
2213         case NXT_ROUTER_ENGINE_DELETE:
2214             nxt_queue_remove(&engine->link0);
2215             break;
2216         }
2217 
2218         nxt_router_engine_post(engine, recf->jobs);
2219 
2220         recf++;
2221     }
2222 }
2223 
2224 
2225 static void
2226 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2227 {
2228     nxt_work_t  *work, *next;
2229 
2230     for (work = jobs; work != NULL; work = next) {
2231         next = work->next;
2232         work->next = NULL;
2233 
2234         nxt_event_engine_post(engine, work);
2235     }
2236 }
2237 
2238 
2239 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2240     .mmap = nxt_port_mmap_handler,
2241     .data = nxt_port_rpc_handler,
2242 };
2243 
2244 
2245 static void
2246 nxt_router_thread_start(void *data)
2247 {
2248     nxt_int_t           ret;
2249     nxt_port_t          *port;
2250     nxt_task_t          *task;
2251     nxt_thread_t        *thread;
2252     nxt_thread_link_t   *link;
2253     nxt_event_engine_t  *engine;
2254 
2255     link = data;
2256     engine = link->engine;
2257     task = &engine->task;
2258 
2259     thread = nxt_thread();
2260 
2261     nxt_event_engine_thread_adopt(engine);
2262 
2263     /* STUB */
2264     thread->runtime = engine->task.thread->runtime;
2265 
2266     engine->task.thread = thread;
2267     engine->task.log = thread->log;
2268     thread->engine = engine;
2269     thread->task = &engine->task;
2270 #if 0
2271     thread->fiber = &engine->fibers->fiber;
2272 #endif
2273 
2274     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2275     if (nxt_slow_path(engine->mem_pool == NULL)) {
2276         return;
2277     }
2278 
2279     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2280                         NXT_PROCESS_ROUTER);
2281     if (nxt_slow_path(port == NULL)) {
2282         return;
2283     }
2284 
2285     ret = nxt_port_socket_init(task, port, 0);
2286     if (nxt_slow_path(ret != NXT_OK)) {
2287         nxt_port_use(task, port, -1);
2288         return;
2289     }
2290 
2291     engine->port = port;
2292 
2293     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2294 
2295     nxt_event_engine_start(engine);
2296 }
2297 
2298 
2299 static void
2300 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2301 {
2302     nxt_joint_job_t          *job;
2303     nxt_socket_conf_t        *skcf;
2304     nxt_listen_event_t       *lev;
2305     nxt_listen_socket_t      *ls;
2306     nxt_thread_spinlock_t    *lock;
2307     nxt_socket_conf_joint_t  *joint;
2308 
2309     job = obj;
2310     joint = data;
2311 
2312     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2313 
2314     skcf = joint->socket_conf;
2315     ls = skcf->listen;
2316 
2317     lev = nxt_listen_event(task, ls);
2318     if (nxt_slow_path(lev == NULL)) {
2319         nxt_router_listen_socket_release(task, skcf);
2320         return;
2321     }
2322 
2323     lev->socket.data = joint;
2324 
2325     lock = &skcf->router_conf->router->lock;
2326 
2327     nxt_thread_spin_lock(lock);
2328     ls->count++;
2329     nxt_thread_spin_unlock(lock);
2330 
2331     job->work.next = NULL;
2332     job->work.handler = nxt_router_conf_wait;
2333 
2334     nxt_event_engine_post(job->tmcf->engine, &job->work);
2335 }
2336 
2337 
2338 nxt_inline nxt_listen_event_t *
2339 nxt_router_listen_event(nxt_queue_t *listen_connections,
2340     nxt_socket_conf_t *skcf)
2341 {
2342     nxt_socket_t        fd;
2343     nxt_queue_link_t    *qlk;
2344     nxt_listen_event_t  *lev;
2345 
2346     fd = skcf->listen->socket;
2347 
2348     for (qlk = nxt_queue_first(listen_connections);
2349          qlk != nxt_queue_tail(listen_connections);
2350          qlk = nxt_queue_next(qlk))
2351     {
2352         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2353 
2354         if (fd == lev->socket.fd) {
2355             return lev;
2356         }
2357     }
2358 
2359     return NULL;
2360 }
2361 
2362 
2363 static void
2364 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2365 {
2366     nxt_joint_job_t          *job;
2367     nxt_event_engine_t       *engine;
2368     nxt_listen_event_t       *lev;
2369     nxt_socket_conf_joint_t  *joint, *old;
2370 
2371     job = obj;
2372     joint = data;
2373 
2374     engine = task->thread->engine;
2375 
2376     nxt_queue_insert_tail(&engine->joints, &joint->link);
2377 
2378     lev = nxt_router_listen_event(&engine->listen_connections,
2379                                   joint->socket_conf);
2380 
2381     old = lev->socket.data;
2382     lev->socket.data = joint;
2383     lev->listen = joint->socket_conf->listen;
2384 
2385     job->work.next = NULL;
2386     job->work.handler = nxt_router_conf_wait;
2387 
2388     nxt_event_engine_post(job->tmcf->engine, &job->work);
2389 
2390     /*
2391      * The task is allocated from configuration temporary
2392      * memory pool so it can be freed after engine post operation.
2393      */
2394 
2395     nxt_router_conf_release(&engine->task, old);
2396 }
2397 
2398 
2399 static void
2400 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2401 {
2402     nxt_joint_job_t     *job;
2403     nxt_socket_conf_t   *skcf;
2404     nxt_listen_event_t  *lev;
2405     nxt_event_engine_t  *engine;
2406 
2407     job = obj;
2408     skcf = data;
2409 
2410     engine = task->thread->engine;
2411 
2412     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2413 
2414     nxt_fd_event_delete(engine, &lev->socket);
2415 
2416     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2417               lev->socket.fd);
2418 
2419     lev->timer.handler = nxt_router_listen_socket_close;
2420     lev->timer.work_queue = &engine->fast_work_queue;
2421 
2422     nxt_timer_add(engine, &lev->timer, 0);
2423 
2424     job->work.next = NULL;
2425     job->work.handler = nxt_router_conf_wait;
2426 
2427     nxt_event_engine_post(job->tmcf->engine, &job->work);
2428 }
2429 
2430 
2431 static void
2432 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2433 {
2434     nxt_event_engine_t  *engine;
2435 
2436     nxt_debug(task, "router worker thread quit");
2437 
2438     engine = task->thread->engine;
2439 
2440     engine->shutdown = 1;
2441 
2442     if (nxt_queue_is_empty(&engine->joints)) {
2443         nxt_thread_exit(task->thread);
2444     }
2445 }
2446 
2447 
2448 static void
2449 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2450 {
2451     nxt_timer_t              *timer;
2452     nxt_listen_event_t       *lev;
2453     nxt_socket_conf_joint_t  *joint;
2454 
2455     timer = obj;
2456     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2457     joint = lev->socket.data;
2458 
2459     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2460               lev->socket.fd);
2461 
2462     nxt_queue_remove(&lev->link);
2463 
2464     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2465     task = &task->thread->engine->task;
2466 
2467     nxt_router_listen_socket_release(task, joint->socket_conf);
2468 
2469     nxt_free(lev);
2470 
2471     nxt_router_conf_release(task, joint);
2472 }
2473 
2474 
2475 static void
2476 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2477 {
2478     nxt_listen_socket_t    *ls;
2479     nxt_thread_spinlock_t  *lock;
2480 
2481     ls = skcf->listen;
2482     lock = &skcf->router_conf->router->lock;
2483 
2484     nxt_thread_spin_lock(lock);
2485 
2486     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2487               task->thread->engine, ls->count);
2488 
2489     if (--ls->count != 0) {
2490         ls = NULL;
2491     }
2492 
2493     nxt_thread_spin_unlock(lock);
2494 
2495     if (ls != NULL) {
2496         nxt_socket_close(task, ls->socket);
2497         nxt_free(ls);
2498     }
2499 }
2500 
2501 
2502 static void
2503 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2504 {
2505     nxt_bool_t             exit;
2506     nxt_socket_conf_t      *skcf;
2507     nxt_router_conf_t      *rtcf;
2508     nxt_event_engine_t     *engine;
2509     nxt_thread_spinlock_t  *lock;
2510 
2511     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2512 
2513     if (--joint->count != 0) {
2514         return;
2515     }
2516 
2517     nxt_queue_remove(&joint->link);
2518 
2519     skcf = joint->socket_conf;
2520     rtcf = skcf->router_conf;
2521     lock = &rtcf->router->lock;
2522 
2523     nxt_thread_spin_lock(lock);
2524 
2525     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2526               rtcf, rtcf->count);
2527 
2528     if (--skcf->count != 0) {
2529         rtcf = NULL;
2530 
2531     } else {
2532         nxt_queue_remove(&skcf->link);
2533 
2534         if (--rtcf->count != 0) {
2535             rtcf = NULL;
2536         }
2537     }
2538 
2539     nxt_thread_spin_unlock(lock);
2540 
2541     /* TODO remove engine->port */
2542     /* TODO excude from connected ports */
2543 
2544     /* The joint content can be used before memory pool destruction. */
2545     engine = joint->engine;
2546     exit = (engine->shutdown && nxt_queue_is_empty(&engine->joints));
2547 
2548     if (rtcf != NULL) {
2549         nxt_debug(task, "old router conf is destroyed");
2550 
2551         nxt_mp_thread_adopt(rtcf->mem_pool);
2552 
2553         nxt_mp_destroy(rtcf->mem_pool);
2554     }
2555 
2556     if (exit) {
2557         nxt_thread_exit(task->thread);
2558     }
2559 }
2560 
2561 
2562 static void
2563 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
2564 {
2565     nxt_port_t           *port;
2566     nxt_thread_link_t    *link;
2567     nxt_event_engine_t   *engine;
2568     nxt_thread_handle_t  handle;
2569 
2570     handle = (nxt_thread_handle_t) obj;
2571     link = data;
2572 
2573     nxt_thread_wait(handle);
2574 
2575     engine = link->engine;
2576 
2577     nxt_queue_remove(&engine->link);
2578 
2579     port = engine->port;
2580 
2581     // TODO notify all apps
2582 
2583     port->engine = task->thread->engine;
2584     nxt_mp_thread_adopt(port->mem_pool);
2585     nxt_port_use(task, port, -1);
2586 
2587     nxt_mp_thread_adopt(engine->mem_pool);
2588     nxt_mp_destroy(engine->mem_pool);
2589 
2590     nxt_event_engine_free(engine);
2591 
2592     nxt_free(link);
2593 }
2594 
2595 
2596 static void
2597 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2598     void *data)
2599 {
2600     size_t               dump_size;
2601     nxt_int_t            ret;
2602     nxt_buf_t            *b, *last;
2603     nxt_http_request_t   *r;
2604     nxt_req_conn_link_t  *rc;
2605     nxt_app_parse_ctx_t  *ar;
2606 
2607     b = msg->buf;
2608     rc = data;
2609 
2610     dump_size = nxt_buf_used_size(b);
2611 
2612     if (dump_size > 300) {
2613         dump_size = 300;
2614     }
2615 
2616     nxt_debug(task, "%srouter app data (%uz): %*s",
2617               msg->port_msg.last ? "last " : "", msg->size, dump_size,
2618               b->mem.pos);
2619 
2620     if (msg->size == 0) {
2621         b = NULL;
2622     }
2623 
2624     ar = rc->ap;
2625 
2626     if (msg->port_msg.last != 0) {
2627         nxt_debug(task, "router data create last buf");
2628 
2629         last = nxt_http_request_last_buffer(task, ar->request);
2630         if (nxt_slow_path(last == NULL)) {
2631             nxt_app_http_req_done(task, ar);
2632             nxt_router_rc_unlink(task, rc);
2633             return;
2634         }
2635 
2636         nxt_buf_chain_add(&b, last);
2637 
2638         nxt_router_rc_unlink(task, rc);
2639 
2640     } else {
2641         if (rc->app->timeout != 0) {
2642             ar->timer.handler = nxt_router_app_timeout;
2643             nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
2644         }
2645     }
2646 
2647     if (b == NULL) {
2648         return;
2649     }
2650 
2651     if (msg->buf == b) {
2652         /* Disable instant buffer completion/re-using by port. */
2653         msg->buf = NULL;
2654     }
2655 
2656     r = ar->request;
2657 
2658     if (r->header_sent) {
2659         nxt_buf_chain_add(&r->out, b);
2660         nxt_http_request_send_body(task, r, NULL);
2661 
2662     } else {
2663         ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
2664         if (nxt_slow_path(ret != NXT_DONE)) {
2665             goto fail;
2666         }
2667 
2668         r->resp.fields = ar->resp_parser.fields;
2669 
2670         ret = nxt_http_fields_process(r->resp.fields,
2671                                       &nxt_response_fields_hash, r);
2672         if (nxt_slow_path(ret != NXT_OK)) {
2673             goto fail;
2674         }
2675 
2676         if (nxt_buf_mem_used_size(&b->mem) == 0) {
2677             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2678                                b->completion_handler, task, b, b->parent);
2679 
2680         } else {
2681             nxt_buf_chain_add(&r->out, b);
2682         }
2683 
2684         r->state = &nxt_http_request_send_state;
2685 
2686         nxt_http_request_header_send(task, r);
2687     }
2688 
2689     return;
2690 
2691 fail:
2692 
2693     nxt_app_http_req_done(task, ar);
2694     nxt_router_rc_unlink(task, rc);
2695 
2696     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
2697 }
2698 
2699 
2700 static const nxt_http_request_state_t  nxt_http_request_send_state
2701     nxt_aligned(64) =
2702 {
2703     .ready_handler = nxt_http_request_send_body,
2704     .error_handler = nxt_http_request_close_handler,
2705 };
2706 
2707 
2708 static void
2709 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
2710 {
2711     nxt_buf_t           *out;
2712     nxt_http_request_t  *r;
2713 
2714     r = obj;
2715 
2716     out = r->out;
2717 
2718     if (out != NULL) {
2719         r->out = NULL;
2720         nxt_http_request_send(task, r, out);
2721     }
2722 }
2723 
2724 
2725 static void
2726 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2727     void *data)
2728 {
2729     nxt_int_t            res;
2730     nxt_port_t           *port;
2731     nxt_bool_t           cancelled;
2732     nxt_req_app_link_t   *ra;
2733     nxt_req_conn_link_t  *rc;
2734 
2735     rc = data;
2736 
2737     ra = rc->ra;
2738 
2739     if (ra != NULL) {
2740         cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
2741 
2742         if (cancelled) {
2743             nxt_router_ra_inc_use(ra);
2744 
2745             res = nxt_router_app_port(task, rc->app, ra);
2746 
2747             if (res == NXT_OK) {
2748                 port = ra->app_port;
2749 
2750                 nxt_assert(port != NULL);
2751 
2752                 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
2753                                          port->pid);
2754 
2755                 nxt_router_app_prepare_request(task, ra);
2756             }
2757 
2758             msg->port_msg.last = 0;
2759 
2760             return;
2761         }
2762     }
2763 
2764     nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE);
2765 
2766     nxt_router_rc_unlink(task, rc);
2767 }
2768 
2769 
2770 static void
2771 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2772     void *data)
2773 {
2774     nxt_app_t   *app;
2775     nxt_port_t  *port;
2776 
2777     app = data;
2778     port = msg->u.new_port;
2779 
2780     nxt_assert(app != NULL);
2781     nxt_assert(port != NULL);
2782 
2783     port->app = app;
2784 
2785     nxt_thread_mutex_lock(&app->mutex);
2786 
2787     nxt_assert(app->pending_processes != 0);
2788 
2789     app->pending_processes--;
2790     app->processes++;
2791 
2792     nxt_thread_mutex_unlock(&app->mutex);
2793 
2794     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
2795               &app->name, port->pid, app->processes, app->pending_processes);
2796 
2797     nxt_router_app_port_release(task, port, 0, 0);
2798 }
2799 
2800 
2801 static void
2802 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2803     void *data)
2804 {
2805     nxt_app_t           *app;
2806     nxt_queue_link_t    *lnk;
2807     nxt_req_app_link_t  *ra;
2808 
2809     app = data;
2810 
2811     nxt_assert(app != NULL);
2812 
2813     nxt_debug(task, "app '%V' %p start error", &app->name, app);
2814 
2815     nxt_thread_mutex_lock(&app->mutex);
2816 
2817     nxt_assert(app->pending_processes != 0);
2818 
2819     app->pending_processes--;
2820 
2821     if (!nxt_queue_is_empty(&app->requests)) {
2822         lnk = nxt_queue_last(&app->requests);
2823         nxt_queue_remove(lnk);
2824         lnk->next = NULL;
2825 
2826         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2827 
2828     } else {
2829         ra = NULL;
2830     }
2831 
2832     nxt_thread_mutex_unlock(&app->mutex);
2833 
2834     if (ra != NULL) {
2835         nxt_debug(task, "app '%V' %p abort next stream #%uD",
2836                   &app->name, app, ra->stream);
2837 
2838         nxt_router_ra_error(ra, 500, "Failed to start application process");
2839         nxt_router_ra_use(task, ra, -1);
2840     }
2841 
2842     nxt_router_app_use(task, app, -1);
2843 }
2844 
2845 
2846 void
2847 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2848 {
2849     int  c;
2850 
2851     c = nxt_atomic_fetch_add(&app->use_count, i);
2852 
2853     if (i < 0 && c == -i) {
2854 
2855         nxt_assert(app->live == 0);
2856         nxt_assert(app->processes == 0);
2857         nxt_assert(app->idle_processes == 0);
2858         nxt_assert(app->pending_processes == 0);
2859         nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
2860         nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2861         nxt_assert(nxt_queue_is_empty(&app->spare_ports) != 0);
2862         nxt_assert(nxt_queue_is_empty(&app->idle_ports) != 0);
2863 
2864         nxt_thread_mutex_destroy(&app->mutex);
2865         nxt_free(app);
2866     }
2867 }
2868 
2869 
2870 nxt_inline nxt_bool_t
2871 nxt_router_app_first_port_busy(nxt_app_t *app)
2872 {
2873     nxt_port_t        *port;
2874     nxt_queue_link_t  *lnk;
2875 
2876     lnk = nxt_queue_first(&app->ports);
2877     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2878 
2879     return port->app_pending_responses > 0;
2880 }
2881 
2882 
2883 nxt_inline nxt_port_t *
2884 nxt_router_pop_first_port(nxt_app_t *app)
2885 {
2886     nxt_port_t        *port;
2887     nxt_queue_link_t  *lnk;
2888 
2889     lnk = nxt_queue_first(&app->ports);
2890     nxt_queue_remove(lnk);
2891 
2892     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2893 
2894     port->app_pending_responses++;
2895 
2896     if (nxt_queue_chk_remove(&port->idle_link)) {
2897         app->idle_processes--;
2898 
2899         if (port->idle_start == 0) {
2900             nxt_assert(app->idle_processes < app->spare_processes);
2901 
2902         } else {
2903             nxt_assert(app->idle_processes >= app->spare_processes);
2904 
2905             port->idle_start = 0;
2906         }
2907     }
2908 
2909     if ((app->max_pending_responses == 0
2910             || port->app_pending_responses < app->max_pending_responses)
2911         && (app->max_requests == 0
2912             || port->app_responses + port->app_pending_responses
2913                 < app->max_requests))
2914     {
2915         nxt_queue_insert_tail(&app->ports, lnk);
2916 
2917         nxt_port_inc_use(port);
2918 
2919     } else {
2920         lnk->next = NULL;
2921     }
2922 
2923     return port;
2924 }
2925 
2926 
2927 nxt_inline nxt_port_t *
2928 nxt_router_app_get_port_for_quit(nxt_app_t *app)
2929 {
2930     nxt_port_t  *port;
2931 
2932     port = NULL;
2933 
2934     nxt_thread_mutex_lock(&app->mutex);
2935 
2936     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2937 
2938         if (port->app_pending_responses > 0) {
2939             port = NULL;
2940 
2941             continue;
2942         }
2943 
2944         /* Caller is responsible to decrease port use count. */
2945         nxt_queue_chk_remove(&port->app_link);
2946 
2947         if (nxt_queue_chk_remove(&port->idle_link)) {
2948             app->idle_processes--;
2949         }
2950 
2951         /* Caller is responsible to decrease app use count. */
2952         port->app = NULL;
2953         app->processes--;
2954 
2955         break;
2956 
2957     } nxt_queue_loop;
2958 
2959     nxt_thread_mutex_unlock(&app->mutex);
2960 
2961     return port;
2962 }
2963 
2964 
2965 static void
2966 nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
2967 {
2968     nxt_port_t  *port;
2969 
2970     nxt_queue_remove(&app->link);
2971 
2972     app->live = 0;
2973 
2974     for ( ;; ) {
2975         port = nxt_router_app_get_port_for_quit(app);
2976         if (port == NULL) {
2977             break;
2978         }
2979 
2980         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
2981 
2982         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
2983 
2984         nxt_port_use(task, port, -1);
2985         nxt_router_app_use(task, app, -1);
2986     }
2987 
2988     if (nxt_timer_is_in_tree(&app->idle_timer)) {
2989         nxt_assert(app->engine == task->thread->engine);
2990 
2991         app->idle_timer.handler = nxt_router_app_release_handler;
2992         nxt_timer_add(app->engine, &app->idle_timer, 0);
2993 
2994     } else {
2995         nxt_router_app_use(task, app, -1);
2996     }
2997 }
2998 
2999 
3000 static void
3001 nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
3002 {
3003     nxt_app_t           *app;
3004     nxt_req_app_link_t  *ra;
3005 
3006     app = obj;
3007     ra = data;
3008 
3009     nxt_assert(app != NULL);
3010     nxt_assert(ra != NULL);
3011     nxt_assert(ra->app_port != NULL);
3012 
3013     nxt_debug(task, "app '%V' %p process next stream #%uD",
3014               &app->name, app, ra->stream);
3015 
3016     nxt_router_app_prepare_request(task, ra);
3017 }
3018 
3019 
3020 static void
3021 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
3022     uint32_t request_failed, uint32_t got_response)
3023 {
3024     nxt_app_t                *app;
3025     nxt_bool_t               port_unchained;
3026     nxt_bool_t               send_quit, cancelled, adjust_idle_timer;
3027     nxt_queue_link_t         *lnk;
3028     nxt_req_app_link_t       *ra, *pending_ra, *re_ra;
3029     nxt_port_select_state_t  state;
3030 
3031     nxt_assert(port != NULL);
3032     nxt_assert(port->app != NULL);
3033 
3034     ra = NULL;
3035 
3036     app = port->app;
3037 
3038     nxt_thread_mutex_lock(&app->mutex);
3039 
3040     port->app_pending_responses -= request_failed + got_response;
3041     port->app_responses += got_response;
3042 
3043     if (nxt_slow_path(app->live == 0)) {
3044         goto app_dead;
3045     }
3046 
3047     if (port->pair[1] != -1
3048         && (app->max_pending_responses == 0
3049             || port->app_pending_responses < app->max_pending_responses)
3050         && (app->max_requests == 0
3051             || port->app_responses + port->app_pending_responses
3052                 < app->max_requests))
3053     {
3054         if (port->app_link.next == NULL) {
3055             if (port->app_pending_responses > 0) {
3056                 nxt_queue_insert_tail(&app->ports, &port->app_link);
3057 
3058             } else {
3059                 nxt_queue_insert_head(&app->ports, &port->app_link);
3060             }
3061 
3062             nxt_port_inc_use(port);
3063 
3064         } else {
3065             if (port->app_pending_responses == 0
3066                 && nxt_queue_first(&app->ports) != &port->app_link)
3067             {
3068                 nxt_queue_remove(&port->app_link);
3069                 nxt_queue_insert_head(&app->ports, &port->app_link);
3070             }
3071         }
3072     }
3073 
3074     if (!nxt_queue_is_empty(&app->ports)
3075         && !nxt_queue_is_empty(&app->requests))
3076     {
3077         lnk = nxt_queue_first(&app->requests);
3078         nxt_queue_remove(lnk);
3079         lnk->next = NULL;
3080 
3081         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
3082 
3083         ra->app_port = nxt_router_pop_first_port(app);
3084 
3085         if (ra->app_port->app_pending_responses > 1) {
3086             nxt_router_ra_pending(task, app, ra);
3087         }
3088     }
3089 
3090 app_dead:
3091 
3092     /* Pop first pending request for this port. */
3093     if ((request_failed > 0 || got_response > 0)
3094         && !nxt_queue_is_empty(&port->pending_requests))
3095     {
3096         lnk = nxt_queue_first(&port->pending_requests);
3097         nxt_queue_remove(lnk);
3098         lnk->next = NULL;
3099 
3100         pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
3101                                          link_port_pending);
3102 
3103         nxt_assert(pending_ra->link_app_pending.next != NULL);
3104 
3105         nxt_queue_remove(&pending_ra->link_app_pending);
3106         pending_ra->link_app_pending.next = NULL;
3107 
3108     } else {
3109         pending_ra = NULL;
3110     }
3111 
3112     /* Try to cancel and re-schedule first stalled request for this app. */
3113     if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
3114         lnk = nxt_queue_first(&app->pending);
3115 
3116         re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
3117 
3118         if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
3119 
3120             nxt_debug(task, "app '%V' stalled request #%uD detected",
3121                       &app->name, re_ra->stream);
3122 
3123             cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
3124                                               re_ra->stream);
3125 
3126             if (cancelled) {
3127                 nxt_router_ra_inc_use(re_ra);
3128 
3129                 state.ra = re_ra;
3130                 state.app = app;
3131 
3132                 nxt_router_port_select(task, &state);
3133 
3134                 goto re_ra_cancelled;
3135             }
3136         }
3137     }
3138 
3139     re_ra = NULL;
3140 
3141 re_ra_cancelled:
3142 
3143     send_quit = (app->live == 0 && port->app_pending_responses == 0)
3144                 || (app->max_requests > 0 && port->app_pending_responses == 0
3145                     && port->app_responses >= app->max_requests);
3146 
3147     if (send_quit) {
3148         port_unchained = nxt_queue_chk_remove(&port->app_link);
3149 
3150         port->app = NULL;
3151         app->processes--;
3152 
3153     } else {
3154         port_unchained = 0;
3155     }
3156 
3157     adjust_idle_timer = 0;
3158 
3159     if (!send_quit && port->app_pending_responses == 0) {
3160         nxt_assert(port->idle_link.next == NULL);
3161 
3162         if (app->idle_processes == app->spare_processes
3163             && app->adjust_idle_work.data == NULL)
3164         {
3165             adjust_idle_timer = 1;
3166             app->adjust_idle_work.data = app;
3167             app->adjust_idle_work.next = NULL;
3168         }
3169 
3170         if (app->idle_processes < app->spare_processes) {
3171             nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3172 
3173         } else {
3174             nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
3175 
3176             port->idle_start = task->thread->engine->timers.now;
3177         }
3178 
3179         app->idle_processes++;
3180     }
3181 
3182     nxt_thread_mutex_unlock(&app->mutex);
3183 
3184     if (adjust_idle_timer) {
3185         nxt_router_app_use(task, app, 1);
3186         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
3187     }
3188 
3189     if (pending_ra != NULL) {
3190         nxt_router_ra_use(task, pending_ra, -1);
3191     }
3192 
3193     if (re_ra != NULL) {
3194         if (nxt_router_port_post_select(task, &state) == NXT_OK) {
3195             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3196                                nxt_router_app_process_request,
3197                                &task->thread->engine->task,