xref: /unit/src/nxt_router.c (revision 967:d693ed6d0209)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 
18 
19 typedef struct {
20     nxt_str_t         type;
21     uint32_t          processes;
22     uint32_t          max_processes;
23     uint32_t          spare_processes;
24     nxt_msec_t        timeout;
25     nxt_msec_t        res_timeout;
26     nxt_msec_t        idle_timeout;
27     uint32_t          requests;
28     nxt_conf_value_t  *limits_value;
29     nxt_conf_value_t  *processes_value;
30 } nxt_router_app_conf_t;
31 
32 
33 typedef struct {
34     nxt_str_t         pass;
35     nxt_str_t         application;
36 } nxt_router_listener_conf_t;
37 
38 
39 #if (NXT_TLS)
40 
41 typedef struct {
42     nxt_str_t          name;
43     nxt_socket_conf_t  *conf;
44 
45     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
46 } nxt_router_tlssock_t;
47 
48 #endif
49 
50 
51 typedef struct nxt_msg_info_s {
52     nxt_buf_t                 *buf;
53     nxt_port_mmap_tracking_t  tracking;
54     nxt_work_handler_t        completion_handler;
55 } nxt_msg_info_t;
56 
57 
58 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
59 
60 
61 typedef struct {
62     uint32_t                 stream;
63     nxt_app_t                *app;
64     nxt_port_t               *app_port;
65     nxt_app_parse_ctx_t      *ap;
66     nxt_msg_info_t           msg_info;
67     nxt_req_app_link_t       *ra;
68 
69     nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
70 } nxt_req_conn_link_t;
71 
72 
73 struct nxt_req_app_link_s {
74     uint32_t             stream;
75     nxt_atomic_t         use_count;
76     nxt_port_t           *app_port;
77     nxt_port_t           *reply_port;
78     nxt_app_parse_ctx_t  *ap;
79     nxt_msg_info_t       msg_info;
80     nxt_req_conn_link_t  *rc;
81 
82     nxt_nsec_t           res_time;
83 
84     nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
85     nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
86     nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */
87 
88     nxt_mp_t             *mem_pool;
89     nxt_work_t           work;
90 
91     int                  err_code;
92     const char           *err_str;
93 };
94 
95 
96 typedef struct {
97     nxt_socket_conf_t       *socket_conf;
98     nxt_router_temp_conf_t  *temp_conf;
99 } nxt_socket_rpc_t;
100 
101 
102 typedef struct {
103     nxt_app_t               *app;
104     nxt_router_temp_conf_t  *temp_conf;
105 } nxt_app_rpc_t;
106 
107 
108 struct nxt_port_select_state_s {
109     nxt_app_t           *app;
110     nxt_req_app_link_t  *ra;
111 
112     nxt_port_t          *failed_port;
113     int                 failed_port_use_delta;
114 
115     uint8_t             start_process;    /* 1 bit */
116     nxt_req_app_link_t  *shared_ra;
117     nxt_port_t          *port;
118 };
119 
120 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
121 
122 static void nxt_router_greet_controller(nxt_task_t *task,
123     nxt_port_t *controller_port);
124 
125 static void nxt_router_port_select(nxt_task_t *task,
126     nxt_port_select_state_t *state);
127 
128 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
129     nxt_port_select_state_t *state);
130 
131 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
132 
133 nxt_inline void
134 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
135 {
136     nxt_atomic_fetch_add(&ra->use_count, 1);
137 }
138 
139 nxt_inline void
140 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
141 {
142 #if (NXT_DEBUG)
143     int  c;
144 
145     c = nxt_atomic_fetch_add(&ra->use_count, -1);
146 
147     nxt_assert(c > 1);
148 #else
149     (void) nxt_atomic_fetch_add(&ra->use_count, -1);
150 #endif
151 }
152 
153 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
154 
155 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
156 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
157 static void nxt_router_conf_ready(nxt_task_t *task,
158     nxt_router_temp_conf_t *tmcf);
159 static void nxt_router_conf_error(nxt_task_t *task,
160     nxt_router_temp_conf_t *tmcf);
161 static void nxt_router_conf_send(nxt_task_t *task,
162     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
163 
164 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
165     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
166 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
167 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
168     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
169 static void nxt_router_listen_socket_ready(nxt_task_t *task,
170     nxt_port_recv_msg_t *msg, void *data);
171 static void nxt_router_listen_socket_error(nxt_task_t *task,
172     nxt_port_recv_msg_t *msg, void *data);
173 #if (NXT_TLS)
174 static void nxt_router_tls_rpc_create(nxt_task_t *task,
175     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls);
176 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
177     nxt_port_recv_msg_t *msg, void *data);
178 #endif
179 static void nxt_router_app_rpc_create(nxt_task_t *task,
180     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
181 static void nxt_router_app_prefork_ready(nxt_task_t *task,
182     nxt_port_recv_msg_t *msg, void *data);
183 static void nxt_router_app_prefork_error(nxt_task_t *task,
184     nxt_port_recv_msg_t *msg, void *data);
185 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
186     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
187 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
188     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
189 
190 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
191     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
192     const nxt_event_interface_t *interface);
193 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
194     nxt_router_engine_conf_t *recf);
195 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
196     nxt_router_engine_conf_t *recf);
197 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
198     nxt_router_engine_conf_t *recf);
199 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
200     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
201     nxt_work_handler_t handler);
202 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
203     nxt_router_engine_conf_t *recf);
204 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
205     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
206 
207 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
208     nxt_router_temp_conf_t *tmcf);
209 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
210     nxt_event_engine_t *engine);
211 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
212     nxt_router_temp_conf_t *tmcf);
213 
214 static void nxt_router_engines_post(nxt_router_t *router,
215     nxt_router_temp_conf_t *tmcf);
216 static void nxt_router_engine_post(nxt_event_engine_t *engine,
217     nxt_work_t *jobs);
218 
219 static void nxt_router_thread_start(void *data);
220 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
221     void *data);
222 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
223     void *data);
224 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
225     void *data);
226 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
227     void *data);
228 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
229     void *data);
230 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
231     void *data);
232 static void nxt_router_listen_socket_release(nxt_task_t *task,
233     nxt_socket_conf_t *skcf);
234 
235 static void nxt_router_access_log_writer(nxt_task_t *task,
236     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
237 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
238     struct tm *tm, size_t size, const char *format);
239 static void nxt_router_access_log_open(nxt_task_t *task,
240     nxt_router_temp_conf_t *tmcf);
241 static void nxt_router_access_log_ready(nxt_task_t *task,
242     nxt_port_recv_msg_t *msg, void *data);
243 static void nxt_router_access_log_error(nxt_task_t *task,
244     nxt_port_recv_msg_t *msg, void *data);
245 static void nxt_router_access_log_release(nxt_task_t *task,
246     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
247 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
250     nxt_port_recv_msg_t *msg, void *data);
251 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
252     nxt_port_recv_msg_t *msg, void *data);
253 
254 static void nxt_router_app_port_ready(nxt_task_t *task,
255     nxt_port_recv_msg_t *msg, void *data);
256 static void nxt_router_app_port_error(nxt_task_t *task,
257     nxt_port_recv_msg_t *msg, void *data);
258 
259 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
260 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
261     uint32_t request_failed, uint32_t got_response);
262 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
263     nxt_req_app_link_t *ra);
264 
265 static void nxt_router_app_prepare_request(nxt_task_t *task,
266     nxt_req_app_link_t *ra);
267 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
268     nxt_port_t *port, const nxt_str_t *prefix);
269 
270 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
271 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
272     void *data);
273 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
274     void *data);
275 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
276     void *data);
277 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
278 
279 static const nxt_http_request_state_t  nxt_http_request_send_state;
280 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
281 
282 static void nxt_router_app_joint_use(nxt_task_t *task,
283     nxt_app_joint_t *app_joint, int i);
284 
285 static nxt_router_t  *nxt_router;
286 
287 static const nxt_str_t http_prefix = nxt_string("HTTP_");
288 static const nxt_str_t empty_prefix = nxt_string("");
289 
290 static const nxt_str_t  *nxt_app_msg_prefix[] = {
291     &empty_prefix,
292     &http_prefix,
293     &http_prefix,
294     &http_prefix,
295     &http_prefix,
296 };
297 
298 
299 nxt_port_handlers_t  nxt_router_process_port_handlers = {
300     .quit         = nxt_worker_process_quit_handler,
301     .new_port     = nxt_router_new_port_handler,
302     .change_file  = nxt_port_change_log_file_handler,
303     .mmap         = nxt_port_mmap_handler,
304     .data         = nxt_router_conf_data_handler,
305     .remove_pid   = nxt_router_remove_pid_handler,
306     .access_log   = nxt_router_access_log_reopen_handler,
307     .rpc_ready    = nxt_port_rpc_handler,
308     .rpc_error    = nxt_port_rpc_handler,
309 };
310 
311 
312 nxt_int_t
313 nxt_router_start(nxt_task_t *task, void *data)
314 {
315     nxt_int_t      ret;
316     nxt_port_t     *controller_port;
317     nxt_router_t   *router;
318     nxt_runtime_t  *rt;
319 
320     rt = task->thread->runtime;
321 
322 #if (NXT_TLS)
323     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
324     if (nxt_slow_path(rt->tls == NULL)) {
325         return NXT_ERROR;
326     }
327 
328     ret = rt->tls->library_init(task);
329     if (nxt_slow_path(ret != NXT_OK)) {
330         return ret;
331     }
332 #endif
333 
334     ret = nxt_http_init(task, rt);
335     if (nxt_slow_path(ret != NXT_OK)) {
336         return ret;
337     }
338 
339     router = nxt_zalloc(sizeof(nxt_router_t));
340     if (nxt_slow_path(router == NULL)) {
341         return NXT_ERROR;
342     }
343 
344     nxt_queue_init(&router->engines);
345     nxt_queue_init(&router->sockets);
346     nxt_queue_init(&router->apps);
347 
348     nxt_router = router;
349 
350     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
351     if (controller_port != NULL) {
352         nxt_router_greet_controller(task, controller_port);
353     }
354 
355     return NXT_OK;
356 }
357 
358 
359 static void
360 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
361 {
362     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
363                           -1, 0, 0, NULL);
364 }
365 
366 
367 static void
368 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
369     void *data)
370 {
371     size_t         size;
372     uint32_t       stream;
373     nxt_mp_t       *mp;
374     nxt_int_t      ret;
375     nxt_app_t      *app;
376     nxt_buf_t      *b;
377     nxt_port_t     *main_port;
378     nxt_runtime_t  *rt;
379 
380     app = data;
381 
382     rt = task->thread->runtime;
383     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
384 
385     nxt_debug(task, "app '%V' %p start process", &app->name, app);
386 
387     size = app->name.length + 1 + app->conf.length;
388 
389     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
390 
391     if (nxt_slow_path(b == NULL)) {
392         goto failed;
393     }
394 
395     nxt_buf_cpystr(b, &app->name);
396     *b->mem.free++ = '\0';
397     nxt_buf_cpystr(b, &app->conf);
398 
399     nxt_router_app_joint_use(task, app->joint, 1);
400 
401     stream = nxt_port_rpc_register_handler(task, port,
402                                            nxt_router_app_port_ready,
403                                            nxt_router_app_port_error,
404                                            -1, app->joint);
405 
406     if (nxt_slow_path(stream == 0)) {
407         nxt_router_app_joint_use(task, app->joint, -1);
408 
409         goto failed;
410     }
411 
412     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
413                                 stream, port->id, b);
414 
415     if (nxt_slow_path(ret != NXT_OK)) {
416         nxt_port_rpc_cancel(task, port, stream);
417 
418         nxt_router_app_joint_use(task, app->joint, -1);
419 
420         goto failed;
421     }
422 
423     nxt_router_app_use(task, app, -1);
424 
425     return;
426 
427 failed:
428 
429     if (b != NULL) {
430         mp = b->data;
431         nxt_mp_free(mp, b);
432         nxt_mp_release(mp);
433     }
434 
435     nxt_thread_mutex_lock(&app->mutex);
436 
437     app->pending_processes--;
438 
439     nxt_thread_mutex_unlock(&app->mutex);
440 
441     nxt_router_app_use(task, app, -1);
442 }
443 
444 
445 static void
446 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
447 {
448     app_joint->use_count += i;
449 
450     if (app_joint->use_count == 0) {
451         nxt_assert(app_joint->app == NULL);
452 
453         nxt_free(app_joint);
454     }
455 }
456 
457 
458 static nxt_int_t
459 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
460 {
461     nxt_int_t      res;
462     nxt_port_t     *router_port;
463     nxt_runtime_t  *rt;
464 
465     rt = task->thread->runtime;
466     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
467 
468     nxt_router_app_use(task, app, 1);
469 
470     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
471                         app);
472 
473     if (res == NXT_OK) {
474         return res;
475     }
476 
477     nxt_thread_mutex_lock(&app->mutex);
478 
479     app->pending_processes--;
480 
481     nxt_thread_mutex_unlock(&app->mutex);
482 
483     nxt_router_app_use(task, app, -1);
484 
485     return NXT_ERROR;
486 }
487 
488 
489 nxt_inline void
490 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
491     nxt_req_conn_link_t *rc)
492 {
493     nxt_event_engine_t  *engine;
494 
495     engine = task->thread->engine;
496 
497     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
498 
499     ra->stream = rc->stream;
500     ra->use_count = 1;
501     ra->rc = rc;
502     rc->ra = ra;
503     ra->reply_port = engine->port;
504     ra->ap = rc->ap;
505 
506     ra->work.handler = NULL;
507     ra->work.task = &engine->task;
508     ra->work.obj = ra;
509     ra->work.data = engine;
510 }
511 
512 
513 nxt_inline nxt_req_app_link_t *
514 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
515 {
516     nxt_mp_t            *mp;
517     nxt_req_app_link_t  *ra;
518 
519     if (ra_src->mem_pool != NULL) {
520         return ra_src;
521     }
522 
523     mp = ra_src->ap->mem_pool;
524 
525     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
526 
527     if (nxt_slow_path(ra == NULL)) {
528 
529         ra_src->rc->ra = NULL;
530         ra_src->rc = NULL;
531 
532         return NULL;
533     }
534 
535     nxt_mp_retain(mp);
536 
537     nxt_router_ra_init(task, ra, ra_src->rc);
538 
539     ra->mem_pool = mp;
540 
541     return ra;
542 }
543 
544 
545 nxt_inline nxt_bool_t
546 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
547     uint32_t stream)
548 {
549     nxt_buf_t   *b, *next;
550     nxt_bool_t  cancelled;
551 
552     if (msg_info->buf == NULL) {
553         return 0;
554     }
555 
556     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
557                                               stream);
558 
559     if (cancelled) {
560         nxt_debug(task, "stream #%uD: cancelled by router", stream);
561     }
562 
563     for (b = msg_info->buf; b != NULL; b = next) {
564         next = b->next;
565 
566         b->completion_handler = msg_info->completion_handler;
567 
568         if (b->is_port_mmap_sent) {
569             b->is_port_mmap_sent = cancelled == 0;
570             b->completion_handler(task, b, b->parent);
571         }
572     }
573 
574     msg_info->buf = NULL;
575 
576     return cancelled;
577 }
578 
579 
580 static void
581 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
582 
583 
584 static void
585 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
586 {
587     nxt_req_app_link_t  *ra;
588 
589     ra = obj;
590 
591     nxt_router_ra_update_peer(task, ra);
592 
593     nxt_router_ra_use(task, ra, -1);
594 }
595 
596 
597 static void
598 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
599 {
600     nxt_event_engine_t   *engine;
601     nxt_req_conn_link_t  *rc;
602 
603     engine = ra->work.data;
604 
605     if (task->thread->engine != engine) {
606         nxt_router_ra_inc_use(ra);
607 
608         ra->work.handler = nxt_router_ra_update_peer_handler;
609         ra->work.task = &engine->task;
610         ra->work.next = NULL;
611 
612         nxt_debug(task, "ra stream #%uD post update peer to %p",
613                   ra->stream, engine);
614 
615         nxt_event_engine_post(engine, &ra->work);
616 
617         return;
618     }
619 
620     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
621 
622     rc = ra->rc;
623 
624     if (rc != NULL && ra->app_port != NULL) {
625         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
626     }
627 
628     nxt_router_ra_use(task, ra, -1);
629 }
630 
631 
632 static void
633 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
634 {
635     nxt_mp_t                *mp;
636     nxt_req_conn_link_t     *rc;
637 
638     nxt_assert(task->thread->engine == ra->work.data);
639     nxt_assert(ra->use_count == 0);
640 
641     nxt_debug(task, "ra stream #%uD release", ra->stream);
642 
643     rc = ra->rc;
644 
645     if (rc != NULL) {
646         if (nxt_slow_path(ra->err_code != 0)) {
647             nxt_http_request_error(task, rc->ap->request, ra->err_code);
648 
649         } else {
650             rc->app_port = ra->app_port;
651             rc->msg_info = ra->msg_info;
652 
653             if (rc->app->timeout != 0) {
654                 rc->ap->timer.handler = nxt_router_app_timeout;
655                 rc->ap->timer_data = rc;
656                 nxt_timer_add(task->thread->engine, &rc->ap->timer,
657                               rc->app->timeout);
658             }
659 
660             ra->app_port = NULL;
661             ra->msg_info.buf = NULL;
662         }
663 
664         rc->ra = NULL;
665         ra->rc = NULL;
666     }
667 
668     if (ra->app_port != NULL) {
669         nxt_router_app_port_release(task, ra->app_port, 0, 1);
670 
671         ra->app_port = NULL;
672     }
673 
674     nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
675 
676     mp = ra->mem_pool;
677 
678     if (mp != NULL) {
679         nxt_mp_free(mp, ra);
680         nxt_mp_release(mp);
681     }
682 }
683 
684 
685 static void
686 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
687 {
688     nxt_req_app_link_t  *ra;
689 
690     ra = obj;
691 
692     nxt_assert(ra->work.data == data);
693 
694     nxt_atomic_fetch_add(&ra->use_count, -1);
695 
696     nxt_router_ra_release(task, ra);
697 }
698 
699 
700 static void
701 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
702 {
703     int                 c;
704     nxt_event_engine_t  *engine;
705 
706     c = nxt_atomic_fetch_add(&ra->use_count, i);
707 
708     if (i < 0 && c == -i) {
709         engine = ra->work.data;
710 
711         if (task->thread->engine == engine) {
712             nxt_router_ra_release(task, ra);
713 
714             return;
715         }
716 
717         nxt_router_ra_inc_use(ra);
718 
719         ra->work.handler = nxt_router_ra_release_handler;
720         ra->work.task = &engine->task;
721         ra->work.next = NULL;
722 
723         nxt_debug(task, "ra stream #%uD post release to %p",
724                   ra->stream, engine);
725 
726         nxt_event_engine_post(engine, &ra->work);
727     }
728 }
729 
730 
731 nxt_inline void
732 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
733 {
734     ra->app_port = NULL;
735     ra->err_code = code;
736     ra->err_str = str;
737 }
738 
739 
740 nxt_inline void
741 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
742 {
743     nxt_queue_insert_tail(&ra->app_port->pending_requests,
744                           &ra->link_port_pending);
745     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
746 
747     nxt_router_ra_inc_use(ra);
748 
749     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
750 
751     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
752 }
753 
754 
755 nxt_inline nxt_bool_t
756 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
757 {
758     if (lnk->next != NULL) {
759         nxt_queue_remove(lnk);
760 
761         lnk->next = NULL;
762 
763         return 1;
764     }
765 
766     return 0;
767 }
768 
769 
770 nxt_inline void
771 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
772 {
773     int                 ra_use_delta;
774     nxt_req_app_link_t  *ra;
775 
776     if (rc->app_port != NULL) {
777         nxt_router_app_port_release(task, rc->app_port, 0, 1);
778 
779         rc->app_port = NULL;
780     }
781 
782     nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
783 
784     ra = rc->ra;
785 
786     if (ra != NULL) {
787         rc->ra = NULL;
788         ra->rc = NULL;
789 
790         ra_use_delta = 0;
791 
792         nxt_thread_mutex_lock(&rc->app->mutex);
793 
794         if (ra->link_app_requests.next == NULL
795             && ra->link_port_pending.next == NULL
796             && ra->link_app_pending.next == NULL)
797         {
798             ra = NULL;
799 
800         } else {
801             ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
802             ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
803             nxt_queue_chk_remove(&ra->link_app_pending);
804         }
805 
806         nxt_thread_mutex_unlock(&rc->app->mutex);
807 
808         if (ra != NULL) {
809             nxt_router_ra_use(task, ra, ra_use_delta);
810         }
811     }
812 
813     if (rc->app != NULL) {
814         nxt_router_app_use(task, rc->app, -1);
815 
816         rc->app = NULL;
817     }
818 
819     if (rc->ap != NULL) {
820         rc->ap->timer_data = NULL;
821 
822         nxt_app_http_req_done(task, rc->ap);
823 
824         rc->ap = NULL;
825     }
826 }
827 
828 
829 void
830 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
831 {
832     nxt_port_new_port_handler(task, msg);
833 
834     if (msg->u.new_port != NULL
835         && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
836     {
837         nxt_router_greet_controller(task, msg->u.new_port);
838     }
839 
840     if (msg->port_msg.stream == 0) {
841         return;
842     }
843 
844     if (msg->u.new_port == NULL
845         || msg->u.new_port->type != NXT_PROCESS_WORKER)
846     {
847         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
848     }
849 
850     nxt_port_rpc_handler(task, msg);
851 }
852 
853 
854 void
855 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
856 {
857     nxt_int_t               ret;
858     nxt_buf_t               *b;
859     nxt_router_temp_conf_t  *tmcf;
860 
861     tmcf = nxt_router_temp_conf(task);
862     if (nxt_slow_path(tmcf == NULL)) {
863         return;
864     }
865 
866     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
867               nxt_buf_used_size(msg->buf),
868               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
869 
870     tmcf->router_conf->router = nxt_router;
871     tmcf->stream = msg->port_msg.stream;
872     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
873                                        msg->port_msg.pid,
874                                        msg->port_msg.reply_port);
875 
876     if (nxt_slow_path(tmcf->port == NULL)) {
877         nxt_alert(task, "reply port not found");
878 
879         return;
880     }
881 
882     nxt_port_use(task, tmcf->port, 1);
883 
884     b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
885                                msg->buf, msg->size);
886     if (nxt_slow_path(b == NULL)) {
887         nxt_router_conf_error(task, tmcf);
888 
889         return;
890     }
891 
892     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
893 
894     if (nxt_fast_path(ret == NXT_OK)) {
895         nxt_router_conf_apply(task, tmcf, NULL);
896 
897     } else {
898         nxt_router_conf_error(task, tmcf);
899     }
900 }
901 
902 
903 static void
904 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
905     void *data)
906 {
907     union {
908         nxt_pid_t  removed_pid;
909         void       *data;
910     } u;
911 
912     u.data = data;
913 
914     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
915 }
916 
917 
918 void
919 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
920 {
921     nxt_event_engine_t  *engine;
922 
923     nxt_port_remove_pid_handler(task, msg);
924 
925     if (msg->port_msg.stream == 0) {
926         return;
927     }
928 
929     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
930     {
931         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
932                       msg->u.data);
933     }
934     nxt_queue_loop;
935 
936     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
937 
938     nxt_port_rpc_handler(task, msg);
939 }
940 
941 
942 static nxt_router_temp_conf_t *
943 nxt_router_temp_conf(nxt_task_t *task)
944 {
945     nxt_mp_t                *mp, *tmp;
946     nxt_router_conf_t       *rtcf;
947     nxt_router_temp_conf_t  *tmcf;
948 
949     mp = nxt_mp_create(1024, 128, 256, 32);
950     if (nxt_slow_path(mp == NULL)) {
951         return NULL;
952     }
953 
954     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
955     if (nxt_slow_path(rtcf == NULL)) {
956         goto fail;
957     }
958 
959     rtcf->mem_pool = mp;
960 
961     tmp = nxt_mp_create(1024, 128, 256, 32);
962     if (nxt_slow_path(tmp == NULL)) {
963         goto fail;
964     }
965 
966     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
967     if (nxt_slow_path(tmcf == NULL)) {
968         goto temp_fail;
969     }
970 
971     tmcf->mem_pool = tmp;
972     tmcf->router_conf = rtcf;
973     tmcf->count = 1;
974     tmcf->engine = task->thread->engine;
975 
976     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
977                                      sizeof(nxt_router_engine_conf_t));
978     if (nxt_slow_path(tmcf->engines == NULL)) {
979         goto temp_fail;
980     }
981 
982     nxt_queue_init(&tmcf->deleting);
983     nxt_queue_init(&tmcf->keeping);
984     nxt_queue_init(&tmcf->updating);
985     nxt_queue_init(&tmcf->pending);
986     nxt_queue_init(&tmcf->creating);
987 
988 #if (NXT_TLS)
989     nxt_queue_init(&tmcf->tls);
990 #endif
991 
992     nxt_queue_init(&tmcf->apps);
993     nxt_queue_init(&tmcf->previous);
994 
995     return tmcf;
996 
997 temp_fail:
998 
999     nxt_mp_destroy(tmp);
1000 
1001 fail:
1002 
1003     nxt_mp_destroy(mp);
1004 
1005     return NULL;
1006 }
1007 
1008 
1009 nxt_inline nxt_bool_t
1010 nxt_router_app_can_start(nxt_app_t *app)
1011 {
1012     return app->processes + app->pending_processes < app->max_processes
1013             && app->pending_processes < app->max_pending_processes;
1014 }
1015 
1016 
1017 nxt_inline nxt_bool_t
1018 nxt_router_app_need_start(nxt_app_t *app)
1019 {
1020     return app->idle_processes + app->pending_processes
1021             < app->spare_processes;
1022 }
1023 
1024 
1025 static void
1026 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1027 {
1028     nxt_int_t                    ret;
1029     nxt_app_t                    *app;
1030     nxt_router_t                 *router;
1031     nxt_runtime_t                *rt;
1032     nxt_queue_link_t             *qlk;
1033     nxt_socket_conf_t            *skcf;
1034     nxt_router_conf_t            *rtcf;
1035     nxt_router_temp_conf_t       *tmcf;
1036     const nxt_event_interface_t  *interface;
1037 #if (NXT_TLS)
1038     nxt_router_tlssock_t         *tls;
1039 #endif
1040 
1041     tmcf = obj;
1042 
1043     qlk = nxt_queue_first(&tmcf->pending);
1044 
1045     if (qlk != nxt_queue_tail(&tmcf->pending)) {
1046         nxt_queue_remove(qlk);
1047         nxt_queue_insert_tail(&tmcf->creating, qlk);
1048 
1049         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1050 
1051         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1052 
1053         return;
1054     }
1055 
1056 #if (NXT_TLS)
1057     qlk = nxt_queue_first(&tmcf->tls);
1058 
1059     if (qlk != nxt_queue_tail(&tmcf->tls)) {
1060         nxt_queue_remove(qlk);
1061 
1062         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1063 
1064         nxt_router_tls_rpc_create(task, tmcf, tls);
1065         return;
1066     }
1067 #endif
1068 
1069     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1070 
1071         if (nxt_router_app_need_start(app)) {
1072             nxt_router_app_rpc_create(task, tmcf, app);
1073             return;
1074         }
1075 
1076     } nxt_queue_loop;
1077 
1078     rtcf = tmcf->router_conf;
1079 
1080     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1081         nxt_router_access_log_open(task, tmcf);
1082         return;
1083     }
1084 
1085     rt = task->thread->runtime;
1086 
1087     interface = nxt_service_get(rt->services, "engine", NULL);
1088 
1089     router = rtcf->router;
1090 
1091     ret = nxt_router_engines_create(task, router, tmcf, interface);
1092     if (nxt_slow_path(ret != NXT_OK)) {
1093         goto fail;
1094     }
1095 
1096     ret = nxt_router_threads_create(task, rt, tmcf);
1097     if (nxt_slow_path(ret != NXT_OK)) {
1098         goto fail;
1099     }
1100 
1101     nxt_router_apps_sort(task, router, tmcf);
1102 
1103     nxt_router_engines_post(router, tmcf);
1104 
1105     nxt_queue_add(&router->sockets, &tmcf->updating);
1106     nxt_queue_add(&router->sockets, &tmcf->creating);
1107 
1108     router->access_log = rtcf->access_log;
1109 
1110     nxt_router_conf_ready(task, tmcf);
1111 
1112     return;
1113 
1114 fail:
1115 
1116     nxt_router_conf_error(task, tmcf);
1117 
1118     return;
1119 }
1120 
1121 
1122 static void
1123 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1124 {
1125     nxt_joint_job_t  *job;
1126 
1127     job = obj;
1128 
1129     nxt_router_conf_ready(task, job->tmcf);
1130 }
1131 
1132 
1133 static void
1134 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1135 {
1136     nxt_debug(task, "temp conf count:%D", tmcf->count);
1137 
1138     if (--tmcf->count == 0) {
1139         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1140     }
1141 }
1142 
1143 
1144 static void
1145 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1146 {
1147     nxt_app_t          *app;
1148     nxt_queue_t        new_socket_confs;
1149     nxt_socket_t       s;
1150     nxt_router_t       *router;
1151     nxt_queue_link_t   *qlk;
1152     nxt_socket_conf_t  *skcf;
1153     nxt_router_conf_t  *rtcf;
1154 
1155     nxt_alert(task, "failed to apply new conf");
1156 
1157     for (qlk = nxt_queue_first(&tmcf->creating);
1158          qlk != nxt_queue_tail(&tmcf->creating);
1159          qlk = nxt_queue_next(qlk))
1160     {
1161         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1162         s = skcf->listen->socket;
1163 
1164         if (s != -1) {
1165             nxt_socket_close(task, s);
1166         }
1167 
1168         nxt_free(skcf->listen);
1169     }
1170 
1171     nxt_queue_init(&new_socket_confs);
1172     nxt_queue_add(&new_socket_confs, &tmcf->updating);
1173     nxt_queue_add(&new_socket_confs, &tmcf->pending);
1174     nxt_queue_add(&new_socket_confs, &tmcf->creating);
1175 
1176     rtcf = tmcf->router_conf;
1177 
1178     nxt_http_routes_cleanup(task, rtcf->routes);
1179 
1180     nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1181 
1182         if (skcf->pass != NULL) {
1183             nxt_http_pass_cleanup(task, skcf->pass);
1184         }
1185 
1186     } nxt_queue_loop;
1187 
1188     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1189 
1190         nxt_router_app_unlink(task, app);
1191 
1192     } nxt_queue_loop;
1193 
1194     router = rtcf->router;
1195 
1196     nxt_queue_add(&router->sockets, &tmcf->keeping);
1197     nxt_queue_add(&router->sockets, &tmcf->deleting);
1198 
1199     nxt_queue_add(&router->apps, &tmcf->previous);
1200 
1201     // TODO: new engines and threads
1202 
1203     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1204 
1205     nxt_mp_destroy(rtcf->mem_pool);
1206 
1207     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1208 }
1209 
1210 
1211 static void
1212 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1213     nxt_port_msg_type_t type)
1214 {
1215     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1216 
1217     nxt_port_use(task, tmcf->port, -1);
1218 
1219     tmcf->port = NULL;
1220 }
1221 
1222 
1223 static nxt_conf_map_t  nxt_router_conf[] = {
1224     {
1225         nxt_string("listeners_threads"),
1226         NXT_CONF_MAP_INT32,
1227         offsetof(nxt_router_conf_t, threads),
1228     },
1229 };
1230 
1231 
1232 static nxt_conf_map_t  nxt_router_app_conf[] = {
1233     {
1234         nxt_string("type"),
1235         NXT_CONF_MAP_STR,
1236         offsetof(nxt_router_app_conf_t, type),
1237     },
1238 
1239     {
1240         nxt_string("limits"),
1241         NXT_CONF_MAP_PTR,
1242         offsetof(nxt_router_app_conf_t, limits_value),
1243     },
1244 
1245     {
1246         nxt_string("processes"),
1247         NXT_CONF_MAP_INT32,
1248         offsetof(nxt_router_app_conf_t, processes),
1249     },
1250 
1251     {
1252         nxt_string("processes"),
1253         NXT_CONF_MAP_PTR,
1254         offsetof(nxt_router_app_conf_t, processes_value),
1255     },
1256 };
1257 
1258 
1259 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1260     {
1261         nxt_string("timeout"),
1262         NXT_CONF_MAP_MSEC,
1263         offsetof(nxt_router_app_conf_t, timeout),
1264     },
1265 
1266     {
1267         nxt_string("reschedule_timeout"),
1268         NXT_CONF_MAP_MSEC,
1269         offsetof(nxt_router_app_conf_t, res_timeout),
1270     },
1271 
1272     {
1273         nxt_string("requests"),
1274         NXT_CONF_MAP_INT32,
1275         offsetof(nxt_router_app_conf_t, requests),
1276     },
1277 };
1278 
1279 
1280 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1281     {
1282         nxt_string("spare"),
1283         NXT_CONF_MAP_INT32,
1284         offsetof(nxt_router_app_conf_t, spare_processes),
1285     },
1286 
1287     {
1288         nxt_string("max"),
1289         NXT_CONF_MAP_INT32,
1290         offsetof(nxt_router_app_conf_t, max_processes),
1291     },
1292 
1293     {
1294         nxt_string("idle_timeout"),
1295         NXT_CONF_MAP_MSEC,
1296         offsetof(nxt_router_app_conf_t, idle_timeout),
1297     },
1298 };
1299 
1300 
1301 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1302     {
1303         nxt_string("pass"),
1304         NXT_CONF_MAP_STR_COPY,
1305         offsetof(nxt_router_listener_conf_t, pass),
1306     },
1307 
1308     {
1309         nxt_string("application"),
1310         NXT_CONF_MAP_STR_COPY,
1311         offsetof(nxt_router_listener_conf_t, application),
1312     },
1313 };
1314 
1315 
1316 static nxt_conf_map_t  nxt_router_http_conf[] = {
1317     {
1318         nxt_string("header_buffer_size"),
1319         NXT_CONF_MAP_SIZE,
1320         offsetof(nxt_socket_conf_t, header_buffer_size),
1321     },
1322 
1323     {
1324         nxt_string("large_header_buffer_size"),
1325         NXT_CONF_MAP_SIZE,
1326         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1327     },
1328 
1329     {
1330         nxt_string("large_header_buffers"),
1331         NXT_CONF_MAP_SIZE,
1332         offsetof(nxt_socket_conf_t, large_header_buffers),
1333     },
1334 
1335     {
1336         nxt_string("body_buffer_size"),
1337         NXT_CONF_MAP_SIZE,
1338         offsetof(nxt_socket_conf_t, body_buffer_size),
1339     },
1340 
1341     {
1342         nxt_string("max_body_size"),
1343         NXT_CONF_MAP_SIZE,
1344         offsetof(nxt_socket_conf_t, max_body_size),
1345     },
1346 
1347     {
1348         nxt_string("idle_timeout"),
1349         NXT_CONF_MAP_MSEC,
1350         offsetof(nxt_socket_conf_t, idle_timeout),
1351     },
1352 
1353     {
1354         nxt_string("header_read_timeout"),
1355         NXT_CONF_MAP_MSEC,
1356         offsetof(nxt_socket_conf_t, header_read_timeout),
1357     },
1358 
1359     {
1360         nxt_string("body_read_timeout"),
1361         NXT_CONF_MAP_MSEC,
1362         offsetof(nxt_socket_conf_t, body_read_timeout),
1363     },
1364 
1365     {
1366         nxt_string("send_timeout"),
1367         NXT_CONF_MAP_MSEC,
1368         offsetof(nxt_socket_conf_t, send_timeout),
1369     },
1370 };
1371 
1372 
1373 static nxt_int_t
1374 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1375     u_char *start, u_char *end)
1376 {
1377     u_char                      *p;
1378     size_t                      size;
1379     nxt_mp_t                    *mp;
1380     uint32_t                    next;
1381     nxt_int_t                   ret;
1382     nxt_str_t                   name, path;
1383     nxt_app_t                   *app, *prev;
1384     nxt_router_t                *router;
1385     nxt_app_joint_t             *app_joint;
1386     nxt_conf_value_t            *conf, *http, *value;
1387     nxt_conf_value_t            *applications, *application;
1388     nxt_conf_value_t            *listeners, *listener;
1389     nxt_conf_value_t            *routes_conf;
1390     nxt_socket_conf_t           *skcf;
1391     nxt_http_routes_t           *routes;
1392     nxt_event_engine_t          *engine;
1393     nxt_app_lang_module_t       *lang;
1394     nxt_router_app_conf_t       apcf;
1395     nxt_router_access_log_t     *access_log;
1396     nxt_router_listener_conf_t  lscf;
1397 #if (NXT_TLS)
1398     nxt_router_tlssock_t        *tls;
1399 #endif
1400 
1401     static nxt_str_t  http_path = nxt_string("/settings/http");
1402     static nxt_str_t  applications_path = nxt_string("/applications");
1403     static nxt_str_t  listeners_path = nxt_string("/listeners");
1404     static nxt_str_t  routes_path = nxt_string("/routes");
1405     static nxt_str_t  access_log_path = nxt_string("/access_log");
1406 #if (NXT_TLS)
1407     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1408 #endif
1409 
1410     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1411     if (conf == NULL) {
1412         nxt_alert(task, "configuration parsing error");
1413         return NXT_ERROR;
1414     }
1415 
1416     mp = tmcf->router_conf->mem_pool;
1417 
1418     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1419                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1420     if (ret != NXT_OK) {
1421         nxt_alert(task, "root map error");
1422         return NXT_ERROR;
1423     }
1424 
1425     if (tmcf->router_conf->threads == 0) {
1426         tmcf->router_conf->threads = nxt_ncpu;
1427     }
1428 
1429     applications = nxt_conf_get_path(conf, &applications_path);
1430     if (applications == NULL) {
1431         nxt_alert(task, "no \"applications\" block");
1432         return NXT_ERROR;
1433     }
1434 
1435     router = tmcf->router_conf->router;
1436 
1437     next = 0;
1438 
1439     for ( ;; ) {
1440         application = nxt_conf_next_object_member(applications, &name, &next);
1441         if (application == NULL) {
1442             break;
1443         }
1444 
1445         nxt_debug(task, "application \"%V\"", &name);
1446 
1447         size = nxt_conf_json_length(application, NULL);
1448 
1449         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1450         if (app == NULL) {
1451             goto fail;
1452         }
1453 
1454         nxt_memzero(app, sizeof(nxt_app_t));
1455 
1456         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1457         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1458 
1459         p = nxt_conf_json_print(app->conf.start, application, NULL);
1460         app->conf.length = p - app->conf.start;
1461 
1462         nxt_assert(app->conf.length <= size);
1463 
1464         nxt_debug(task, "application conf \"%V\"", &app->conf);
1465 
1466         prev = nxt_router_app_find(&router->apps, &name);
1467 
1468         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1469             nxt_free(app);
1470 
1471             nxt_queue_remove(&prev->link);
1472             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1473             continue;
1474         }
1475 
1476         apcf.processes = 1;
1477         apcf.max_processes = 1;
1478         apcf.spare_processes = 0;
1479         apcf.timeout = 0;
1480         apcf.res_timeout = 1000;
1481         apcf.idle_timeout = 15000;
1482         apcf.requests = 0;
1483         apcf.limits_value = NULL;
1484         apcf.processes_value = NULL;
1485 
1486         app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1487         if (nxt_slow_path(app_joint == NULL)) {
1488             goto app_fail;
1489         }
1490 
1491         nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1492 
1493         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1494                                   nxt_nitems(nxt_router_app_conf), &apcf);
1495         if (ret != NXT_OK) {
1496             nxt_alert(task, "application map error");
1497             goto app_fail;
1498         }
1499 
1500         if (apcf.limits_value != NULL) {
1501 
1502             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1503                 nxt_alert(task, "application limits is not object");
1504                 goto app_fail;
1505             }
1506 
1507             ret = nxt_conf_map_object(mp, apcf.limits_value,
1508                                       nxt_router_app_limits_conf,
1509                                       nxt_nitems(nxt_router_app_limits_conf),
1510                                       &apcf);
1511             if (ret != NXT_OK) {
1512                 nxt_alert(task, "application limits map error");
1513                 goto app_fail;
1514             }
1515         }
1516 
1517         if (apcf.processes_value != NULL
1518             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1519         {
1520             ret = nxt_conf_map_object(mp, apcf.processes_value,
1521                                       nxt_router_app_processes_conf,
1522                                       nxt_nitems(nxt_router_app_processes_conf),
1523                                       &apcf);
1524             if (ret != NXT_OK) {
1525                 nxt_alert(task, "application processes map error");
1526                 goto app_fail;
1527             }
1528 
1529         } else {
1530             apcf.max_processes = apcf.processes;
1531             apcf.spare_processes = apcf.processes;
1532         }
1533 
1534         nxt_debug(task, "application type: %V", &apcf.type);
1535         nxt_debug(task, "application processes: %D", apcf.processes);
1536         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1537         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1538         nxt_debug(task, "application requests: %D", apcf.requests);
1539 
1540         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1541 
1542         if (lang == NULL) {
1543             nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1544             goto app_fail;
1545         }
1546 
1547         nxt_debug(task, "application language module: \"%s\"", lang->file);
1548 
1549         ret = nxt_thread_mutex_create(&app->mutex);
1550         if (ret != NXT_OK) {
1551             goto app_fail;
1552         }
1553 
1554         nxt_queue_init(&app->ports);
1555         nxt_queue_init(&app->spare_ports);
1556         nxt_queue_init(&app->idle_ports);
1557         nxt_queue_init(&app->requests);
1558         nxt_queue_init(&app->pending);
1559 
1560         app->name.length = name.length;
1561         nxt_memcpy(app->name.start, name.start, name.length);
1562 
1563         app->type = lang->type;
1564         app->max_processes = apcf.max_processes;
1565         app->spare_processes = apcf.spare_processes;
1566         app->max_pending_processes = apcf.spare_processes
1567                                       ? apcf.spare_processes : 1;
1568         app->timeout = apcf.timeout;
1569         app->res_timeout = apcf.res_timeout * 1000000;
1570         app->idle_timeout = apcf.idle_timeout;
1571         app->max_pending_responses = 2;
1572         app->max_requests = apcf.requests;
1573 
1574         engine = task->thread->engine;
1575 
1576         app->engine = engine;
1577 
1578         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1579         app->adjust_idle_work.task = &engine->task;
1580         app->adjust_idle_work.obj = app;
1581 
1582         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1583 
1584         nxt_router_app_use(task, app, 1);
1585 
1586         app->joint = app_joint;
1587 
1588         app_joint->use_count = 1;
1589         app_joint->app = app;
1590 
1591         app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1592         app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1593         app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1594         app_joint->idle_timer.task = &engine->task;
1595         app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1596 
1597         app_joint->free_app_work.handler = nxt_router_free_app;
1598         app_joint->free_app_work.task = &engine->task;
1599         app_joint->free_app_work.obj = app_joint;
1600     }
1601 
1602     routes_conf = nxt_conf_get_path(conf, &routes_path);
1603     if (nxt_fast_path(routes_conf != NULL)) {
1604         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1605         if (nxt_slow_path(routes == NULL)) {
1606             return NXT_ERROR;
1607         }
1608         tmcf->router_conf->routes = routes;
1609     }
1610 
1611     http = nxt_conf_get_path(conf, &http_path);
1612 #if 0
1613     if (http == NULL) {
1614         nxt_alert(task, "no \"http\" block");
1615         return NXT_ERROR;
1616     }
1617 #endif
1618 
1619     listeners = nxt_conf_get_path(conf, &listeners_path);
1620     if (listeners == NULL) {
1621         nxt_alert(task, "no \"listeners\" block");
1622         return NXT_ERROR;
1623     }
1624 
1625     next = 0;
1626 
1627     for ( ;; ) {
1628         listener = nxt_conf_next_object_member(listeners, &name, &next);
1629         if (listener == NULL) {
1630             break;
1631         }
1632 
1633         skcf = nxt_router_socket_conf(task, tmcf, &name);
1634         if (skcf == NULL) {
1635             goto fail;
1636         }
1637 
1638         nxt_memzero(&lscf, sizeof(lscf));
1639 
1640         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1641                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1642         if (ret != NXT_OK) {
1643             nxt_alert(task, "listener map error");
1644             goto fail;
1645         }
1646 
1647         nxt_debug(task, "application: %V", &lscf.application);
1648 
1649         // STUB, default values if http block is not defined.
1650         skcf->header_buffer_size = 2048;
1651         skcf->large_header_buffer_size = 8192;
1652         skcf->large_header_buffers = 4;
1653         skcf->body_buffer_size = 16 * 1024;
1654         skcf->max_body_size = 8 * 1024 * 1024;
1655         skcf->idle_timeout = 180 * 1000;
1656         skcf->header_read_timeout = 30 * 1000;
1657         skcf->body_read_timeout = 30 * 1000;
1658         skcf->send_timeout = 30 * 1000;
1659 
1660         if (http != NULL) {
1661             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1662                                       nxt_nitems(nxt_router_http_conf), skcf);
1663             if (ret != NXT_OK) {
1664                 nxt_alert(task, "http map error");
1665                 goto fail;
1666             }
1667         }
1668 
1669 #if (NXT_TLS)
1670 
1671         value = nxt_conf_get_path(listener, &certificate_path);
1672 
1673         if (value != NULL) {
1674             nxt_conf_get_string(value, &name);
1675 
1676             tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1677             if (nxt_slow_path(tls == NULL)) {
1678                 goto fail;
1679             }
1680 
1681             tls->name = name;
1682             tls->conf = skcf;
1683 
1684             nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1685         }
1686 
1687 #endif
1688 
1689         skcf->listen->handler = nxt_http_conn_init;
1690         skcf->router_conf = tmcf->router_conf;
1691         skcf->router_conf->count++;
1692 
1693         if (lscf.pass.length != 0) {
1694             skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
1695 
1696         /* COMPATIBILITY: listener application. */
1697         } else if (lscf.application.length > 0) {
1698             skcf->pass = nxt_http_pass_application(task, tmcf,
1699                                                    &lscf.application);
1700         }
1701     }
1702 
1703     value = nxt_conf_get_path(conf, &access_log_path);
1704 
1705     if (value != NULL) {
1706         nxt_conf_get_string(value, &path);
1707 
1708         access_log = router->access_log;
1709 
1710         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1711             nxt_thread_spin_lock(&router->lock);
1712             access_log->count++;
1713             nxt_thread_spin_unlock(&router->lock);
1714 
1715         } else {
1716             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1717                                     + path.length);
1718             if (access_log == NULL) {
1719                 nxt_alert(task, "failed to allocate access log structure");
1720                 goto fail;
1721             }
1722 
1723             access_log->fd = -1;
1724             access_log->handler = &nxt_router_access_log_writer;
1725             access_log->count = 1;
1726 
1727             access_log->path.length = path.length;
1728             access_log->path.start = (u_char *) access_log
1729                                      + sizeof(nxt_router_access_log_t);
1730 
1731             nxt_memcpy(access_log->path.start, path.start, path.length);
1732         }
1733 
1734         tmcf->router_conf->access_log = access_log;
1735     }
1736 
1737     nxt_http_routes_resolve(task, tmcf);
1738 
1739     nxt_queue_add(&tmcf->deleting, &router->sockets);
1740     nxt_queue_init(&router->sockets);
1741 
1742     return NXT_OK;
1743 
1744 app_fail:
1745 
1746     nxt_free(app);
1747 
1748 fail:
1749 
1750     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1751 
1752         nxt_queue_remove(&app->link);
1753         nxt_thread_mutex_destroy(&app->mutex);
1754         nxt_free(app);
1755 
1756     } nxt_queue_loop;
1757 
1758     return NXT_ERROR;
1759 }
1760 
1761 
1762 static nxt_app_t *
1763 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1764 {
1765     nxt_app_t  *app;
1766 
1767     nxt_queue_each(app, queue, nxt_app_t, link) {
1768 
1769         if (nxt_strstr_eq(name, &app->name)) {
1770             return app;
1771         }
1772 
1773     } nxt_queue_loop;
1774 
1775     return NULL;
1776 }
1777 
1778 
1779 nxt_app_t *
1780 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1781 {
1782     nxt_app_t  *app;
1783 
1784     app = nxt_router_app_find(&tmcf->apps, name);
1785 
1786     if (app == NULL) {
1787         app = nxt_router_app_find(&tmcf->previous, name);
1788     }
1789 
1790     return app;
1791 }
1792 
1793 
1794 static nxt_socket_conf_t *
1795 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1796     nxt_str_t *name)
1797 {
1798     size_t               size;
1799     nxt_int_t            ret;
1800     nxt_bool_t           wildcard;
1801     nxt_sockaddr_t       *sa;
1802     nxt_socket_conf_t    *skcf;
1803     nxt_listen_socket_t  *ls;
1804 
1805     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1806     if (nxt_slow_path(sa == NULL)) {
1807         nxt_alert(task, "invalid listener \"%V\"", name);
1808         return NULL;
1809     }
1810 
1811     sa->type = SOCK_STREAM;
1812 
1813     nxt_debug(task, "router listener: \"%*s\"",
1814               (size_t) sa->length, nxt_sockaddr_start(sa));
1815 
1816     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
1817     if (nxt_slow_path(skcf == NULL)) {
1818         return NULL;
1819     }
1820 
1821     size = nxt_sockaddr_size(sa);
1822 
1823     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1824 
1825     if (ret != NXT_OK) {
1826 
1827         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1828         if (nxt_slow_path(ls == NULL)) {
1829             return NULL;
1830         }
1831 
1832         skcf->listen = ls;
1833 
1834         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1835         nxt_memcpy(ls->sockaddr, sa, size);
1836 
1837         nxt_listen_socket_remote_size(ls);
1838 
1839         ls->socket = -1;
1840         ls->backlog = NXT_LISTEN_BACKLOG;
1841         ls->flags = NXT_NONBLOCK;
1842         ls->read_after_accept = 1;
1843     }
1844 
1845     switch (sa->u.sockaddr.sa_family) {
1846 #if (NXT_HAVE_UNIX_DOMAIN)
1847     case AF_UNIX:
1848         wildcard = 0;
1849         break;
1850 #endif
1851 #if (NXT_INET6)
1852     case AF_INET6:
1853         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1854         break;
1855 #endif
1856     case AF_INET:
1857     default:
1858         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1859         break;
1860     }
1861 
1862     if (!wildcard) {
1863         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
1864         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1865             return NULL;
1866         }
1867 
1868         nxt_memcpy(skcf->sockaddr, sa, size);
1869     }
1870 
1871     return skcf;
1872 }
1873 
1874 
1875 static nxt_int_t
1876 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1877     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1878 {
1879     nxt_router_t       *router;
1880     nxt_queue_link_t   *qlk;
1881     nxt_socket_conf_t  *skcf;
1882 
1883     router = tmcf->router_conf->router;
1884 
1885     for (qlk = nxt_queue_first(&router->sockets);
1886          qlk != nxt_queue_tail(&router->sockets);
1887          qlk = nxt_queue_next(qlk))
1888     {
1889         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1890 
1891         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1892             nskcf->listen = skcf->listen;
1893 
1894             nxt_queue_remove(qlk);
1895             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1896 
1897             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1898 
1899             return NXT_OK;
1900         }
1901     }
1902 
1903     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1904 
1905     return NXT_DECLINED;
1906 }
1907 
1908 
1909 static void
1910 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1911     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1912 {
1913     size_t            size;
1914     uint32_t          stream;
1915     nxt_int_t         ret;
1916     nxt_buf_t         *b;
1917     nxt_port_t        *main_port, *router_port;
1918     nxt_runtime_t     *rt;
1919     nxt_socket_rpc_t  *rpc;
1920 
1921     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1922     if (rpc == NULL) {
1923         goto fail;
1924     }
1925 
1926     rpc->socket_conf = skcf;
1927     rpc->temp_conf = tmcf;
1928 
1929     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1930 
1931     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1932     if (b == NULL) {
1933         goto fail;
1934     }
1935 
1936     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1937 
1938     rt = task->thread->runtime;
1939     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1940     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1941 
1942     stream = nxt_port_rpc_register_handler(task, router_port,
1943                                            nxt_router_listen_socket_ready,
1944                                            nxt_router_listen_socket_error,
1945                                            main_port->pid, rpc);
1946     if (nxt_slow_path(stream == 0)) {
1947         goto fail;
1948     }
1949 
1950     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1951                                 stream, router_port->id, b);
1952 
1953     if (nxt_slow_path(ret != NXT_OK)) {
1954         nxt_port_rpc_cancel(task, router_port, stream);
1955         goto fail;
1956     }
1957 
1958     return;
1959 
1960 fail:
1961 
1962     nxt_router_conf_error(task, tmcf);
1963 }
1964 
1965 
1966 static void
1967 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1968     void *data)
1969 {
1970     nxt_int_t         ret;
1971     nxt_socket_t      s;
1972     nxt_socket_rpc_t  *rpc;
1973 
1974     rpc = data;
1975 
1976     s = msg->fd;
1977 
1978     ret = nxt_socket_nonblocking(task, s);
1979     if (nxt_slow_path(ret != NXT_OK)) {
1980         goto fail;
1981     }
1982 
1983     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1984 
1985     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1986     if (nxt_slow_path(ret != NXT_OK)) {
1987         goto fail;
1988     }
1989 
1990     rpc->socket_conf->listen->socket = s;
1991 
1992     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1993                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1994 
1995     return;
1996 
1997 fail:
1998 
1999     nxt_socket_close(task, s);
2000 
2001     nxt_router_conf_error(task, rpc->temp_conf);
2002 }
2003 
2004 
2005 static void
2006 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2007     void *data)
2008 {
2009     nxt_socket_rpc_t        *rpc;
2010     nxt_router_temp_conf_t  *tmcf;
2011 
2012     rpc = data;
2013     tmcf = rpc->temp_conf;
2014 
2015 #if 0
2016     u_char                  *p;
2017     size_t                  size;
2018     uint8_t                 error;
2019     nxt_buf_t               *in, *out;
2020     nxt_sockaddr_t          *sa;
2021 
2022     static nxt_str_t  socket_errors[] = {
2023         nxt_string("ListenerSystem"),
2024         nxt_string("ListenerNoIPv6"),
2025         nxt_string("ListenerPort"),
2026         nxt_string("ListenerInUse"),
2027         nxt_string("ListenerNoAddress"),
2028         nxt_string("ListenerNoAccess"),
2029         nxt_string("ListenerPath"),
2030     };
2031 
2032     sa = rpc->socket_conf->listen->sockaddr;
2033 
2034     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2035 
2036     if (nxt_slow_path(in == NULL)) {
2037         return;
2038     }
2039 
2040     p = in->mem.pos;
2041 
2042     error = *p++;
2043 
2044     size = nxt_length("listen socket error: ")
2045            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2046            + sa->length + socket_errors[error].length + (in->mem.free - p);
2047 
2048     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2049     if (nxt_slow_path(out == NULL)) {
2050         return;
2051     }
2052 
2053     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2054                         "listen socket error: "
2055                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2056                         (size_t) sa->length, nxt_sockaddr_start(sa),
2057                         &socket_errors[error], in->mem.free - p, p);
2058 
2059     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2060 #endif
2061 
2062     nxt_router_conf_error(task, tmcf);
2063 }
2064 
2065 
2066 #if (NXT_TLS)
2067 
2068 static void
2069 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2070     nxt_router_tlssock_t *tls)
2071 {
2072     nxt_socket_rpc_t  *rpc;
2073 
2074     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2075     if (rpc == NULL) {
2076         nxt_router_conf_error(task, tmcf);
2077         return;
2078     }
2079 
2080     rpc->socket_conf = tls->conf;
2081     rpc->temp_conf = tmcf;
2082 
2083     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2084                        nxt_router_tls_rpc_handler, rpc);
2085 }
2086 
2087 
2088 static void
2089 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2090     void *data)
2091 {
2092     nxt_mp_t               *mp;
2093     nxt_int_t              ret;
2094     nxt_tls_conf_t         *tlscf;
2095     nxt_socket_rpc_t       *rpc;
2096     nxt_router_temp_conf_t *tmcf;
2097 
2098     nxt_debug(task, "tls rpc handler");
2099 
2100     rpc = data;
2101     tmcf = rpc->temp_conf;
2102 
2103     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2104         goto fail;
2105     }
2106 
2107     mp = tmcf->router_conf->mem_pool;
2108 
2109     tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2110     if (nxt_slow_path(tlscf == NULL)) {
2111         goto fail;
2112     }
2113 
2114     tlscf->chain_file = msg->fd;
2115 
2116     ret = task->thread->runtime->tls->server_init(task, tlscf);
2117     if (nxt_slow_path(ret != NXT_OK)) {
2118         goto fail;
2119     }
2120 
2121     rpc->socket_conf->tls = tlscf;
2122 
2123     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2124                        nxt_router_conf_apply, task, tmcf, NULL);
2125     return;
2126 
2127 fail:
2128 
2129     nxt_router_conf_error(task, tmcf);
2130 }
2131 
2132 #endif
2133 
2134 
2135 static void
2136 nxt_router_app_rpc_create(nxt_task_t *task,
2137     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2138 {
2139     size_t         size;
2140     uint32_t       stream;
2141     nxt_int_t      ret;
2142     nxt_buf_t      *b;
2143     nxt_port_t     *main_port, *router_port;
2144     nxt_runtime_t  *rt;
2145     nxt_app_rpc_t  *rpc;
2146 
2147     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2148     if (rpc == NULL) {
2149         goto fail;
2150     }
2151 
2152     rpc->app = app;
2153     rpc->temp_conf = tmcf;
2154 
2155     nxt_debug(task, "app '%V' prefork", &app->name);
2156 
2157     size = app->name.length + 1 + app->conf.length;
2158 
2159     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2160     if (nxt_slow_path(b == NULL)) {
2161         goto fail;
2162     }
2163 
2164     nxt_buf_cpystr(b, &app->name);
2165     *b->mem.free++ = '\0';
2166     nxt_buf_cpystr(b, &app->conf);
2167 
2168     rt = task->thread->runtime;
2169     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2170     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2171 
2172     stream = nxt_port_rpc_register_handler(task, router_port,
2173                                            nxt_router_app_prefork_ready,
2174                                            nxt_router_app_prefork_error,
2175                                            -1, rpc);
2176     if (nxt_slow_path(stream == 0)) {
2177         goto fail;
2178     }
2179 
2180     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2181                                 stream, router_port->id, b);
2182 
2183     if (nxt_slow_path(ret != NXT_OK)) {
2184         nxt_port_rpc_cancel(task, router_port, stream);
2185         goto fail;
2186     }
2187 
2188     app->pending_processes++;
2189 
2190     return;
2191 
2192 fail:
2193 
2194     nxt_router_conf_error(task, tmcf);
2195 }
2196 
2197 
2198 static void
2199 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2200     void *data)
2201 {
2202     nxt_app_t           *app;
2203     nxt_port_t          *port;
2204     nxt_app_rpc_t       *rpc;
2205     nxt_event_engine_t  *engine;
2206 
2207     rpc = data;
2208     app = rpc->app;
2209 
2210     port = msg->u.new_port;
2211     port->app = app;
2212 
2213     app->pending_processes--;
2214     app->processes++;
2215     app->idle_processes++;
2216 
2217     engine = task->thread->engine;
2218 
2219     nxt_queue_insert_tail(&app->ports, &port->app_link);
2220     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2221 
2222     port->idle_start = 0;
2223 
2224     nxt_port_inc_use(port);
2225 
2226     nxt_work_queue_add(&engine->fast_work_queue,
2227                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2228 }
2229 
2230 
2231 static void
2232 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2233     void *data)
2234 {
2235     nxt_app_t               *app;
2236     nxt_app_rpc_t           *rpc;
2237     nxt_router_temp_conf_t  *tmcf;
2238 
2239     rpc = data;
2240     app = rpc->app;
2241     tmcf = rpc->temp_conf;
2242 
2243     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2244             &app->name);
2245 
2246     app->pending_processes--;
2247 
2248     nxt_router_conf_error(task, tmcf);
2249 }
2250 
2251 
2252 static nxt_int_t
2253 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2254     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2255 {
2256     nxt_int_t                 ret;
2257     nxt_uint_t                n, threads;
2258     nxt_queue_link_t          *qlk;
2259     nxt_router_engine_conf_t  *recf;
2260 
2261     threads = tmcf->router_conf->threads;
2262 
2263     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2264                                      sizeof(nxt_router_engine_conf_t));
2265     if (nxt_slow_path(tmcf->engines == NULL)) {
2266         return NXT_ERROR;
2267     }
2268 
2269     n = 0;
2270 
2271     for (qlk = nxt_queue_first(&router->engines);
2272          qlk != nxt_queue_tail(&router->engines);
2273          qlk = nxt_queue_next(qlk))
2274     {
2275         recf = nxt_array_zero_add(tmcf->engines);
2276         if (nxt_slow_path(recf == NULL)) {
2277             return NXT_ERROR;
2278         }
2279 
2280         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2281 
2282         if (n < threads) {
2283             recf->action = NXT_ROUTER_ENGINE_KEEP;
2284             ret = nxt_router_engine_conf_update(tmcf, recf);
2285 
2286         } else {
2287             recf->action = NXT_ROUTER_ENGINE_DELETE;
2288             ret = nxt_router_engine_conf_delete(tmcf, recf);
2289         }
2290 
2291         if (nxt_slow_path(ret != NXT_OK)) {
2292             return ret;
2293         }
2294 
2295         n++;
2296     }
2297 
2298     tmcf->new_threads = n;
2299 
2300     while (n < threads) {
2301         recf = nxt_array_zero_add(tmcf->engines);
2302         if (nxt_slow_path(recf == NULL)) {
2303             return NXT_ERROR;
2304         }
2305 
2306         recf->action = NXT_ROUTER_ENGINE_ADD;
2307 
2308         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2309         if (nxt_slow_path(recf->engine == NULL)) {
2310             return NXT_ERROR;
2311         }
2312 
2313         ret = nxt_router_engine_conf_create(tmcf, recf);
2314         if (nxt_slow_path(ret != NXT_OK)) {
2315             return ret;
2316         }
2317 
2318         n++;
2319     }
2320 
2321     return NXT_OK;
2322 }
2323 
2324 
2325 static nxt_int_t
2326 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2327     nxt_router_engine_conf_t *recf)
2328 {
2329     nxt_int_t  ret;
2330 
2331     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2332                                           nxt_router_listen_socket_create);
2333     if (nxt_slow_path(ret != NXT_OK)) {
2334         return ret;
2335     }
2336 
2337     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2338                                           nxt_router_listen_socket_create);
2339     if (nxt_slow_path(ret != NXT_OK)) {
2340         return ret;
2341     }
2342 
2343     return ret;
2344 }
2345 
2346 
2347 static nxt_int_t
2348 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2349     nxt_router_engine_conf_t *recf)
2350 {
2351     nxt_int_t  ret;
2352 
2353     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2354                                           nxt_router_listen_socket_create);
2355     if (nxt_slow_path(ret != NXT_OK)) {
2356         return ret;
2357     }
2358 
2359     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2360                                           nxt_router_listen_socket_update);
2361     if (nxt_slow_path(ret != NXT_OK)) {
2362         return ret;
2363     }
2364 
2365     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2366     if (nxt_slow_path(ret != NXT_OK)) {
2367         return ret;
2368     }
2369 
2370     return ret;
2371 }
2372 
2373 
2374 static nxt_int_t
2375 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2376     nxt_router_engine_conf_t *recf)
2377 {
2378     nxt_int_t  ret;
2379 
2380     ret = nxt_router_engine_quit(tmcf, recf);
2381     if (nxt_slow_path(ret != NXT_OK)) {
2382         return ret;
2383     }
2384 
2385     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2386     if (nxt_slow_path(ret != NXT_OK)) {
2387         return ret;
2388     }
2389 
2390     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2391 }
2392 
2393 
2394 static nxt_int_t
2395 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2396     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2397     nxt_work_handler_t handler)
2398 {
2399     nxt_joint_job_t          *job;
2400     nxt_queue_link_t         *qlk;
2401     nxt_socket_conf_t        *skcf;
2402     nxt_socket_conf_joint_t  *joint;
2403 
2404     for (qlk = nxt_queue_first(sockets);
2405          qlk != nxt_queue_tail(sockets);
2406          qlk = nxt_queue_next(qlk))
2407     {
2408         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2409         if (nxt_slow_path(job == NULL)) {
2410             return NXT_ERROR;
2411         }
2412 
2413         job->work.next = recf->jobs;
2414         recf->jobs = &job->work;
2415 
2416         job->task = tmcf->engine->task;
2417         job->work.handler = handler;
2418         job->work.task = &job->task;
2419         job->work.obj = job;
2420         job->tmcf = tmcf;
2421 
2422         tmcf->count++;
2423 
2424         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2425                              sizeof(nxt_socket_conf_joint_t));
2426         if (nxt_slow_path(joint == NULL)) {
2427             return NXT_ERROR;
2428         }
2429 
2430         job->work.data = joint;
2431 
2432         joint->count = 1;
2433 
2434         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2435         skcf->count++;
2436         joint->socket_conf = skcf;
2437 
2438         joint->engine = recf->engine;
2439     }
2440 
2441     return NXT_OK;
2442 }
2443 
2444 
2445 static nxt_int_t
2446 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2447     nxt_router_engine_conf_t *recf)
2448 {
2449     nxt_joint_job_t  *job;
2450 
2451     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2452     if (nxt_slow_path(job == NULL)) {
2453         return NXT_ERROR;
2454     }
2455 
2456     job->work.next = recf->jobs;
2457     recf->jobs = &job->work;
2458 
2459     job->task = tmcf->engine->task;
2460     job->work.handler = nxt_router_worker_thread_quit;
2461     job->work.task = &job->task;
2462     job->work.obj = NULL;
2463     job->work.data = NULL;
2464     job->tmcf = NULL;
2465 
2466     return NXT_OK;
2467 }
2468 
2469 
2470 static nxt_int_t
2471 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2472     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2473 {
2474     nxt_joint_job_t   *job;
2475     nxt_queue_link_t  *qlk;
2476 
2477     for (qlk = nxt_queue_first(sockets);
2478          qlk != nxt_queue_tail(sockets);
2479          qlk = nxt_queue_next(qlk))
2480     {
2481         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2482         if (nxt_slow_path(job == NULL)) {
2483             return NXT_ERROR;
2484         }
2485 
2486         job->work.next = recf->jobs;
2487         recf->jobs = &job->work;
2488 
2489         job->task = tmcf->engine->task;
2490         job->work.handler = nxt_router_listen_socket_delete;
2491         job->work.task = &job->task;
2492         job->work.obj = job;
2493         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2494         job->tmcf = tmcf;
2495 
2496         tmcf->count++;
2497     }
2498 
2499     return NXT_OK;
2500 }
2501 
2502 
2503 static nxt_int_t
2504 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2505     nxt_router_temp_conf_t *tmcf)
2506 {
2507     nxt_int_t                 ret;
2508     nxt_uint_t                i, threads;
2509     nxt_router_engine_conf_t  *recf;
2510 
2511     recf = tmcf->engines->elts;
2512     threads = tmcf->router_conf->threads;
2513 
2514     for (i = tmcf->new_threads; i < threads; i++) {
2515         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2516         if (nxt_slow_path(ret != NXT_OK)) {
2517             return ret;
2518         }
2519     }
2520 
2521     return NXT_OK;
2522 }
2523 
2524 
2525 static nxt_int_t
2526 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2527     nxt_event_engine_t *engine)
2528 {
2529     nxt_int_t            ret;
2530     nxt_thread_link_t    *link;
2531     nxt_thread_handle_t  handle;
2532 
2533     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2534 
2535     if (nxt_slow_path(link == NULL)) {
2536         return NXT_ERROR;
2537     }
2538 
2539     link->start = nxt_router_thread_start;
2540     link->engine = engine;
2541     link->work.handler = nxt_router_thread_exit_handler;
2542     link->work.task = task;
2543     link->work.data = link;
2544 
2545     nxt_queue_insert_tail(&rt->engines, &engine->link);
2546 
2547     ret = nxt_thread_create(&handle, link);
2548 
2549     if (nxt_slow_path(ret != NXT_OK)) {
2550         nxt_queue_remove(&engine->link);
2551     }
2552 
2553     return ret;
2554 }
2555 
2556 
2557 static void
2558 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2559     nxt_router_temp_conf_t *tmcf)
2560 {
2561     nxt_app_t  *app;
2562 
2563     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2564 
2565         nxt_router_app_unlink(task, app);
2566 
2567     } nxt_queue_loop;
2568 
2569     nxt_queue_add(&router->apps, &tmcf->previous);
2570     nxt_queue_add(&router->apps, &tmcf->apps);
2571 }
2572 
2573 
2574 static void
2575 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2576 {
2577     nxt_uint_t                n;
2578     nxt_event_engine_t        *engine;
2579     nxt_router_engine_conf_t  *recf;
2580 
2581     recf = tmcf->engines->elts;
2582 
2583     for (n = tmcf->engines->nelts; n != 0; n--) {
2584         engine = recf->engine;
2585 
2586         switch (recf->action) {
2587 
2588         case NXT_ROUTER_ENGINE_KEEP:
2589             break;
2590 
2591         case NXT_ROUTER_ENGINE_ADD:
2592             nxt_queue_insert_tail(&router->engines, &engine->link0);
2593             break;
2594 
2595         case NXT_ROUTER_ENGINE_DELETE:
2596             nxt_queue_remove(&engine->link0);
2597             break;
2598         }
2599 
2600         nxt_router_engine_post(engine, recf->jobs);
2601 
2602         recf++;
2603     }
2604 }
2605 
2606 
2607 static void
2608 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2609 {
2610     nxt_work_t  *work, *next;
2611 
2612     for (work = jobs; work != NULL; work = next) {
2613         next = work->next;
2614         work->next = NULL;
2615 
2616         nxt_event_engine_post(engine, work);
2617     }
2618 }
2619 
2620 
2621 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2622     .rpc_error = nxt_port_rpc_handler,
2623     .mmap      = nxt_port_mmap_handler,
2624     .data      = nxt_port_rpc_handler,
2625 };
2626 
2627 
2628 static void
2629 nxt_router_thread_start(void *data)
2630 {
2631     nxt_int_t           ret;
2632     nxt_port_t          *port;
2633     nxt_task_t          *task;
2634     nxt_thread_t        *thread;
2635     nxt_thread_link_t   *link;
2636     nxt_event_engine_t  *engine;
2637 
2638     link = data;
2639     engine = link->engine;
2640     task = &engine->task;
2641 
2642     thread = nxt_thread();
2643 
2644     nxt_event_engine_thread_adopt(engine);
2645 
2646     /* STUB */
2647     thread->runtime = engine->task.thread->runtime;
2648 
2649     engine->task.thread = thread;
2650     engine->task.log = thread->log;
2651     thread->engine = engine;
2652     thread->task = &engine->task;
2653 #if 0
2654     thread->fiber = &engine->fibers->fiber;
2655 #endif
2656 
2657     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2658     if (nxt_slow_path(engine->mem_pool == NULL)) {
2659         return;
2660     }
2661 
2662     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2663                         NXT_PROCESS_ROUTER);
2664     if (nxt_slow_path(port == NULL)) {
2665         return;
2666     }
2667 
2668     ret = nxt_port_socket_init(task, port, 0);
2669     if (nxt_slow_path(ret != NXT_OK)) {
2670         nxt_port_use(task, port, -1);
2671         return;
2672     }
2673 
2674     engine->port = port;
2675 
2676     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2677 
2678     nxt_event_engine_start(engine);
2679 }
2680 
2681 
2682 static void
2683 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2684 {
2685     nxt_joint_job_t          *job;
2686     nxt_socket_conf_t        *skcf;
2687     nxt_listen_event_t       *lev;
2688     nxt_listen_socket_t      *ls;
2689     nxt_thread_spinlock_t    *lock;
2690     nxt_socket_conf_joint_t  *joint;
2691 
2692     job = obj;
2693     joint = data;
2694 
2695     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2696 
2697     skcf = joint->socket_conf;
2698     ls = skcf->listen;
2699 
2700     lev = nxt_listen_event(task, ls);
2701     if (nxt_slow_path(lev == NULL)) {
2702         nxt_router_listen_socket_release(task, skcf);
2703         return;
2704     }
2705 
2706     lev->socket.data = joint;
2707 
2708     lock = &skcf->router_conf->router->lock;
2709 
2710     nxt_thread_spin_lock(lock);
2711     ls->count++;
2712     nxt_thread_spin_unlock(lock);
2713 
2714     job->work.next = NULL;
2715     job->work.handler = nxt_router_conf_wait;
2716 
2717     nxt_event_engine_post(job->tmcf->engine, &job->work);
2718 }
2719 
2720 
2721 nxt_inline nxt_listen_event_t *
2722 nxt_router_listen_event(nxt_queue_t *listen_connections,
2723     nxt_socket_conf_t *skcf)
2724 {
2725     nxt_socket_t        fd;
2726     nxt_queue_link_t    *qlk;
2727     nxt_listen_event_t  *lev;
2728 
2729     fd = skcf->listen->socket;
2730 
2731     for (qlk = nxt_queue_first(listen_connections);
2732          qlk != nxt_queue_tail(listen_connections);
2733          qlk = nxt_queue_next(qlk))
2734     {
2735         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2736 
2737         if (fd == lev->socket.fd) {
2738             return lev;
2739         }
2740     }
2741 
2742     return NULL;
2743 }
2744 
2745 
2746 static void
2747 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2748 {
2749     nxt_joint_job_t          *job;
2750     nxt_event_engine_t       *engine;
2751     nxt_listen_event_t       *lev;
2752     nxt_socket_conf_joint_t  *joint, *old;
2753 
2754     job = obj;
2755     joint = data;
2756 
2757     engine = task->thread->engine;
2758 
2759     nxt_queue_insert_tail(&engine->joints, &joint->link);
2760 
2761     lev = nxt_router_listen_event(&engine->listen_connections,
2762                                   joint->socket_conf);
2763 
2764     old = lev->socket.data;
2765     lev->socket.data = joint;
2766     lev->listen = joint->socket_conf->listen;
2767 
2768     job->work.next = NULL;
2769     job->work.handler = nxt_router_conf_wait;
2770 
2771     nxt_event_engine_post(job->tmcf->engine, &job->work);
2772 
2773     /*
2774      * The task is allocated from configuration temporary
2775      * memory pool so it can be freed after engine post operation.
2776      */
2777 
2778     nxt_router_conf_release(&engine->task, old);
2779 }
2780 
2781 
2782 static void
2783 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2784 {
2785     nxt_joint_job_t     *job;
2786     nxt_socket_conf_t   *skcf;
2787     nxt_listen_event_t  *lev;
2788     nxt_event_engine_t  *engine;
2789 
2790     job = obj;
2791     skcf = data;
2792 
2793     engine = task->thread->engine;
2794 
2795     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2796 
2797     nxt_fd_event_delete(engine, &lev->socket);
2798 
2799     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2800               lev->socket.fd);
2801 
2802     lev->timer.handler = nxt_router_listen_socket_close;
2803     lev->timer.work_queue = &engine->fast_work_queue;
2804 
2805     nxt_timer_add(engine, &lev->timer, 0);
2806 
2807     job->work.next = NULL;
2808     job->work.handler = nxt_router_conf_wait;
2809 
2810     nxt_event_engine_post(job->tmcf->engine, &job->work);
2811 }
2812 
2813 
2814 static void
2815 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2816 {
2817     nxt_event_engine_t  *engine;
2818 
2819     nxt_debug(task, "router worker thread quit");
2820 
2821     engine = task->thread->engine;
2822 
2823     engine->shutdown = 1;
2824 
2825     if (nxt_queue_is_empty(&engine->joints)) {
2826         nxt_thread_exit(task->thread);
2827     }
2828 }
2829 
2830 
2831 static void
2832 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2833 {
2834     nxt_timer_t              *timer;
2835     nxt_listen_event_t       *lev;
2836     nxt_socket_conf_joint_t  *joint;
2837 
2838     timer = obj;
2839     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2840 
2841     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2842               lev->socket.fd);
2843 
2844     nxt_queue_remove(&lev->link);
2845 
2846     joint = lev->socket.data;
2847     lev->socket.data = NULL;
2848 
2849     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2850     task = &task->thread->engine->task;
2851 
2852     nxt_router_listen_socket_release(task, joint->socket_conf);
2853 
2854     nxt_router_listen_event_release(task, lev, joint);
2855 }
2856 
2857 
2858 static void
2859 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2860 {
2861     nxt_listen_socket_t    *ls;
2862     nxt_thread_spinlock_t  *lock;
2863 
2864     ls = skcf->listen;
2865     lock = &skcf->router_conf->router->lock;
2866 
2867     nxt_thread_spin_lock(lock);
2868 
2869     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2870               task->thread->engine, ls->count);
2871 
2872     if (--ls->count != 0) {
2873         ls = NULL;
2874     }
2875 
2876     nxt_thread_spin_unlock(lock);
2877 
2878     if (ls != NULL) {
2879         nxt_socket_close(task, ls->socket);
2880         nxt_free(ls);
2881     }
2882 }
2883 
2884 
2885 void
2886 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
2887     nxt_socket_conf_joint_t *joint)
2888 {
2889     nxt_event_engine_t  *engine;
2890 
2891     nxt_debug(task, "listen event count: %D", lev->count);
2892 
2893     if (--lev->count == 0) {
2894         nxt_free(lev);
2895     }
2896 
2897     if (joint != NULL) {
2898         nxt_router_conf_release(task, joint);
2899     }
2900 
2901     engine = task->thread->engine;
2902 
2903     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2904         nxt_thread_exit(task->thread);
2905     }
2906 }
2907 
2908 
2909 void
2910 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2911 {
2912     nxt_socket_conf_t      *skcf;
2913     nxt_router_conf_t      *rtcf;
2914     nxt_thread_spinlock_t  *lock;
2915 
2916     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2917 
2918     if (--joint->count != 0) {
2919         return;
2920     }
2921 
2922     nxt_queue_remove(&joint->link);
2923 
2924     /*
2925      * The joint content can not be safely used after the critical
2926      * section protected by the spinlock because its memory pool may
2927      * be already destroyed by another thread.
2928      */
2929     skcf = joint->socket_conf;
2930     rtcf = skcf->router_conf;
2931     lock = &rtcf->router->lock;
2932 
2933     nxt_thread_spin_lock(lock);
2934 
2935     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2936               rtcf, rtcf->count);
2937 
2938     if (--skcf->count != 0) {
2939         skcf = NULL;
2940         rtcf = NULL;
2941 
2942     } else {
2943         nxt_queue_remove(&skcf->link);
2944 
2945         if (--rtcf->count != 0) {
2946             rtcf = NULL;
2947         }
2948     }
2949 
2950     nxt_thread_spin_unlock(lock);
2951 
2952     if (skcf != NULL) {
2953         if (skcf->pass != NULL) {
2954             nxt_http_pass_cleanup(task, skcf->pass);
2955         }
2956 
2957 #if (NXT_TLS)
2958         if (skcf->tls != NULL) {
2959             task->thread->runtime->tls->server_free(task, skcf->tls);
2960         }
2961 #endif
2962     }
2963 
2964     /* TODO remove engine->port */
2965     /* TODO excude from connected ports */
2966 
2967     if (rtcf != NULL) {
2968         nxt_debug(task, "old router conf is destroyed");
2969 
2970         nxt_http_routes_cleanup(task, rtcf->routes);
2971 
2972         nxt_router_access_log_release(task, lock, rtcf->access_log);
2973 
2974         nxt_mp_thread_adopt(rtcf->mem_pool);
2975 
2976         nxt_mp_destroy(rtcf->mem_pool);
2977     }
2978 }
2979 
2980 
2981 static void
2982 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
2983     nxt_router_access_log_t *access_log)
2984 {
2985     size_t     size;
2986     u_char     *buf, *p;
2987     nxt_off_t  bytes;
2988 
2989     static nxt_time_string_t  date_cache = {
2990         (nxt_atomic_uint_t) -1,
2991         nxt_router_access_log_date,
2992         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
2993         nxt_length("31/Dec/1986:19:40:00 +0300"),
2994         NXT_THREAD_TIME_LOCAL,
2995         NXT_THREAD_TIME_SEC,
2996     };
2997 
2998     size = r->remote->address_length
2999            + 6                  /* ' - - [' */
3000            + date_cache.size
3001            + 3                  /* '] "' */
3002            + r->method->length
3003            + 1                  /* space */
3004            + r->target.length
3005            + 1                  /* space */
3006            + r->version.length
3007            + 2                  /* '" ' */
3008            + 3                  /* status */
3009            + 1                  /* space */
3010            + NXT_OFF_T_LEN
3011            + 2                  /* ' "' */
3012            + (r->referer != NULL ? r->referer->value_length : 1)
3013            + 3                  /* '" "' */
3014            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3015            + 2                  /* '"\n' */
3016     ;
3017 
3018     buf = nxt_mp_nget(r->mem_pool, size);
3019     if (nxt_slow_path(buf == NULL)) {
3020         return;
3021     }
3022 
3023     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3024                    r->remote->address_length);
3025 
3026     p = nxt_cpymem(p, " - - [", 6);
3027 
3028     p = nxt_thread_time_string(task->thread, &date_cache, p);
3029 
3030     p = nxt_cpymem(p, "] \"", 3);
3031 
3032     if (r->method->length != 0) {
3033         p = nxt_cpymem(p, r->method->start, r->method->length);
3034 
3035         if (r->target.length != 0) {
3036             *p++ = ' ';
3037             p = nxt_cpymem(p, r->target.start, r->target.length);
3038 
3039             if (r->version.length != 0) {
3040                 *p++ = ' ';
3041                 p = nxt_cpymem(p, r->version.start, r->version.length);
3042             }
3043         }
3044 
3045     } else {
3046         *p++ = '-';
3047     }
3048 
3049     p = nxt_cpymem(p, "\" ", 2);
3050 
3051     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3052 
3053     *p++ = ' ';
3054 
3055     bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto);
3056 
3057     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3058 
3059     p = nxt_cpymem(p, " \"", 2);
3060 
3061     if (r->referer != NULL) {
3062         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3063 
3064     } else {
3065         *p++ = '-';
3066     }
3067 
3068     p = nxt_cpymem(p, "\" \"", 3);
3069 
3070     if (r->user_agent != NULL) {
3071         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3072 
3073     } else {
3074         *p++ = '-';
3075     }
3076 
3077     p = nxt_cpymem(p, "\"\n", 2);
3078 
3079     nxt_fd_write(access_log->fd, buf, p - buf);
3080 }
3081 
3082 
3083 static u_char *
3084 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3085     size_t size, const char *format)
3086 {
3087     u_char  sign;
3088     time_t  gmtoff;
3089 
3090     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3091                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3092 
3093     gmtoff = nxt_timezone(tm) / 60;
3094 
3095     if (gmtoff < 0) {
3096         gmtoff = -gmtoff;
3097         sign = '-';
3098 
3099     } else {
3100         sign = '+';
3101     }
3102 
3103     return nxt_sprintf(buf, buf + size, format,
3104                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3105                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3106                        sign, gmtoff / 60, gmtoff % 60);
3107 }
3108 
3109 
3110 static void
3111 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3112 {
3113     uint32_t                 stream;
3114     nxt_int_t                ret;
3115     nxt_buf_t                *b;
3116     nxt_port_t               *main_port, *router_port;
3117     nxt_runtime_t            *rt;
3118     nxt_router_access_log_t  *access_log;
3119 
3120     access_log = tmcf->router_conf->access_log;
3121 
3122     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3123     if (nxt_slow_path(b == NULL)) {
3124         goto fail;
3125     }
3126 
3127     nxt_buf_cpystr(b, &access_log->path);
3128     *b->mem.free++ = '\0';
3129 
3130     rt = task->thread->runtime;
3131     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3132     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3133 
3134     stream = nxt_port_rpc_register_handler(task, router_port,
3135                                            nxt_router_access_log_ready,
3136                                            nxt_router_access_log_error,
3137                                            -1, tmcf);
3138     if (nxt_slow_path(stream == 0)) {
3139         goto fail;
3140     }
3141 
3142     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3143                                 stream, router_port->id, b);
3144 
3145     if (nxt_slow_path(ret != NXT_OK)) {
3146         nxt_port_rpc_cancel(task, router_port, stream);
3147         goto fail;
3148     }
3149 
3150     return;
3151 
3152 fail:
3153 
3154     nxt_router_conf_error(task, tmcf);
3155 }
3156 
3157 
3158 static void
3159 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3160     void *data)
3161 {
3162     nxt_router_temp_conf_t   *tmcf;
3163     nxt_router_access_log_t  *access_log;
3164 
3165     tmcf = data;
3166 
3167     access_log = tmcf->router_conf->access_log;
3168 
3169     access_log->fd = msg->fd;
3170 
3171     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3172                        nxt_router_conf_apply, task, tmcf, NULL);
3173 }
3174 
3175 
3176 static void
3177 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3178     void *data)
3179 {
3180     nxt_router_temp_conf_t  *tmcf;
3181 
3182     tmcf = data;
3183 
3184     nxt_router_conf_error(task, tmcf);
3185 }
3186 
3187 
3188 static void
3189 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3190     nxt_router_access_log_t *access_log)
3191 {
3192     if (access_log == NULL) {
3193         return;
3194     }
3195 
3196     nxt_thread_spin_lock(lock);
3197 
3198     if (--access_log->count != 0) {
3199         access_log = NULL;
3200     }
3201 
3202     nxt_thread_spin_unlock(lock);
3203 
3204     if (access_log != NULL) {
3205 
3206         if (access_log->fd != -1) {
3207             nxt_fd_close(access_log->fd);
3208         }
3209 
3210         nxt_free(access_log);
3211     }
3212 }
3213 
3214 
/*
 * Context of one access log reopen request.  It is created in
 * nxt_router_access_log_reopen_handler() and passed as the data argument
 * to the nxt_router_access_log_reopen_ready()/_error() RPC handlers.
 */
3215 typedef struct {
     /* Memory pool this context is allocated from. */
3216     nxt_mp_t                 *mem_pool;
     /* Access log that was current when the reopen was requested. */
3217     nxt_router_access_log_t  *access_log;
3218 } nxt_router_access_log_reopen_t;
3219 
3220 
3221 void
3222 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3223 {
3224     nxt_mp_t                        *mp;
3225     uint32_t                        stream;
3226     nxt_int_t                       ret;
3227     nxt_buf_t                       *b;
3228     nxt_port_t                      *main_port, *router_port;
3229     nxt_runtime_t                   *rt;
3230     nxt_router_access_log_t         *access_log;
3231     nxt_router_access_log_reopen_t  *reopen;
3232 
3233     access_log = nxt_router->access_log;
3234 
3235     if (access_log == NULL) {
3236         return;
3237     }
3238 
3239     mp = nxt_mp_create(1024, 128, 256, 32);
3240     if (nxt_slow_path(mp == NULL)) {
3241         return;
3242     }
3243 
3244     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3245     if (nxt_slow_path(reopen == NULL)) {
3246         goto fail;
3247     }
3248 
3249     reopen->mem_pool = mp;
3250     reopen->access_log = access_log;
3251 
3252     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3253     if (nxt_slow_path(b == NULL)) {
3254         goto fail;
3255     }
3256 
3257     b->completion_handler = nxt_router_access_log_reopen_completion;
3258 
3259     nxt_buf_cpystr(b, &access_log->path);
3260     *b->mem.free++ = '\0';
3261 
3262     rt = task->thread->runtime;
3263     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3264     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3265 
3266     stream = nxt_port_rpc_register_handler(task, router_port,
3267                                            nxt_router_access_log_reopen_ready,
3268                                            nxt_router_access_log_reopen_error,
3269                                            -1, reopen);
3270     if (nxt_slow_path(stream == 0)) {
3271         goto fail;
3272     }
3273 
3274     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3275                                 stream, router_port->id, b);
3276 
3277     if (nxt_slow_path(ret != NXT_OK)) {
3278         nxt_port_rpc_cancel(task, router_port, stream);
3279         goto fail;
3280     }
3281 
3282     nxt_mp_retain(mp);
3283 
3284     return;
3285 
3286 fail:
3287 
3288     nxt_mp_destroy(mp);
3289 }
3290 
3291 
3292 static void
3293 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3294 {
3295     nxt_mp_t   *mp;
3296     nxt_buf_t  *b;
3297 
3298     b = obj;
3299     mp = b->data;
3300 
3301     nxt_mp_release(mp);
3302 }
3303 
3304 
3305 static void
3306 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3307     void *data)
3308 {
3309     nxt_router_access_log_t         *access_log;
3310     nxt_router_access_log_reopen_t  *reopen;
3311 
3312     reopen = data;
3313 
3314     access_log = reopen->access_log;
3315 
3316     if (access_log == nxt_router->access_log) {
3317 
3318         if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) {
3319             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3320                       msg->fd, access_log->fd, nxt_errno);
3321         }
3322     }
3323 
3324     nxt_fd_close(msg->fd);
3325     nxt_mp_release(reopen->mem_pool);
3326 }
3327 
3328 
3329 static void
3330 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3331     void *data)
3332 {
3333     nxt_router_access_log_reopen_t  *reopen;
3334 
3335     reopen = data;
3336 
3337     nxt_mp_release(reopen->mem_pool);
3338 }
3339 
3340 
3341 static void
3342 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3343 {
3344     nxt_port_t           *port;
3345     nxt_thread_link_t    *link;
3346     nxt_event_engine_t   *engine;
3347     nxt_thread_handle_t  handle;
3348 
3349     handle = (nxt_thread_handle_t) obj;
3350     link = data;
3351 
3352     nxt_thread_wait(handle);
3353 
3354     engine = link->engine;
3355 
3356     nxt_queue_remove(&engine->link);
3357 
3358     port = engine->port;
3359 
3360     // TODO notify all apps
3361 
3362     port->engine = task->thread->engine;
3363     nxt_mp_thread_adopt(port->mem_pool);
3364     nxt_port_use(task, port, -1);
3365 
3366     nxt_mp_thread_adopt(engine->mem_pool);
3367     nxt_mp_destroy(engine->mem_pool);
3368 
3369     nxt_event_engine_free(engine);
3370 
3371     nxt_free(link);
3372 }
3373 
3374 
/*
 * RPC "ready" handler for response data messages of one request.
 *
 * data is the request/connection link (rc); msg->buf holds the payload.
 *
 * While the response header has not been sent yet, the buffer is expected
 * to start with an nxt_unit_response_t: its fields are copied into the
 * response field list, the status is taken from it, the buffer is narrowed
 * to the piggybacked content (if any) and the header is sent.  Once the
 * header has been sent, buffers are simply appended to r->out and sent as
 * body.
 *
 * On a malformed response structure or a processing failure the request is
 * finished with NXT_HTTP_SERVICE_UNAVAILABLE and rc is unlinked.
 */
3375 static void
3376 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3377     void *data)
3378 {
3379     size_t               dump_size;
3380     nxt_int_t            ret;
3381     nxt_buf_t            *b;
3382     nxt_http_request_t   *r;
3383     nxt_req_conn_link_t  *rc;
3384     nxt_app_parse_ctx_t  *ar;
3385     nxt_unit_response_t  *resp;
3386 
3387     b = msg->buf;
3388     rc = data;
3389 
         /* The debug log shows at most the first 300 bytes of the payload. */
3390     dump_size = nxt_buf_used_size(b);
3391 
3392     if (dump_size > 300) {
3393         dump_size = 300;
3394     }
3395 
3396     nxt_debug(task, "%srouter app data (%uz): %*s",
3397               msg->port_msg.last ? "last " : "", msg->size, dump_size,
3398               b->mem.pos);
3399 
         /* An empty message carries nothing to forward. */
3400     if (msg->size == 0) {
3401         b = NULL;
3402     }
3403 
         /* No parse context attached to the link: nothing to respond to. */
3404     ar = rc->ap;
3405     if (nxt_slow_path(ar == NULL)) {
3406         return;
3407     }
3408 
         /* The request is already in error state: just drop the link. */
3409     if (ar->request->error) {
3410         nxt_router_rc_unlink(task, rc);
3411         return;
3412     }
3413 
3414     if (msg->port_msg.last != 0) {
3415         nxt_debug(task, "router data create last buf");
3416 
             /* Terminate the output chain and detach the link. */
3417         nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request));
3418 
3419         nxt_router_rc_unlink(task, rc);
3420 
3421     } else {
             /* More data is expected: (re)arm the application timeout. */
3422         if (rc->app != NULL && rc->app->timeout != 0) {
3423             ar->timer.handler = nxt_router_app_timeout;
3424             ar->timer_data = rc;
3425             nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
3426         }
3427     }
3428 
3429     if (b == NULL) {
3430         return;
3431     }
3432 
3433     if (msg->buf == b) {
3434         /* Disable instant buffer completion/re-using by port. */
3435         msg->buf = NULL;
3436     }
3437 
3438     r = ar->request;
3439 
3440     if (r->header_sent) {
             /* Header already sent: forward the data as body. */
3441         nxt_buf_chain_add(&r->out, b);
3442         nxt_http_request_send_body(task, r, NULL);
3443 
3444     } else {
3445         size_t b_size = nxt_buf_mem_used_size(&b->mem);
3446 
             /* The buffer must hold at least the fixed response structure... */
3447         if (nxt_slow_path(b_size < sizeof(*resp))) {
3448             goto fail;
3449         }
3450 
             /* ...and the field array announced by fields_count. */
3451         resp = (void *) b->mem.pos;
3452         if (nxt_slow_path(b_size < sizeof(*resp)
3453               + resp->fields_count * sizeof(nxt_unit_field_t))) {
3454             goto fail;
3455         }
3456 
3457         nxt_unit_field_t  *f;
3458         nxt_http_field_t  *field;
3459 
             /* Convert each unit field into an HTTP response field. */
3460         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3461             field = nxt_list_add(ar->resp_parser.fields);
3462 
3463             if (nxt_slow_path(field == NULL)) {
3464                 goto fail;
3465             }
3466 
3467             field->hash = f->hash;
3468             field->skip = f->skip;
3469 
3470             field->name_length = f->name_length;
3471             field->value_length = f->value_length;
3472             field->name = nxt_unit_sptr_get(&f->name);
3473             field->value = nxt_unit_sptr_get(&f->value);
3474 
3475             nxt_debug(task, "header: %*s: %*s",
3476                       (size_t) field->name_length, field->name,
3477                       (size_t) field->value_length, field->value);
3478         }
3479         r->status = resp->status;
3480 
3481 /*
3482         ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
3483         if (nxt_slow_path(ret != NXT_DONE)) {
3484             goto fail;
3485         }
3486 */
3487         r->resp.fields = ar->resp_parser.fields;
3488 
3489         ret = nxt_http_fields_process(r->resp.fields,
3490                                       &nxt_response_fields_hash, r);
3491         if (nxt_slow_path(ret != NXT_OK)) {
3492             goto fail;
3493         }
3494 
             /*
              * Narrow the buffer to the body content piggybacked on the
              * header message, or mark it empty when there is none.
              */
3495         if (resp->piggyback_content_length != 0) {
3496             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3497             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3498 
3499         } else {
3500             b->mem.pos = b->mem.free;
3501         }
3502 
             /*
              * An empty first buffer is not sent: its completion handler is
              * queued and the chain continues with the next buffer.
              */
3503         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3504             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3505                                b->completion_handler, task, b, b->parent);
3506 
3507             b = b->next;
3508         }
3509 
3510         if (b != NULL) {
3511             nxt_buf_chain_add(&r->out, b);
3512         }
3513 
3514         r->state = &nxt_http_request_send_state;
3515 
3516         nxt_http_request_header_send(task, r);
3517     }
3518 
3519     return;
3520 
3521 fail:
3522 
         /* Only reached from the header branch above, where r is already set. */
3523     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
3524 
3525     nxt_router_rc_unlink(task, rc);
3526 }
3527 
3528 
/*
 * Request state installed by nxt_router_response_ready_handler() right
 * before the response header is sent: nxt_http_request_send_body() is the
 * ready handler and nxt_http_request_error_handler() the error handler.
 */
3529 static const nxt_http_request_state_t  nxt_http_request_send_state
3530     nxt_aligned(64) =
3531 {
3532     .ready_handler = nxt_http_request_send_body,
3533     .error_handler = nxt_http_request_error_handler,
3534 };
3535 
3536 
3537 static void
3538 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
3539 {
3540     nxt_buf_t           *out;
3541     nxt_http_request_t  *r;
3542 
3543     r = obj;
3544 
3545     out = r->out;
3546 
3547     if (out != NULL) {
3548         r->out = NULL;
3549         nxt_http_request_send(task, r, out);
3550     }
3551 }
3552 
3553 
3554 static void
3555 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3556     void *data)
3557 {
3558     nxt_int_t            res;
3559     nxt_port_t           *port;
3560     nxt_bool_t           cancelled;
3561     nxt_req_app_link_t   *ra;
3562     nxt_req_conn_link_t  *rc;
3563 
3564     rc = data;
3565 
3566     ra = rc->ra;
3567 
3568     if (ra != NULL) {
3569         cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
3570 
3571         if (cancelled) {
3572             nxt_router_ra_inc_use(ra);
3573 
3574             res = nxt_router_app_port(task, rc->app, ra);
3575 
3576             if (res == NXT_OK) {
3577                 port = ra->app_port;
3578 
3579                 if (nxt_slow_path(port == NULL)) {
3580                     nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra");
3581                     return;
3582                 }
3583 
3584                 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
3585                                          port->pid);
3586 
3587                 nxt_router_app_prepare_request(task, ra);
3588             }
3589 
3590             msg->port_msg.last = 0;
3591 
3592             return;
3593         }
3594     }
3595 
3596     if (rc->ap != NULL) {
3597         nxt_http_request_error(task, rc->ap->request,
3598                                NXT_HTTP_SERVICE_UNAVAILABLE);
3599     }
3600 
3601     nxt_router_rc_unlink(task, rc);
3602 }
3603 
3604 
3605 static void
3606 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3607     void *data)
3608 {
3609     nxt_app_t        *app;
3610     nxt_port_t       *port;
3611     nxt_app_joint_t  *app_joint;
3612 
3613     app_joint = data;
3614     port = msg->u.new_port;
3615 
3616     nxt_assert(app_joint != NULL);
3617     nxt_assert(port != NULL);
3618 
3619     app = app_joint->app;
3620 
3621     nxt_router_app_joint_use(task, app_joint, -1);
3622 
3623     if (nxt_slow_path(app == NULL)) {
3624         nxt_debug(task, "new port ready for released app, send QUIT");
3625 
3626         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
3627 
3628         return;
3629     }
3630 
3631     port->app = app;
3632 
3633     nxt_thread_mutex_lock(&app->mutex);
3634 
3635     nxt_assert(app->pending_processes != 0);
3636 
3637     app->pending_processes--;
3638     app->processes++;
3639 
3640     nxt_thread_mutex_unlock(&app->mutex);
3641 
3642     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
3643               &app->name, port->pid, app->processes, app->pending_processes);
3644 
3645     nxt_router_app_port_release(task, port, 0, 0);
3646 }
3647 
3648 
3649 static void
3650 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3651     void *data)
3652 {
3653     nxt_app_t           *app;
3654     nxt_app_joint_t     *app_joint;
3655     nxt_queue_link_t    *lnk;
3656     nxt_req_app_link_t  *ra;
3657 
3658     app_joint = data;
3659 
3660     nxt_assert(app_joint != NULL);
3661 
3662     app = app_joint->app;
3663 
3664     nxt_router_app_joint_use(task, app_joint, -1);
3665 
3666     if (nxt_slow_path(app == NULL)) {
3667         nxt_debug(task, "start error for released app");
3668 
3669         return;
3670     }
3671 
3672     nxt_debug(task, "app '%V' %p start error", &app->name, app);
3673 
3674     nxt_thread_mutex_lock(&app->mutex);
3675 
3676     nxt_assert(app->pending_processes != 0);
3677 
3678     app->pending_processes--;
3679 
3680     if (!nxt_queue_is_empty(&app->requests)) {
3681         lnk = nxt_queue_last(&app->requests);
3682         nxt_queue_remove(lnk);
3683         lnk->next = NULL;
3684 
3685         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
3686 
3687     } else {
3688         ra = NULL;
3689     }
3690 
3691     nxt_thread_mutex_unlock(&app->mutex);
3692 
3693     if (ra != NULL) {
3694         nxt_debug(task, "app '%V' %p abort next stream #%uD",
3695                   &app->name, app, ra->stream);
3696 
3697         nxt_router_ra_error(ra, 500, "Failed to start application process");
3698         nxt_router_ra_use(task, ra, -1);
3699     }
3700 }
3701 
/* Forward declaration; the function is defined further down in this file. */
3702 nxt_inline nxt_port_t *
3703 nxt_router_app_get_port_for_quit(nxt_app_t *app);
3704 
3705 void
3706 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
3707 {
3708     int  c;
3709 
3710     c = nxt_atomic_fetch_add(&app->use_count, i);
3711 
3712     if (i < 0 && c == -i) {
3713 
3714         if (task->thread->engine != app->engine) {
3715             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
3716 
3717         } else {
3718             nxt_router_free_app(task, app->joint, NULL);
3719         }
3720     }
3721 }
3722 
3723 
3724 nxt_inline nxt_bool_t
3725 nxt_router_app_first_port_busy(nxt_app_t *app)
3726 {
3727     nxt_port_t        *port;
3728     nxt_queue_link_t  *lnk;
3729 
3730     lnk = nxt_queue_first(&app->ports);
3731     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
3732 
3733     return port->app_pending_responses > 0;
3734 }
3735 
3736 
3737 nxt_inline nxt_port_t *
3738 nxt_router_pop_first_port(nxt_app_t *app)
3739 {
3740     nxt_port_t        *port;
3741     nxt_queue_link_t  *lnk;
3742 
3743     lnk = nxt_queue_first(&app->ports);
3744     nxt_queue_remove(lnk);
3745 
3746     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
3747 
3748     port->app_pending_responses++;
3749 
3750     if (nxt_queue_chk_remove(&port->idle_link)) {
3751         app->idle_processes--;
3752 
3753         if (port->idle_start == 0) {
3754             nxt_assert(app->idle_processes < app->spare_processes);
3755 
3756         } else {
3757             nxt_assert(app->idle_processes >= app->spare_processes);
3758 
3759             port->idle_start = 0;
3760         }
3761     }
3762 
3763     if ((app->max_pending_responses == 0
3764             || port->app_pending_responses < app->max_pending_responses)
3765         && (app->max_requests == 0
3766             || port->app_responses + port->app_pending_responses
3767                 < app->max_requests))
3768     {
3769         nxt_queue_insert_tail(&app->ports, lnk);
3770 
3771         nxt_port_inc_use(port);
3772 
3773     } else {
3774         lnk->next = NULL;
3775     }
3776 
3777     return port;
3778 }
3779 
3780 
3781 nxt_inline nxt_port_t *
3782 nxt_router_app_get_port_for_quit(nxt_app_t *app)
3783 {
3784     nxt_port_t  *port;
3785 
3786     port = NULL;
3787 
3788     nxt_thread_mutex_lock(&app->mutex);
3789 
3790     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
3791 
3792         if (port->app_pending_responses > 0) {
3793             port = NULL;
3794 
3795             continue;
3796         }
3797 
3798         /* Caller is responsible to decrease port use count. */
3799         nxt_queue_chk_remove(&port->app_link);
3800 
3801         if (nxt_queue_chk_remove(&port->idle_link)) {
3802             app->idle_processes--;
3803         }
3804 
3805         port->app = NULL;
3806         app->processes--;
3807 
3808         break;
3809 
3810     } nxt_queue_loop;
3811 
3812     nxt_thread_mutex_unlock(&app->mutex);
3813 
3814     return port;
3815 }
3816 
3817 
3818 static void
3819 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
3820 {
3821     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
3822 
3823     nxt_queue_remove(&app->link);
3824 
3825     nxt_router_app_use(task, app, -1);
3826 }
3827 
3828 
3829 static void
3830 nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
3831 {
3832     nxt_req_app_link_t  *ra;
3833 
3834     ra = data;
3835 
3836 #if (NXT_DEBUG)
3837     {
3838     nxt_app_t  *app;
3839 
3840     app = obj;
3841 
3842     nxt_assert(app != NULL);
3843     nxt_assert(ra != NULL);
3844     nxt_assert(ra->app_port != NULL);
3845 
3846     nxt_debug(task, "app '%V' %p process next stream #%uD",
3847               &app->name, app, ra->stream);
3848     }
3849 #endif
3850 
3851     nxt_router_app_prepare_request(task, ra);
3852 }
3853 
3854 
/*
 * Accounts for finished requests on an application port and decides what
 * happens to the port next.
 *
 * request_failed and got_response are the numbers of requests that have
 * just failed or been answered on this port; both are 0 when a newly
 * started port is announced (see nxt_router_app_port_ready()).
 *
 * Under app->mutex the function:
 *   - updates the port's pending/served counters and (re)links the port
 *     into app->ports while port->pair[1] != -1 and the port is below the
 *     application limits;
 *   - if a port is available, takes the first request from app->requests
 *     and assigns a port to it;
 *   - unlinks the first request from port->pending_requests;
 *   - tries to cancel and re-schedule the first stalled request from
 *     app->pending;
 *   - detaches the port from the application once max_requests has been
 *     served (send_quit);
 *   - otherwise puts a port without pending responses on the spare or the
 *     idle list.
 *
 * The actions that must not run under the mutex (posting work, dropping
 * references, sending QUIT) are performed after unlocking.
 */
3855 static void
3856 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
3857     uint32_t request_failed, uint32_t got_response)
3858 {
3859     nxt_app_t                *app;
3860     nxt_bool_t               port_unchained;
3861     nxt_bool_t               send_quit, cancelled, adjust_idle_timer;
3862     nxt_queue_link_t         *lnk;
3863     nxt_req_app_link_t       *ra, *pending_ra, *re_ra;
3864     nxt_port_select_state_t  state;
3865 
3866     nxt_assert(port != NULL);
3867     nxt_assert(port->app != NULL);
3868 
3869     ra = NULL;
3870 
3871     app = port->app;
3872 
3873     nxt_thread_mutex_lock(&app->mutex);
3874 
3875     port->app_pending_responses -= request_failed + got_response;
3876     port->app_responses += got_response;
3877 
         /*
          * The port may accept more requests: make sure it is linked into
          * app->ports.  A port without pending responses goes to the head,
          * a busy one to the tail.
          */
3878     if (port->pair[1] != -1
3879         && (app->max_pending_responses == 0
3880             || port->app_pending_responses < app->max_pending_responses)
3881         && (app->max_requests == 0
3882             || port->app_responses + port->app_pending_responses
3883                 < app->max_requests))
3884     {
3885         if (port->app_link.next == NULL) {
                 /* Not linked yet: the list takes its own port reference. */
3886             if (port->app_pending_responses > 0) {
3887                 nxt_queue_insert_tail(&app->ports, &port->app_link);
3888 
3889             } else {
3890                 nxt_queue_insert_head(&app->ports, &port->app_link);
3891             }
3892 
3893             nxt_port_inc_use(port);
3894 
3895         } else {
                 /* Already linked: move a free port to the head. */
3896             if (port->app_pending_responses == 0
3897                 && nxt_queue_first(&app->ports) != &port->app_link)
3898             {
3899                 nxt_queue_remove(&port->app_link);
3900                 nxt_queue_insert_head(&app->ports, &port->app_link);
3901             }
3902         }
3903     }
3904 
         /* Dispatch the first queued request to the first available port. */
3905     if (!nxt_queue_is_empty(&app->ports)
3906         && !nxt_queue_is_empty(&app->requests))
3907     {
3908         lnk = nxt_queue_first(&app->requests);
3909         nxt_queue_remove(lnk);
3910         lnk->next = NULL;
3911 
3912         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
3913 
3914         ra->app_port = nxt_router_pop_first_port(app);
3915 
             /* The chosen port is already busy with another request. */
3916         if (ra->app_port->app_pending_responses > 1) {
3917             nxt_router_ra_pending(task, app, ra);
3918         }
3919     }
3920 
3921     /* Pop first pending request for this port. */
3922     if ((request_failed > 0 || got_response > 0)
3923         && !nxt_queue_is_empty(&port->pending_requests))
3924     {
3925         lnk = nxt_queue_first(&port->pending_requests);
3926         nxt_queue_remove(lnk);
3927         lnk->next = NULL;
3928 
3929         pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
3930                                          link_port_pending);
3931 
3932         nxt_assert(pending_ra->link_app_pending.next != NULL);
3933 
3934         nxt_queue_remove(&pending_ra->link_app_pending);
3935         pending_ra->link_app_pending.next = NULL;
3936 
3937     } else {
3938         pending_ra = NULL;
3939     }
3940 
3941     /* Try to cancel and re-schedule first stalled request for this app. */
3942     if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
3943         lnk = nxt_queue_first(&app->pending);
3944 
3945         re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
3946 
3947         if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
3948 
3949             nxt_debug(task, "app '%V' stalled request #%uD detected",
3950                       &app->name, re_ra->stream);
3951 
3952             cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
3953                                               re_ra->stream);
3954 
3955             if (cancelled) {
3956                 nxt_router_ra_inc_use(re_ra);
3957 
3958                 state.ra = re_ra;
3959                 state.app = app;
3960 
3961                 nxt_router_port_select(task, &state);
3962 
                     /* Keep re_ra set; "state" is used after unlocking. */
3963                 goto re_ra_cancelled;
3964             }
3965         }
3966     }
3967 
3968     re_ra = NULL;
3969 
3970 re_ra_cancelled:
3971 
         /* The port has served its share of requests and is idle now. */
3972     send_quit = (app->max_requests > 0
3973                  && port->app_pending_responses == 0
3974                  && port->app_responses >= app->max_requests);
3975 
3976     if (send_quit) {
3977         port_unchained = nxt_queue_chk_remove(&port->app_link);
3978 
3979         port->app = NULL;
3980         app->processes--;
3981 
3982     } else {
3983         port_unchained = 0;
3984     }
3985 
3986     adjust_idle_timer = 0;
3987 
         /* A usable port without pending responses becomes spare or idle. */
3988     if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
3989         nxt_assert(port->idle_link.next == NULL);
3990 
             /*
              * This port is the first one beyond the spare limit: schedule
              * an idle timer adjustment unless one is already queued
              * (adjust_idle_work.data != NULL).
              */
3991         if (app->idle_processes == app->spare_processes
3992             && app->adjust_idle_work.data == NULL)
3993         {
3994             adjust_idle_timer = 1;
3995             app->adjust_idle_work.data = app;
3996             app->adjust_idle_work.next = NULL;
3997         }
3998 
3999         if (app->idle_processes < app->spare_processes) {
4000             nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
4001 
4002         } else {
4003             nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
4004 
4005             port->idle_start = task->thread->engine->timers.now;
4006         }
4007 
4008         app->idle_processes++;
4009     }
4010 
4011     nxt_thread_mutex_unlock(&app->mutex);
4012 
         /*
          * The application reference taken here is dropped by
          * nxt_router_adjust_idle_timer() when it runs as queued work.
          */
4013     if (adjust_idle_timer) {
4014         nxt_router_app_use(task, app, 1);
4015         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4016     }
4017 
4018     if (pending_ra != NULL) {
4019         nxt_router_ra_use(task, pending_ra, -1);
4020     }
4021 
         /* Finish re-scheduling of the cancelled stalled request. */
4022     if (re_ra != NULL) {
4023         if (nxt_router_port_post_select(task, &state) == NXT_OK) {
4024             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4025                                nxt_router_app_process_request,
4026                                &task->thread->engine->task, app, re_ra);
4027         }
4028     }
4029 
         /* A queued request got a port above: process it asynchronously. */
4030     if (ra != NULL) {
4031         nxt_router_ra_use(task, ra, -1);
4032 
4033         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4034                            nxt_router_app_process_request,
4035                            &task->thread->engine->task, app, ra);
4036 
4037         goto adjust_use;
4038     }
4039 
         /*
          * NOTE(review): pair[1] == -1 presumably means the port's socket
          * has already been closed (see the debug message); confirm.
          */
4040     /* ? */
4041     if (port->pair[1] == -1) {
4042         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4043                   &app->name, app, port, port->pid);
4044 
4045         goto adjust_use;
4046     }
4047 
4048     if (send_quit) {
4049         nxt_debug(task, "app '%V' %p send QUIT to port",
4050                   &app->name, app);
4051 
4052         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
4053                               -1, 0, 0, NULL);
4054 
             /* Drop the reference that was held by app->ports. */
4055         if (port_unchained) {
4056             nxt_port_use(task, port, -1);
4057         }
4058 
4059         goto adjust_use;
4060     }
4061 
4062     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4063               &app->name, app);
4064 
4065 adjust_use:
4066 
         /* One port reference is dropped when a request has finished on it. */
4067     if (request_failed > 0 || got_response > 0) {
4068         nxt_port_use(task, port, -1);
4069     }
4070 }
4071 
4072 
4073 void
4074 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4075 {
4076     nxt_app_t         *app;
4077     nxt_bool_t        unchain, start_process;
4078     nxt_port_t        *idle_port;
4079     nxt_queue_link_t  *idle_lnk;
4080 
4081     app = port->app;
4082 
4083     nxt_assert(app != NULL);
4084 
4085     nxt_thread_mutex_lock(&app->mutex);
4086 
4087     unchain = nxt_queue_chk_remove(&port->app_link);
4088 
4089     if (nxt_queue_chk_remove(&port->idle_link)) {
4090         app->idle_processes--;
4091 
4092         if (port->idle_start == 0
4093             && app->idle_processes >= app->spare_processes)
4094         {
4095             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4096 
4097             idle_lnk = nxt_queue_last(&app->idle_ports);
4098             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4099             nxt_queue_remove(idle_lnk);
4100 
4101             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4102 
4103             idle_port->idle_start = 0;
4104         }
4105     }
4106 
4107     app->processes--;
4108 
4109     start_process = !task->thread->engine->shutdown
4110                     && nxt_router_app_can_start(app)
4111                     && (!nxt_queue_is_empty(&app->requests)
4112                         || nxt_router_app_need_start(app));
4113 
4114     if (start_process) {
4115         app->pending_processes++;
4116     }
4117 
4118     nxt_thread_mutex_unlock(&app->mutex);
4119 
4120     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4121 
4122     if (unchain) {
4123         nxt_port_use(task, port, -1);
4124     }
4125 
4126     if (start_process) {
4127         nxt_router_start_app_process(task, app);
4128     }
4129 }
4130 
4131 
/*
 * Stops application processes that have been idle for too long and
 * re-arms the idle timer.
 *
 * obj is the application.  The handler is "queued" when data == app, i.e.
 * when it runs as the adjust_idle_work posted by
 * nxt_router_app_port_release(); in that case the work is marked as no
 * longer queued and the application reference taken by the poster is
 * dropped at the end.  nxt_router_app_idle_timeout() calls it with
 * data == NULL.
 *
 * While there are more idle processes than spare_processes, the first port
 * of app->idle_ports is examined: if idle_start + idle_timeout does not
 * exceed the threshold, the port is detached from the application and told
 * to QUIT; otherwise the loop stops and the timer is set for the remaining
 * time.
 */
4132 static void
4133 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4134 {
4135     nxt_app_t           *app;
4136     nxt_bool_t          queued;
4137     nxt_port_t          *port;
4138     nxt_msec_t          timeout, threshold;
4139     nxt_queue_link_t    *lnk;
4140     nxt_event_engine_t  *engine;
4141 
4142     app = obj;
4143     queued = (data == app);
4144 
4145     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4146               &app->name, queued);
4147 
4148     engine = task->thread->engine;
4149 
4150     nxt_assert(app->engine == engine);
4151 
         /* Ports expiring within the timer's bias are treated as expired. */
4152     threshold = engine->timers.now + app->joint->idle_timer.bias;
4153     timeout = 0;
4154 
4155     nxt_thread_mutex_lock(&app->mutex);
4156 
         /* Allow nxt_router_app_port_release() to post the work again. */
4157     if (queued) {
4158         app->adjust_idle_work.data = NULL;
4159     }
4160 
4161     while (app->idle_processes > app->spare_processes) {
4162 
4163         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4164 
4165         lnk = nxt_queue_first(&app->idle_ports);
4166         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4167 
4168         timeout = port->idle_start + app->idle_timeout;
4169 
             /* The first idle port has not expired yet: stop here. */
4170         if (timeout > threshold) {
4171             break;
4172         }
4173 
4174         nxt_queue_remove(lnk);
4175         lnk->next = NULL;
4176 
4177         nxt_queue_chk_remove(&port->app_link);
4178 
4179         app->idle_processes--;
4180         app->processes--;
4181         port->app = NULL;
4182 
             /* The mutex is not held while the QUIT message is written. */
4183         nxt_thread_mutex_unlock(&app->mutex);
4184 
4185         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4186                   &app->name, port->pid);
4187 
4188         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4189 
4190         nxt_port_use(task, port, -1);
4191 
4192         nxt_thread_mutex_lock(&app->mutex);
4193     }
4194 
4195     nxt_thread_mutex_unlock(&app->mutex);
4196 
         /*
          * timeout > threshold only if the loop stopped at a port that has
          * not expired yet; wait for that port, otherwise stop the timer.
          */
4197     if (timeout > threshold) {
4198         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4199 
4200     } else {
4201         nxt_timer_disable(engine, &app->joint->idle_timer);
4202     }
4203 
4204     if (queued) {
4205         nxt_router_app_use(task, app, -1);
4206     }
4207 }
4208 
4209 
4210 static void
4211 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4212 {
4213     nxt_timer_t      *timer;
4214     nxt_app_joint_t  *app_joint;
4215 
4216     timer = obj;
4217     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4218 
4219     if (nxt_fast_path(app_joint->app != NULL)) {
4220         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4221     }
4222 }
4223 
4224 
4225 static void
4226 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4227 {
4228     nxt_timer_t      *timer;
4229     nxt_app_joint_t  *app_joint;
4230 
4231     timer = obj;
4232     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4233 
4234     nxt_router_app_joint_use(task, app_joint, -1);
4235 }
4236 
4237 
4238 static void
4239 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4240 {
4241     nxt_app_t        *app;
4242     nxt_port_t       *port;
4243     nxt_app_joint_t  *app_joint;
4244 
4245     app_joint = obj;
4246     app = app_joint->app;
4247 
4248     for ( ;; ) {
4249         port = nxt_router_app_get_port_for_quit(app);
4250         if (port == NULL) {
4251             break;
4252         }
4253 
4254         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4255 
4256         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4257 
4258         nxt_port_use(task, port, -1);
4259     }
4260 
4261     nxt_assert(app->processes == 0);
4262     nxt_assert(app->idle_processes == 0);
4263     nxt_assert(nxt_queue_is_empty(&app->requests));
4264     nxt_assert(nxt_queue_is_empty(&app->ports));
4265     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4266     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4267 
4268     nxt_thread_mutex_destroy(&app->mutex);
4269     nxt_free(app);
4270 
4271     app_joint->app = NULL;
4272 
4273     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4274         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4275         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4276 
4277     } else {
4278         nxt_router_app_joint_use(task, app_joint, -1);
4279     }
4280 }
4281 
4282 
/*
 * Decides how the request link state->ra is going to be served by
 * state->app and records the decision in *state for
 * nxt_router_port_post_select().
 *
 * Both callers in this file invoke it between nxt_thread_mutex_lock() and
 * nxt_thread_mutex_unlock() on app->mutex; the use-count changes and process
 * start recorded here are applied by nxt_router_port_post_select() after the
 * unlock.
 *
 * Fields written in *state:
 *   failed_port, failed_port_use_delta
 *       port that was attached to the link on entry (if any) and the
 *       use-count delta still to be applied to it;
 *   port
 *       port popped from app->ports, or NULL when the link was queued;
 *   start_process
 *       1 when a new application process should be started;
 *   shared_ra
 *       link to continue with; NULL if nxt_router_ra_create() failed.
 */
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
    nxt_app_t           *app;
    nxt_bool_t          can_start_process;
    nxt_req_app_link_t  *ra;

    ra = state->ra;
    app = state->app;

    state->failed_port_use_delta = 0;

    /* Detach the link from the app request queue, dropping that use. */
    if (nxt_queue_chk_remove(&ra->link_app_requests))
    {
        nxt_router_ra_dec_use(ra);
    }

    /*
     * Detach the link from the port pending list; a link on that list is
     * expected to be on the app pending list as well, and both memberships
     * share a single use.
     */
    if (nxt_queue_chk_remove(&ra->link_port_pending))
    {
        nxt_assert(ra->link_app_pending.next != NULL);

        nxt_queue_remove(&ra->link_app_pending);
        ra->link_app_pending.next = NULL;

        nxt_router_ra_dec_use(ra);
    }

    state->failed_port = ra->app_port;

    /*
     * A port already attached to the link is treated as failed: one use is
     * scheduled to be dropped for the link itself and one more if the port
     * was still linked into an app list.
     */
    if (ra->app_port != NULL) {
        state->failed_port_use_delta--;

        state->failed_port->app_pending_responses--;

        if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
            state->failed_port_use_delta--;
        }

        ra->app_port = NULL;
    }

    can_start_process = nxt_router_app_can_start(app);

    state->port = NULL;
    state->start_process = 0;

    if (nxt_queue_is_empty(&app->ports)
        || (can_start_process && nxt_router_app_first_port_busy(app)) )
    {
        /* No usable port right now: queue the request on the application. */
        ra = nxt_router_ra_create(task, ra);

        if (nxt_slow_path(ra == NULL)) {
            goto fail;
        }

        /* A request that already lost a port goes to the queue head. */
        if (nxt_slow_path(state->failed_port != NULL)) {
            nxt_queue_insert_head(&app->requests, &ra->link_app_requests);

        } else {
            nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
        }

        nxt_router_ra_inc_use(ra);

        nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);

        if (can_start_process) {
            app->pending_processes++;
            state->start_process = 1;
        }

    } else {
        state->port = nxt_router_pop_first_port(app);

        /*
         * The port has other responses outstanding, so the request is also
         * registered as pending through nxt_router_ra_pending().
         */
        if (state->port->app_pending_responses > 1) {
            ra = nxt_router_ra_create(task, ra);

            if (nxt_slow_path(ra == NULL)) {
                goto fail;
            }

            ra->app_port = state->port;

            nxt_router_ra_pending(task, app, ra);
        }

        if (can_start_process && nxt_router_app_need_start(app)) {
            app->pending_processes++;
            state->start_process = 1;
        }
    }

fail:

    /* Reached on success too; ra is NULL only after a failed allocation. */
    state->shared_ra = ra;
}
4379 
4380 
4381 static nxt_int_t
4382 nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
4383 {
4384     nxt_int_t           res;
4385     nxt_app_t           *app;
4386     nxt_req_app_link_t  *ra;
4387 
4388     ra = state->shared_ra;
4389     app = state->app;
4390 
4391     if (state->failed_port_use_delta != 0) {
4392         nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
4393     }
4394 
4395     if (nxt_slow_path(ra == NULL)) {
4396         if (state->port != NULL) {
4397             nxt_port_use(task, state->port, -1);
4398         }
4399 
4400         nxt_router_ra_error(state->ra, 500,
4401                             "Failed to allocate shared req<->app link");
4402         nxt_router_ra_use(task, state->ra, -1);
4403 
4404         return NXT_ERROR;
4405     }
4406 
4407     if (state->port != NULL) {
4408         nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
4409 
4410         ra->app_port = state->port;
4411 
4412         if (state->start_process) {
4413             nxt_router_start_app_process(task, app);
4414         }
4415 
4416         return NXT_OK;
4417     }
4418 
4419     if (!state->start_process) {
4420         nxt_debug(task, "app '%V' %p too many running or pending processes",
4421                   &app->name, app);
4422 
4423         return NXT_AGAIN;
4424     }
4425 
4426     res = nxt_router_start_app_process(task, app);
4427 
4428     if (nxt_slow_path(res != NXT_OK)) {
4429         nxt_router_ra_error(ra, 500, "Failed to start app process");
4430         nxt_router_ra_use(task, ra, -1);
4431 
4432         return NXT_ERROR;
4433     }
4434 
4435     return NXT_AGAIN;
4436 }
4437 
4438 
4439 static nxt_int_t
4440 nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
4441 {
4442     nxt_port_select_state_t  state;
4443 
4444     state.ra = ra;
4445     state.app = app;
4446 
4447     nxt_thread_mutex_lock(&app->mutex);
4448 
4449     nxt_router_port_select(task, &state);
4450 
4451     nxt_thread_mutex_unlock(&app->mutex);
4452 
4453     return nxt_router_port_post_select(task, &state);
4454 }
4455 
4456 
4457 void
4458 nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar,
4459     nxt_app_t *app)
4460 {
4461     nxt_int_t            res;
4462     nxt_port_t           *port;
4463     nxt_event_engine_t   *engine;
4464     nxt_http_request_t   *r;
4465     nxt_req_app_link_t   ra_local, *ra;
4466     nxt_req_conn_link_t  *rc;
4467 
4468     r = ar->request;
4469     engine = task->thread->engine;
4470 
4471     rc = nxt_port_rpc_register_handler_ex(task, engine->port,
4472                                           nxt_router_response_ready_handler,
4473                                           nxt_router_response_error_handler,
4474                                           sizeof(nxt_req_conn_link_t));
4475 
4476     if (nxt_slow_path(rc == NULL)) {
4477         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4478         return;
4479     }
4480 
4481     rc->stream = nxt_port_rpc_ex_stream(rc);
4482     rc->app = app;
4483 
4484     nxt_router_app_use(task, app, 1);
4485 
4486     rc->ap = ar;
4487 
4488     ra = &ra_local;
4489     nxt_router_ra_init(task, ra, rc);
4490 
4491     res = nxt_router_app_port(task, app, ra);
4492 
4493     if (res != NXT_OK) {
4494         return;
4495     }
4496 
4497     ra = rc->ra;
4498     port = ra->app_port;
4499 
4500     nxt_assert(port != NULL);
4501 
4502     nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
4503 
4504     nxt_router_app_prepare_request(task, ra);
4505 }
4506 
4507 
/*
 * Completion handler that intentionally does nothing.
 *
 * nxt_router_app_prepare_request() installs it on every buffer of an
 * outgoing message after saving the original handler in ra->msg_info.
 */
static void
nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
{
}
4512 
4513 
/*
 * Serialises the request referenced by "ra" and sends it to the application
 * port already attached to the link (ra->app_port must not be NULL).
 *
 * Steps: make sure the application process has been sent the reply port,
 * build the message buffers, obtain a tracking area and write the message
 * to the port.  Any failure is reported on the link as a 500 error.  Every
 * path, successful or not, ends by releasing the port and updating the RPC
 * peer of the link.
 */
static void
nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
{
    uint32_t             request_failed;
    nxt_buf_t            *buf;
    nxt_int_t            res;
    nxt_port_t           *port, *c_port, *reply_port;
    nxt_app_parse_ctx_t  *ap;

    nxt_assert(ra->app_port != NULL);

    port = ra->app_port;
    reply_port = ra->reply_port;
    ap = ra->ap;

    request_failed = 1;

    /*
     * Send the reply port to the application process unless it is already
     * recorded among the ports connected to that process.
     */
    c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
                                             reply_port->id);
    if (nxt_slow_path(c_port != reply_port)) {
        res = nxt_port_send_port(task, port, reply_port, 0);

        if (nxt_slow_path(res != NXT_OK)) {
            nxt_router_ra_error(ra, 500,
                                "Failed to send reply port to application");
            goto release_port;
        }

        nxt_process_connected_port_add(port->process, reply_port);
    }

    buf = nxt_router_prepare_msg(task, &ap->r, port,
                                 nxt_app_msg_prefix[port->app->type]);

    if (nxt_slow_path(buf == NULL)) {
        nxt_router_ra_error(ra, 500,
                            "Failed to prepare message for application");
        goto release_port;
    }

    nxt_debug(task, "about to send %O bytes buffer to app process port %d",
                    nxt_buf_used_size(buf),
                    port->socket.fd);

    /*
     * NOTE(review): from here on request_failed stays 0, so the tracking
     * and write failures below reach release_port with request_failed == 0;
     * confirm that this is intended.
     */
    request_failed = 0;

    /*
     * Remember the chain and the completion handler of its first buffer in
     * msg_info, then neutralise the handler of every buffer in the chain.
     */
    ra->msg_info.buf = buf;
    ra->msg_info.completion_handler = buf->completion_handler;

    for (; buf; buf = buf->next) {
        buf->completion_handler = nxt_router_dummy_buf_completion;
    }

    /* The loop above consumed "buf"; restore the head of the chain. */
    buf = ra->msg_info.buf;

    res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
                                     ra->stream);
    if (nxt_slow_path(res != NXT_OK)) {
        nxt_router_ra_error(ra, 500,
                            "Failed to get tracking area");
        goto release_port;
    }

    res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
                                 -1, ra->stream, reply_port->id, buf,
                                 &ra->msg_info.tracking);

    if (nxt_slow_path(res != NXT_OK)) {
        nxt_router_ra_error(ra, 500,
                            "Failed to send message to application");
        goto release_port;
    }

release_port:

    /* Common exit for both the success and the failure paths. */
    nxt_router_app_port_release(task, port, request_failed, 0);

    nxt_router_ra_update_peer(task, ra);
}
4593 
4594 
4595 struct nxt_fields_iter_s {
4596     nxt_list_part_t   *part;
4597     nxt_http_field_t  *field;
4598 };
4599 
4600 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
4601 
4602 
4603 static nxt_http_field_t *
4604 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
4605 {
4606     if (part == NULL) {
4607         return NULL;
4608     }
4609 
4610     while (part->nelts == 0) {
4611         part = part->next;
4612         if (part == NULL) {
4613             return NULL;
4614         }
4615     }
4616 
4617     i->part = part;
4618     i->field = nxt_list_data(i->part);
4619 
4620     return i->field;
4621 }
4622 
4623 
4624 static nxt_http_field_t *
4625 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
4626 {
4627     return nxt_fields_part_first(nxt_list_part(fields), i);
4628 }
4629 
4630 
4631 static nxt_http_field_t *
4632 nxt_fields_next(nxt_fields_iter_t *i)
4633 {
4634     nxt_http_field_t  *end = nxt_list_data(i->part);
4635 
4636     end += i->part->nelts;
4637     i->field++;
4638 
4639     if (i->field < end) {
4640         return i->field;
4641     }
4642 
4643     return nxt_fields_part_first(i->part->next, i);
4644 }
4645 
4646 
/*
 * Serialises request "r" into shared memory buffers obtained from "port".
 *
 * The first buffer starts with an nxt_unit_request_t followed by the array
 * of nxt_unit_field_t entries and then the NUL-terminated strings they
 * refer to.  The preread part of the request body is copied after that,
 * continuing into additional buffers chained through ->next when needed.
 *
 * "prefix" is prepended to every header field name.  When it is not empty,
 * names are additionally converted ('a'..'z' to upper case, '-' to '_') and
 * the values of repeated fields are joined with ", ".
 *
 * Returns the head of the buffer chain, or NULL if the header part does not
 * fit into PORT_MMAP_DATA_SIZE or a buffer could not be obtained.
 */
static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
    nxt_port_t *port, const nxt_str_t *prefix)
{
    void                      *target_pos, *query_pos;
    u_char                    *pos, *end, *p, c;
    size_t                    fields_count, req_size, size, free_size;
    size_t                    copy_size;
    nxt_buf_t                 *b, *buf, *out, **tail;
    nxt_http_field_t          *field, *dup;
    nxt_unit_field_t          *dst_field;
    nxt_fields_iter_t         iter, dup_iter;
    nxt_unit_request_t        *req;
    nxt_app_request_header_t  *h;

    h = &r->header;

    /*
     * Size of the fixed structure plus every string with its terminating
     * NUL.  The path needs room of its own only when it does not start at
     * the same address as the target.
     */
    req_size = sizeof(nxt_unit_request_t)
               + h->method.length + 1
               + h->version.length + 1
               + r->remote.length + 1
               + r->local.length + 1
               + h->server_name.length + 1
               + h->target.length + 1
               + (h->path.start != h->target.start ? h->path.length + 1 : 0);

    fields_count = 0;

    /*
     * This pass does not look at field->skip, so the estimate may exceed
     * what the serialisation loop below actually writes.
     */
    nxt_list_each(field, h->fields) {
        fields_count++;

        req_size += field->name_length + prefix->length + 1
                    + field->value_length + 1;
    } nxt_list_loop;

    req_size += fields_count * sizeof(nxt_unit_field_t);

    /* The whole header part has to fit into a single buffer. */
    if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
        nxt_alert(task, "headers to big to fit in shared memory (%d)",
                  (int) req_size);

        return NULL;
    }

    /* Ask for room for the preread body as well, up to the size limit. */
    out = nxt_port_mmap_get_buf(task, port,
              nxt_min(req_size + r->body.preread_size, PORT_MMAP_DATA_SIZE));
    if (nxt_slow_path(out == NULL)) {
        return NULL;
    }

    /* Reserve the estimated header area at the start of the buffer. */
    req = (nxt_unit_request_t *) out->mem.free;
    out->mem.free += req_size;

    req->content_length = h->parsed_content_length;

    /* String data is written right behind the field array. */
    p = (u_char *) (req->fields + fields_count);

    nxt_debug(task, "fields_count=%d", (int) fields_count);

    req->method_length = h->method.length;
    nxt_unit_sptr_set(&req->method, p);
    p = nxt_cpymem(p, h->method.start, h->method.length);
    *p++ = '\0';

    req->version_length = h->version.length;
    nxt_unit_sptr_set(&req->version, p);
    p = nxt_cpymem(p, h->version.start, h->version.length);
    *p++ = '\0';

    req->remote_length = r->remote.length;
    nxt_unit_sptr_set(&req->remote, p);
    p = nxt_cpymem(p, r->remote.start, r->remote.length);
    *p++ = '\0';

    req->local_length = r->local.length;
    nxt_unit_sptr_set(&req->local, p);
    p = nxt_cpymem(p, r->local.start, r->local.length);
    *p++ = '\0';

    req->server_name_length = h->server_name.length;
    nxt_unit_sptr_set(&req->server_name, p);
    p = nxt_cpymem(p, h->server_name.start, h->server_name.length);
    *p++ = '\0';

    /* Remember where the target copy starts; path and query may reuse it. */
    target_pos = p;
    req->target_length = h->target.length;
    nxt_unit_sptr_set(&req->target, p);
    p = nxt_cpymem(p, h->target.start, h->target.length);
    *p++ = '\0';

    req->path_length = h->path.length;
    if (h->path.start == h->target.start) {
        /* The path starts where the target does: share the target copy. */
        nxt_unit_sptr_set(&req->path, target_pos);

    } else {
        nxt_unit_sptr_set(&req->path, p);
        p = nxt_cpymem(p, h->path.start, h->path.length);
        *p++ = '\0';
    }

    req->query_length = h->query.length;
    if (h->query.start != NULL) {
        /*
         * The query is not copied separately: it is addressed inside the
         * target copy at the same offset it has from h->target.start.
         */
        query_pos = nxt_pointer_to(target_pos,
                                   h->query.start - h->target.start);

        nxt_unit_sptr_set(&req->query, query_pos);

    } else {
        req->query.offset = 0;
    }

    req->content_length_field = NXT_UNIT_NONE_FIELD;
    req->content_type_field   = NXT_UNIT_NONE_FIELD;
    req->cookie_field         = NXT_UNIT_NONE_FIELD;

    dst_field = req->fields;

    for (field = nxt_fields_first(h->fields, &iter);
         field != NULL;
         field = nxt_fields_next(&iter))
    {
        /* Skipped fields produce no output entry. */
        if (field->skip) {
            continue;
        }

        dst_field->hash = field->hash;
        dst_field->skip = 0;
        dst_field->name_length = field->name_length + prefix->length;
        dst_field->value_length = field->value_length;

        /*
         * Well-known headers are recognised by comparing the value pointer
         * with the values recorded in the parsed header; the index of the
         * output entry is stored in the request.
         */
        if (field->value == h->content_length.start) {
            req->content_length_field = dst_field - req->fields;

        } else if (field->value == h->content_type.start) {
            req->content_type_field = dst_field - req->fields;

        } else if (field->value == h->cookie.start) {
            req->cookie_field = dst_field - req->fields;
        }

        nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
                  (int) field->hash, (int) field->skip,
                  (int) field->name_length, field->name,
                  (int) field->value_length, field->value);

        if (prefix->length != 0) {
            nxt_unit_sptr_set(&dst_field->name, p);
            p = nxt_cpymem(p, prefix->start, prefix->length);

            /* Copy the name, upper-casing a-z and mapping '-' to '_'. */
            end = field->name + field->name_length;
            for (pos = field->name; pos < end; pos++) {
                c = *pos;

                if (c >= 'a' && c <= 'z') {
                    *p++ = (c & ~0x20);
                    continue;
                }

                if (c == '-') {
                    *p++ = '_';
                    continue;
                }

                *p++ = c;
            }

        } else {
            /* Without a prefix the name is copied unchanged. */
            nxt_unit_sptr_set(&dst_field->name, p);
            p = nxt_cpymem(p, field->name, field->name_length);
        }

        *p++ = '\0';

        nxt_unit_sptr_set(&dst_field->value, p);
        p = nxt_cpymem(p, field->value, field->value_length);

        if (prefix->length != 0) {
            /*
             * Scan the fields after the current one for the same name
             * (equal length and hash, case-insensitive match).  Each match
             * is appended to this value after ", " and marked as skipped.
             */
            dup_iter = iter;

            for (dup = nxt_fields_next(&dup_iter);
                 dup != NULL;
                 dup = nxt_fields_next(&dup_iter))
            {
                if (dup->name_length != field->name_length
                    || dup->skip
                    || dup->hash != field->hash
                    || nxt_memcasecmp(dup->name, field->name, dup->name_length))
                {
                    continue;
                }

                p = nxt_cpymem(p, ", ", 2);
                p = nxt_cpymem(p, dup->value, dup->value_length);

                dst_field->value_length += 2 + dup->value_length;

                dup->skip = 1;
            }
        }

        *p++ = '\0';

        dst_field++;
    }

    /* Only the entries actually written are counted. */
    req->fields_count = dst_field - req->fields;

    /* The preread body starts at the end of the reserved header area. */
    nxt_unit_sptr_set(&req->preread_content, out->mem.free);

    buf = out;
    tail = &buf->next;

    /*
     * Copy the preread body.  "buf" is the buffer currently being filled;
     * NULL means a fresh one has to be obtained and appended to the chain.
     */
    for (b = r->body.buf; b != NULL; b = b->next) {
        size = nxt_buf_mem_used_size(&b->mem);
        pos = b->mem.pos;

        while (size > 0) {
            if (buf == NULL) {
                free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);

                buf = nxt_port_mmap_get_buf(task, port, free_size);
                if (nxt_slow_path(buf == NULL)) {
                    /*
                     * Run the completion handler of every buffer collected
                     * so far before giving up.
                     */
                    while (out != NULL) {
                        buf = out->next;
                        out->completion_handler(task, out, out->parent);
                        out = buf;
                    }
                    return NULL;
                }

                *tail = buf;
                tail = &buf->next;

            } else {
                /* Try to enlarge the current buffer if it is too small. */
                free_size = nxt_buf_mem_free_size(&buf->mem);
                if (free_size < size
                    && nxt_port_mmap_increase_buf(task, buf, size, 1)
                       == NXT_OK)
                {
                    free_size = nxt_buf_mem_free_size(&buf->mem);
                }
            }

            if (free_size > 0) {
                copy_size = nxt_min(free_size, size);

                buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);

                size -= copy_size;
                pos += copy_size;

                /* Keep filling the same buffer from the next source buf. */
                if (size == 0) {
                    break;
                }
            }

            /* More data left than room: force a new buffer. */
            buf = NULL;
        }
    }

    return out;
}
4909 
4910 
/*
 * Handler of the per-request application timer.
 *
 * "obj" is the timer embedded in the request's nxt_app_parse_ctx_t.  If the
 * request has an application port attached, the first request pending on
 * that port (if any) is cancelled and, when the cancellation succeeds, sent
 * through port selection again; the port is then sent QUIT and released.
 * In every case the timed-out request is answered with 503 and its
 * connection link is unlinked.
 */
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t                *app;
    nxt_bool_t               cancelled, unlinked;
    nxt_port_t               *port;
    nxt_timer_t              *timer;
    nxt_queue_link_t         *lnk;
    nxt_req_app_link_t       *pending_ra;
    nxt_app_parse_ctx_t      *ar;
    nxt_req_conn_link_t      *rc;
    nxt_port_select_state_t  state;

    timer = obj;

    nxt_debug(task, "router app timeout");

    ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
    rc = ar->timer_data;
    app = rc->app;

    if (app == NULL) {
        goto generate_error;
    }

    port = NULL;
    pending_ra = NULL;

    /*
     * Take the port over from the connection link or, failing that, from
     * the request link, clearing the field it was taken from.
     */
    if (rc->app_port != NULL) {
        port = rc->app_port;
        rc->app_port = NULL;
    }

    if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) {
        port = rc->ra->app_port;
        rc->ra->app_port = NULL;
    }

    if (port == NULL) {
        goto generate_error;
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* Remembered to decide how many port uses to drop at the end. */
    unlinked = nxt_queue_chk_remove(&port->app_link);

    if (!nxt_queue_is_empty(&port->pending_requests)) {
        lnk = nxt_queue_first(&port->pending_requests);

        pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
                                         link_port_pending);

        nxt_assert(pending_ra->link_app_pending.next != NULL);

        nxt_debug(task, "app '%V' pending request #%uD found",
                  &app->name, pending_ra->stream);

        cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
                                          pending_ra->stream);

        if (cancelled) {
            /*
             * The pending request gets an extra use and is run through
             * port selection again while the mutex is still held.
             */
            nxt_router_ra_inc_use(pending_ra);

            state.ra = pending_ra;
            state.app = app;

            nxt_router_port_select(task, &state);

        } else {
            pending_ra = NULL;
        }
    }

    nxt_thread_mutex_unlock(&app->mutex);

    /* "state" is initialised whenever pending_ra is not NULL here. */
    if (pending_ra != NULL
        && nxt_router_port_post_select(task, &state) == NXT_OK)
    {
        nxt_router_app_prepare_request(task, pending_ra);
    }

    nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);

    nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

    /* One use for the taken-over port, one more if it left the app list. */
    nxt_port_use(task, port, unlinked ? -2 : -1);

generate_error:

    nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_router_rc_unlink(task, rc);
}
5004