xref: /unit/src/nxt_router.c (revision 2147:7bf58b1b18c4)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
112     nxt_mp_t *mp, nxt_conf_value_t *conf);
113 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
114     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
115 
116 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
117 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
118 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
119     nxt_app_t *app);
120 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
121     nxt_str_t *name);
122 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
123     int i);
124 
125 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
126     nxt_port_t *port);
127 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
128     nxt_port_t *port);
129 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
130     nxt_port_t *port, nxt_fd_t fd);
131 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
132     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
133 static void nxt_router_listen_socket_ready(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 static void nxt_router_listen_socket_error(nxt_task_t *task,
136     nxt_port_recv_msg_t *msg, void *data);
137 #if (NXT_TLS)
138 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
139     nxt_port_recv_msg_t *msg, void *data);
140 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
141     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
142     nxt_bool_t last);
143 #endif
144 static void nxt_router_app_rpc_create(nxt_task_t *task,
145     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
146 static void nxt_router_app_prefork_ready(nxt_task_t *task,
147     nxt_port_recv_msg_t *msg, void *data);
148 static void nxt_router_app_prefork_error(nxt_task_t *task,
149     nxt_port_recv_msg_t *msg, void *data);
150 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
151     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
152 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
153     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
154 
155 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
156     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
157     const nxt_event_interface_t *interface);
158 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
159     nxt_router_engine_conf_t *recf);
160 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
161     nxt_router_engine_conf_t *recf);
162 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
163     nxt_router_engine_conf_t *recf);
164 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
165     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
166     nxt_work_handler_t handler);
167 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
168     nxt_router_engine_conf_t *recf);
169 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
170     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
171 
172 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
173     nxt_router_temp_conf_t *tmcf);
174 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
175     nxt_event_engine_t *engine);
176 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
177     nxt_router_temp_conf_t *tmcf);
178 
179 static void nxt_router_engines_post(nxt_router_t *router,
180     nxt_router_temp_conf_t *tmcf);
181 static void nxt_router_engine_post(nxt_event_engine_t *engine,
182     nxt_work_t *jobs);
183 
184 static void nxt_router_thread_start(void *data);
185 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
186     void *data);
187 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
188     void *data);
189 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
190     void *data);
191 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
192     void *data);
193 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
194     void *data);
195 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
196     void *data);
197 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
198     void *data);
199 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
200     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
201 static void nxt_router_listen_socket_release(nxt_task_t *task,
202     nxt_socket_conf_t *skcf);
203 
204 static void nxt_router_access_log_writer(nxt_task_t *task,
205     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
206 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
207     struct tm *tm, size_t size, const char *format);
208 static void nxt_router_access_log_open(nxt_task_t *task,
209     nxt_router_temp_conf_t *tmcf);
210 static void nxt_router_access_log_ready(nxt_task_t *task,
211     nxt_port_recv_msg_t *msg, void *data);
212 static void nxt_router_access_log_error(nxt_task_t *task,
213     nxt_port_recv_msg_t *msg, void *data);
214 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
215     nxt_router_access_log_t *access_log);
216 static void nxt_router_access_log_release(nxt_task_t *task,
217     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
218 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
219     void *data);
220 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
221     nxt_port_recv_msg_t *msg, void *data);
222 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 
225 static void nxt_router_app_port_ready(nxt_task_t *task,
226     nxt_port_recv_msg_t *msg, void *data);
227 static void nxt_router_app_port_error(nxt_task_t *task,
228     nxt_port_recv_msg_t *msg, void *data);
229 
230 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
231 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
232 
233 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
234     nxt_port_t *port, nxt_apr_action_t action);
235 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
236     nxt_request_rpc_data_t *req_rpc_data);
237 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
238     void *data);
239 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
240     void *data);
241 
242 static void nxt_router_app_prepare_request(nxt_task_t *task,
243     nxt_request_rpc_data_t *req_rpc_data);
244 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
245     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
246 
247 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
248 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
249     void *data);
250 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
251     void *data);
252 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
253     void *data);
254 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
255 
256 static const nxt_http_request_state_t  nxt_http_request_send_state;
257 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
258 
259 static void nxt_router_app_joint_use(nxt_task_t *task,
260     nxt_app_joint_t *app_joint, int i);
261 
262 static void nxt_router_http_request_release_post(nxt_task_t *task,
263     nxt_http_request_t *r);
264 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
265     void *data);
266 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
267 static void nxt_router_get_port_handler(nxt_task_t *task,
268     nxt_port_recv_msg_t *msg);
269 static void nxt_router_get_mmap_handler(nxt_task_t *task,
270     nxt_port_recv_msg_t *msg);
271 
272 extern const nxt_http_request_state_t  nxt_http_websocket;
273 
274 static nxt_router_t  *nxt_router;
275 
276 static const nxt_str_t http_prefix = nxt_string("HTTP_");
277 static const nxt_str_t empty_prefix = nxt_string("");
278 
279 static const nxt_str_t  *nxt_app_msg_prefix[] = {
280     &empty_prefix,
281     &empty_prefix,
282     &http_prefix,
283     &http_prefix,
284     &http_prefix,
285     &empty_prefix,
286 };
287 
288 
289 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
290     .quit         = nxt_signal_quit_handler,
291     .new_port     = nxt_router_new_port_handler,
292     .get_port     = nxt_router_get_port_handler,
293     .change_file  = nxt_port_change_log_file_handler,
294     .mmap         = nxt_port_mmap_handler,
295     .get_mmap     = nxt_router_get_mmap_handler,
296     .data         = nxt_router_conf_data_handler,
297     .app_restart  = nxt_router_app_restart_handler,
298     .remove_pid   = nxt_router_remove_pid_handler,
299     .access_log   = nxt_router_access_log_reopen_handler,
300     .rpc_ready    = nxt_port_rpc_handler,
301     .rpc_error    = nxt_port_rpc_handler,
302     .oosm         = nxt_router_oosm_handler,
303 };
304 
305 
306 const nxt_process_init_t  nxt_router_process = {
307     .name           = "router",
308     .type           = NXT_PROCESS_ROUTER,
309     .prefork        = nxt_router_prefork,
310     .restart        = 1,
311     .setup          = nxt_process_core_setup,
312     .start          = nxt_router_start,
313     .port_handlers  = &nxt_router_process_port_handlers,
314     .signals        = nxt_process_signals,
315 };
316 
317 
318 /* Queues of nxt_socket_conf_t */
319 nxt_queue_t  creating_sockets;
320 nxt_queue_t  pending_sockets;
321 nxt_queue_t  updating_sockets;
322 nxt_queue_t  keeping_sockets;
323 nxt_queue_t  deleting_sockets;
324 
325 
326 static nxt_int_t
327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
328 {
329     nxt_runtime_stop_app_processes(task, task->thread->runtime);
330 
331     return NXT_OK;
332 }
333 
334 
335 static nxt_int_t
336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
337 {
338     nxt_int_t      ret;
339     nxt_port_t     *controller_port;
340     nxt_router_t   *router;
341     nxt_runtime_t  *rt;
342 
343     rt = task->thread->runtime;
344 
345     nxt_log(task, NXT_LOG_INFO, "router started");
346 
347 #if (NXT_TLS)
348     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
349     if (nxt_slow_path(rt->tls == NULL)) {
350         return NXT_ERROR;
351     }
352 
353     ret = rt->tls->library_init(task);
354     if (nxt_slow_path(ret != NXT_OK)) {
355         return ret;
356     }
357 #endif
358 
359     ret = nxt_http_init(task);
360     if (nxt_slow_path(ret != NXT_OK)) {
361         return ret;
362     }
363 
364     router = nxt_zalloc(sizeof(nxt_router_t));
365     if (nxt_slow_path(router == NULL)) {
366         return NXT_ERROR;
367     }
368 
369     nxt_queue_init(&router->engines);
370     nxt_queue_init(&router->sockets);
371     nxt_queue_init(&router->apps);
372 
373     nxt_router = router;
374 
375     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
376     if (controller_port != NULL) {
377         nxt_router_greet_controller(task, controller_port);
378     }
379 
380     return NXT_OK;
381 }
382 
383 
384 static void
385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
386 {
387     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
388                           -1, 0, 0, NULL);
389 }
390 
391 
392 static void
393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
394     void *data)
395 {
396     size_t               size;
397     uint32_t             stream;
398     nxt_fd_t             port_fd, queue_fd;
399     nxt_int_t            ret;
400     nxt_app_t            *app;
401     nxt_buf_t            *b;
402     nxt_port_t           *dport;
403     nxt_runtime_t        *rt;
404     nxt_app_joint_rpc_t  *app_joint_rpc;
405 
406     app = data;
407 
408     nxt_thread_mutex_lock(&app->mutex);
409 
410     dport = app->proto_port;
411 
412     nxt_thread_mutex_unlock(&app->mutex);
413 
414     if (dport != NULL) {
415         nxt_debug(task, "app '%V' %p start process", &app->name, app);
416 
417         b = NULL;
418         port_fd = -1;
419         queue_fd = -1;
420 
421     } else {
422         if (app->proto_port_requests > 0) {
423             nxt_debug(task, "app '%V' %p wait for prototype process",
424                       &app->name, app);
425 
426             app->proto_port_requests++;
427 
428             goto skip;
429         }
430 
431         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
432 
433         rt = task->thread->runtime;
434         dport = rt->port_by_type[NXT_PROCESS_MAIN];
435 
436         size = app->name.length + 1 + app->conf.length;
437 
438         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
439         if (nxt_slow_path(b == NULL)) {
440             goto failed;
441         }
442 
443         nxt_buf_cpystr(b, &app->name);
444         *b->mem.free++ = '\0';
445         nxt_buf_cpystr(b, &app->conf);
446 
447         port_fd = app->shared_port->pair[0];
448         queue_fd = app->shared_port->queue_fd;
449     }
450 
451     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
452                                                      nxt_router_app_port_ready,
453                                                      nxt_router_app_port_error,
454                                                    sizeof(nxt_app_joint_rpc_t));
455     if (nxt_slow_path(app_joint_rpc == NULL)) {
456         goto failed;
457     }
458 
459     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
460 
461     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
462                                  port_fd, queue_fd, stream, port->id, b);
463     if (nxt_slow_path(ret != NXT_OK)) {
464         nxt_port_rpc_cancel(task, port, stream);
465 
466         goto failed;
467     }
468 
469     app_joint_rpc->app_joint = app->joint;
470     app_joint_rpc->generation = app->generation;
471     app_joint_rpc->proto = (b != NULL);
472 
473     if (b != NULL) {
474         app->proto_port_requests++;
475 
476         b = NULL;
477     }
478 
479     nxt_router_app_joint_use(task, app->joint, 1);
480 
481 failed:
482 
483     if (b != NULL) {
484         nxt_mp_free(b->data, b);
485     }
486 
487 skip:
488 
489     nxt_router_app_use(task, app, -1);
490 }
491 
492 
493 static void
494 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
495 {
496     app_joint->use_count += i;
497 
498     if (app_joint->use_count == 0) {
499         nxt_assert(app_joint->app == NULL);
500 
501         nxt_free(app_joint);
502     }
503 }
504 
505 
506 static nxt_int_t
507 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
508 {
509     nxt_int_t      res;
510     nxt_port_t     *router_port;
511     nxt_runtime_t  *rt;
512 
513     nxt_debug(task, "app '%V' start process", &app->name);
514 
515     rt = task->thread->runtime;
516     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
517 
518     nxt_router_app_use(task, app, 1);
519 
520     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
521                         app);
522 
523     if (res == NXT_OK) {
524         return res;
525     }
526 
527     nxt_thread_mutex_lock(&app->mutex);
528 
529     app->pending_processes--;
530 
531     nxt_thread_mutex_unlock(&app->mutex);
532 
533     nxt_router_app_use(task, app, -1);
534 
535     return NXT_ERROR;
536 }
537 
538 
539 nxt_inline nxt_bool_t
540 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
541 {
542     nxt_buf_t       *b, *next;
543     nxt_bool_t      cancelled;
544     nxt_port_t      *app_port;
545     nxt_msg_info_t  *msg_info;
546 
547     msg_info = &req_rpc_data->msg_info;
548 
549     if (msg_info->buf == NULL) {
550         return 0;
551     }
552 
553     app_port = req_rpc_data->app_port;
554 
555     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
556         cancelled = nxt_app_queue_cancel(app_port->queue,
557                                          msg_info->tracking_cookie,
558                                          req_rpc_data->stream);
559 
560         if (cancelled) {
561             nxt_debug(task, "stream #%uD: cancelled by router",
562                       req_rpc_data->stream);
563         }
564 
565     } else {
566         cancelled = 0;
567     }
568 
569     for (b = msg_info->buf; b != NULL; b = next) {
570         next = b->next;
571         b->next = NULL;
572 
573         if (b->is_port_mmap_sent) {
574             b->is_port_mmap_sent = cancelled == 0;
575         }
576 
577         b->completion_handler(task, b, b->parent);
578     }
579 
580     msg_info->buf = NULL;
581 
582     return cancelled;
583 }
584 
585 
586 nxt_inline nxt_bool_t
587 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
588 {
589     if (lnk->next != NULL) {
590         nxt_queue_remove(lnk);
591 
592         lnk->next = NULL;
593 
594         return 1;
595     }
596 
597     return 0;
598 }
599 
600 
601 nxt_inline void
602 nxt_request_rpc_data_unlink(nxt_task_t *task,
603     nxt_request_rpc_data_t *req_rpc_data)
604 {
605     nxt_app_t           *app;
606     nxt_bool_t          unlinked;
607     nxt_http_request_t  *r;
608 
609     nxt_router_msg_cancel(task, req_rpc_data);
610 
611     app = req_rpc_data->app;
612 
613     if (req_rpc_data->app_port != NULL) {
614         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
615                                     req_rpc_data->apr_action);
616 
617         req_rpc_data->app_port = NULL;
618     }
619 
620     r = req_rpc_data->request;
621 
622     if (r != NULL) {
623         r->timer_data = NULL;
624 
625         nxt_router_http_request_release_post(task, r);
626 
627         r->req_rpc_data = NULL;
628         req_rpc_data->request = NULL;
629 
630         if (app != NULL) {
631             unlinked = 0;
632 
633             nxt_thread_mutex_lock(&app->mutex);
634 
635             if (r->app_link.next != NULL) {
636                 nxt_queue_remove(&r->app_link);
637                 r->app_link.next = NULL;
638 
639                 unlinked = 1;
640             }
641 
642             nxt_thread_mutex_unlock(&app->mutex);
643 
644             if (unlinked) {
645                 nxt_mp_release(r->mem_pool);
646             }
647         }
648     }
649 
650     if (app != NULL) {
651         nxt_router_app_use(task, app, -1);
652 
653         req_rpc_data->app = NULL;
654     }
655 
656     if (req_rpc_data->msg_info.body_fd != -1) {
657         nxt_fd_close(req_rpc_data->msg_info.body_fd);
658 
659         req_rpc_data->msg_info.body_fd = -1;
660     }
661 
662     if (req_rpc_data->rpc_cancel) {
663         req_rpc_data->rpc_cancel = 0;
664 
665         nxt_port_rpc_cancel(task, task->thread->engine->port,
666                             req_rpc_data->stream);
667     }
668 }
669 
670 
671 static void
672 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674     nxt_int_t      res;
675     nxt_app_t      *app;
676     nxt_port_t     *port, *main_app_port;
677     nxt_runtime_t  *rt;
678 
679     nxt_port_new_port_handler(task, msg);
680 
681     port = msg->u.new_port;
682 
683     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
684         nxt_router_greet_controller(task, msg->u.new_port);
685     }
686 
687     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
688         nxt_port_rpc_handler(task, msg);
689 
690         return;
691     }
692 
693     if (port == NULL || port->type != NXT_PROCESS_APP) {
694 
695         if (msg->port_msg.stream == 0) {
696             return;
697         }
698 
699         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
700 
701     } else {
702         if (msg->fd[1] != -1) {
703             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
704             if (nxt_slow_path(res != NXT_OK)) {
705                 return;
706             }
707 
708             nxt_fd_close(msg->fd[1]);
709             msg->fd[1] = -1;
710         }
711     }
712 
713     if (msg->port_msg.stream != 0) {
714         nxt_port_rpc_handler(task, msg);
715         return;
716     }
717 
718     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
719 
720     /*
721      * Port with "id == 0" is application 'main' port and it always
722      * should come with non-zero stream.
723      */
724     nxt_assert(port->id != 0);
725 
726     /* Find 'main' app port and get app reference. */
727     rt = task->thread->runtime;
728 
729     /*
730      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
731      * sent to main port (with id == 0) and processed in main thread.
732      */
733     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
734     nxt_assert(main_app_port != NULL);
735 
736     app = main_app_port->app;
737 
738     if (nxt_fast_path(app != NULL)) {
739         nxt_thread_mutex_lock(&app->mutex);
740 
741         /* TODO here should be find-and-add code because there can be
742            port waiters in port_hash */
743         nxt_port_hash_add(&app->port_hash, port);
744         app->port_hash_count++;
745 
746         nxt_thread_mutex_unlock(&app->mutex);
747 
748         port->app = app;
749     }
750 
751     port->main_app_port = main_app_port;
752 
753     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
754 }
755 
756 
757 static void
758 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
759 {
760     void                    *p;
761     size_t                  size;
762     nxt_int_t               ret;
763     nxt_port_t              *port;
764     nxt_router_temp_conf_t  *tmcf;
765 
766     port = nxt_runtime_port_find(task->thread->runtime,
767                                  msg->port_msg.pid,
768                                  msg->port_msg.reply_port);
769     if (nxt_slow_path(port == NULL)) {
770         nxt_alert(task, "conf_data_handler: reply port not found");
771         return;
772     }
773 
774     p = MAP_FAILED;
775 
776     /*
777      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
778      * initialized in 'cleanup' section.
779      */
780     size = 0;
781 
782     tmcf = nxt_router_temp_conf(task);
783     if (nxt_slow_path(tmcf == NULL)) {
784         goto fail;
785     }
786 
787     if (nxt_slow_path(msg->fd[0] == -1)) {
788         nxt_alert(task, "conf_data_handler: invalid shm fd");
789         goto fail;
790     }
791 
792     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
793         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
794                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
795         goto fail;
796     }
797 
798     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
799 
800     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
801 
802     nxt_fd_close(msg->fd[0]);
803     msg->fd[0] = -1;
804 
805     if (nxt_slow_path(p == MAP_FAILED)) {
806         goto fail;
807     }
808 
809     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
810 
811     tmcf->router_conf->router = nxt_router;
812     tmcf->stream = msg->port_msg.stream;
813     tmcf->port = port;
814 
815     nxt_port_use(task, tmcf->port, 1);
816 
817     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
818 
819     if (nxt_fast_path(ret == NXT_OK)) {
820         nxt_router_conf_apply(task, tmcf, NULL);
821 
822     } else {
823         nxt_router_conf_error(task, tmcf);
824     }
825 
826     goto cleanup;
827 
828 fail:
829 
830     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
831                           msg->port_msg.stream, 0, NULL);
832 
833     if (tmcf != NULL) {
834         nxt_mp_release(tmcf->mem_pool);
835     }
836 
837 cleanup:
838 
839     if (p != MAP_FAILED) {
840         nxt_mem_munmap(p, size);
841     }
842 
843     if (msg->fd[0] != -1) {
844         nxt_fd_close(msg->fd[0]);
845         msg->fd[0] = -1;
846     }
847 }
848 
849 
850 static void
851 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
852 {
853     nxt_app_t            *app;
854     nxt_int_t            ret;
855     nxt_str_t            app_name;
856     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
857     nxt_port_t           *proto_port;
858     nxt_port_msg_type_t  reply;
859 
860     reply_port = nxt_runtime_port_find(task->thread->runtime,
861                                        msg->port_msg.pid,
862                                        msg->port_msg.reply_port);
863     if (nxt_slow_path(reply_port == NULL)) {
864         nxt_alert(task, "app_restart_handler: reply port not found");
865         return;
866     }
867 
868     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
869     app_name.start = msg->buf->mem.pos;
870 
871     nxt_debug(task, "app_restart_handler: %V", &app_name);
872 
873     app = nxt_router_app_find(&nxt_router->apps, &app_name);
874 
875     if (nxt_fast_path(app != NULL)) {
876         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
877                                    NXT_PROCESS_APP);
878         if (nxt_slow_path(shared_port == NULL)) {
879             goto fail;
880         }
881 
882         ret = nxt_port_socket_init(task, shared_port, 0);
883         if (nxt_slow_path(ret != NXT_OK)) {
884             nxt_port_use(task, shared_port, -1);
885             goto fail;
886         }
887 
888         ret = nxt_router_app_queue_init(task, shared_port);
889         if (nxt_slow_path(ret != NXT_OK)) {
890             nxt_port_write_close(shared_port);
891             nxt_port_read_close(shared_port);
892             nxt_port_use(task, shared_port, -1);
893             goto fail;
894         }
895 
896         nxt_port_write_enable(task, shared_port);
897 
898         nxt_thread_mutex_lock(&app->mutex);
899 
900         proto_port = app->proto_port;
901 
902         if (proto_port != NULL) {
903             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
904                       proto_port->pid);
905 
906             app->proto_port = NULL;
907             proto_port->app = NULL;
908         }
909 
910         app->generation++;
911 
912         shared_port->app = app;
913 
914         old_shared_port = app->shared_port;
915         old_shared_port->app = NULL;
916 
917         app->shared_port = shared_port;
918 
919         nxt_thread_mutex_unlock(&app->mutex);
920 
921         nxt_port_close(task, old_shared_port);
922         nxt_port_use(task, old_shared_port, -1);
923 
924         if (proto_port != NULL) {
925             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
926                                          -1, 0, 0, NULL);
927 
928             nxt_port_close(task, proto_port);
929 
930             nxt_port_use(task, proto_port, -1);
931         }
932 
933         reply = NXT_PORT_MSG_RPC_READY_LAST;
934 
935     } else {
936 
937 fail:
938 
939         reply = NXT_PORT_MSG_RPC_ERROR;
940     }
941 
942     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
943                           0, NULL);
944 }
945 
946 
947 static void
948 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
949     void *data)
950 {
951     union {
952         nxt_pid_t  removed_pid;
953         void       *data;
954     } u;
955 
956     u.data = data;
957 
958     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
959 }
960 
961 
962 static void
963 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
964 {
965     nxt_event_engine_t  *engine;
966 
967     nxt_port_remove_pid_handler(task, msg);
968 
969     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
970     {
971         if (nxt_fast_path(engine->port != NULL)) {
972             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
973                           msg->u.data);
974         }
975     }
976     nxt_queue_loop;
977 
978     if (msg->port_msg.stream == 0) {
979         return;
980     }
981 
982     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
983 
984     nxt_port_rpc_handler(task, msg);
985 }
986 
987 
988 static nxt_router_temp_conf_t *
989 nxt_router_temp_conf(nxt_task_t *task)
990 {
991     nxt_mp_t                *mp, *tmp;
992     nxt_router_conf_t       *rtcf;
993     nxt_router_temp_conf_t  *tmcf;
994 
995     mp = nxt_mp_create(1024, 128, 256, 32);
996     if (nxt_slow_path(mp == NULL)) {
997         return NULL;
998     }
999 
1000     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1001     if (nxt_slow_path(rtcf == NULL)) {
1002         goto fail;
1003     }
1004 
1005     rtcf->mem_pool = mp;
1006 
1007     rtcf->var_fields = nxt_array_create(mp, 4, sizeof(nxt_var_field_t));
1008     if (nxt_slow_path(rtcf->var_fields == NULL)) {
1009         goto fail;
1010     }
1011 
1012     tmp = nxt_mp_create(1024, 128, 256, 32);
1013     if (nxt_slow_path(tmp == NULL)) {
1014         goto fail;
1015     }
1016 
1017     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1018     if (nxt_slow_path(tmcf == NULL)) {
1019         goto temp_fail;
1020     }
1021 
1022     tmcf->mem_pool = tmp;
1023     tmcf->router_conf = rtcf;
1024     tmcf->count = 1;
1025     tmcf->engine = task->thread->engine;
1026 
1027     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1028                                      sizeof(nxt_router_engine_conf_t));
1029     if (nxt_slow_path(tmcf->engines == NULL)) {
1030         goto temp_fail;
1031     }
1032 
1033     nxt_queue_init(&creating_sockets);
1034     nxt_queue_init(&pending_sockets);
1035     nxt_queue_init(&updating_sockets);
1036     nxt_queue_init(&keeping_sockets);
1037     nxt_queue_init(&deleting_sockets);
1038 
1039 #if (NXT_TLS)
1040     nxt_queue_init(&tmcf->tls);
1041 #endif
1042 
1043     nxt_queue_init(&tmcf->apps);
1044     nxt_queue_init(&tmcf->previous);
1045 
1046     return tmcf;
1047 
1048 temp_fail:
1049 
1050     nxt_mp_destroy(tmp);
1051 
1052 fail:
1053 
1054     nxt_mp_destroy(mp);
1055 
1056     return NULL;
1057 }
1058 
1059 
1060 nxt_inline nxt_bool_t
1061 nxt_router_app_can_start(nxt_app_t *app)
1062 {
1063     return app->processes + app->pending_processes < app->max_processes
1064             && app->pending_processes < app->max_pending_processes;
1065 }
1066 
1067 
1068 nxt_inline nxt_bool_t
1069 nxt_router_app_need_start(nxt_app_t *app)
1070 {
1071     return (app->active_requests
1072               > app->port_hash_count + app->pending_processes)
1073            || (app->spare_processes
1074                 > app->idle_processes + app->pending_processes);
1075 }
1076 
1077 
1078 static void
1079 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1080 {
1081     nxt_int_t                    ret;
1082     nxt_app_t                    *app;
1083     nxt_router_t                 *router;
1084     nxt_runtime_t                *rt;
1085     nxt_queue_link_t             *qlk;
1086     nxt_socket_conf_t            *skcf;
1087     nxt_router_conf_t            *rtcf;
1088     nxt_router_temp_conf_t       *tmcf;
1089     const nxt_event_interface_t  *interface;
1090 #if (NXT_TLS)
1091     nxt_router_tlssock_t         *tls;
1092 #endif
1093 
1094     tmcf = obj;
1095 
1096     qlk = nxt_queue_first(&pending_sockets);
1097 
1098     if (qlk != nxt_queue_tail(&pending_sockets)) {
1099         nxt_queue_remove(qlk);
1100         nxt_queue_insert_tail(&creating_sockets, qlk);
1101 
1102         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1103 
1104         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1105 
1106         return;
1107     }
1108 
1109 #if (NXT_TLS)
1110     qlk = nxt_queue_last(&tmcf->tls);
1111 
1112     if (qlk != nxt_queue_head(&tmcf->tls)) {
1113         nxt_queue_remove(qlk);
1114 
1115         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1116 
1117         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1118                            nxt_router_tls_rpc_handler, tls);
1119         return;
1120     }
1121 #endif
1122 
1123     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1124 
1125         if (nxt_router_app_need_start(app)) {
1126             nxt_router_app_rpc_create(task, tmcf, app);
1127             return;
1128         }
1129 
1130     } nxt_queue_loop;
1131 
1132     rtcf = tmcf->router_conf;
1133 
1134     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1135         nxt_router_access_log_open(task, tmcf);
1136         return;
1137     }
1138 
1139     rt = task->thread->runtime;
1140 
1141     interface = nxt_service_get(rt->services, "engine", NULL);
1142 
1143     router = rtcf->router;
1144 
1145     ret = nxt_router_engines_create(task, router, tmcf, interface);
1146     if (nxt_slow_path(ret != NXT_OK)) {
1147         goto fail;
1148     }
1149 
1150     ret = nxt_router_threads_create(task, rt, tmcf);
1151     if (nxt_slow_path(ret != NXT_OK)) {
1152         goto fail;
1153     }
1154 
1155     nxt_router_apps_sort(task, router, tmcf);
1156 
1157     nxt_router_apps_hash_use(task, rtcf, 1);
1158 
1159     nxt_router_engines_post(router, tmcf);
1160 
1161     nxt_queue_add(&router->sockets, &updating_sockets);
1162     nxt_queue_add(&router->sockets, &creating_sockets);
1163 
1164     if (router->access_log != rtcf->access_log) {
1165         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1166 
1167         nxt_router_access_log_release(task, &router->lock, router->access_log);
1168 
1169         router->access_log = rtcf->access_log;
1170     }
1171 
1172     nxt_router_conf_ready(task, tmcf);
1173 
1174     return;
1175 
1176 fail:
1177 
1178     nxt_router_conf_error(task, tmcf);
1179 
1180     return;
1181 }
1182 
1183 
1184 static void
1185 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1186 {
1187     nxt_joint_job_t  *job;
1188 
1189     job = obj;
1190 
1191     nxt_router_conf_ready(task, job->tmcf);
1192 }
1193 
1194 
1195 static void
1196 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1197 {
1198     uint32_t               count;
1199     nxt_router_conf_t      *rtcf;
1200     nxt_thread_spinlock_t  *lock;
1201 
1202     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1203 
1204     if (--tmcf->count > 0) {
1205         return;
1206     }
1207 
1208     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1209 
1210     rtcf = tmcf->router_conf;
1211 
1212     lock = &rtcf->router->lock;
1213 
1214     nxt_thread_spin_lock(lock);
1215 
1216     count = rtcf->count;
1217 
1218     nxt_thread_spin_unlock(lock);
1219 
1220     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1221 
1222     if (count == 0) {
1223         nxt_router_apps_hash_use(task, rtcf, -1);
1224 
1225         nxt_router_access_log_release(task, lock, rtcf->access_log);
1226 
1227         nxt_mp_destroy(rtcf->mem_pool);
1228     }
1229 
1230     nxt_mp_release(tmcf->mem_pool);
1231 }
1232 
1233 
1234 static void
1235 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1236 {
1237     nxt_app_t          *app;
1238     nxt_socket_t       s;
1239     nxt_router_t       *router;
1240     nxt_queue_link_t   *qlk;
1241     nxt_socket_conf_t  *skcf;
1242     nxt_router_conf_t  *rtcf;
1243 
1244     nxt_alert(task, "failed to apply new conf");
1245 
1246     for (qlk = nxt_queue_first(&creating_sockets);
1247          qlk != nxt_queue_tail(&creating_sockets);
1248          qlk = nxt_queue_next(qlk))
1249     {
1250         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1251         s = skcf->listen->socket;
1252 
1253         if (s != -1) {
1254             nxt_socket_close(task, s);
1255         }
1256 
1257         nxt_free(skcf->listen);
1258     }
1259 
1260     rtcf = tmcf->router_conf;
1261 
1262     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1263 
1264         nxt_router_app_unlink(task, app);
1265 
1266     } nxt_queue_loop;
1267 
1268     router = rtcf->router;
1269 
1270     nxt_queue_add(&router->sockets, &keeping_sockets);
1271     nxt_queue_add(&router->sockets, &deleting_sockets);
1272 
1273     nxt_queue_add(&router->apps, &tmcf->previous);
1274 
1275     // TODO: new engines and threads
1276 
1277     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1278 
1279     nxt_mp_destroy(rtcf->mem_pool);
1280 
1281     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1282 
1283     nxt_mp_release(tmcf->mem_pool);
1284 }
1285 
1286 
1287 static void
1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1289     nxt_port_msg_type_t type)
1290 {
1291     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1292 
1293     nxt_port_use(task, tmcf->port, -1);
1294 
1295     tmcf->port = NULL;
1296 }
1297 
1298 
1299 static nxt_conf_map_t  nxt_router_conf[] = {
1300     {
1301         nxt_string("listeners_threads"),
1302         NXT_CONF_MAP_INT32,
1303         offsetof(nxt_router_conf_t, threads),
1304     },
1305 };
1306 
1307 
1308 static nxt_conf_map_t  nxt_router_app_conf[] = {
1309     {
1310         nxt_string("type"),
1311         NXT_CONF_MAP_STR,
1312         offsetof(nxt_router_app_conf_t, type),
1313     },
1314 
1315     {
1316         nxt_string("limits"),
1317         NXT_CONF_MAP_PTR,
1318         offsetof(nxt_router_app_conf_t, limits_value),
1319     },
1320 
1321     {
1322         nxt_string("processes"),
1323         NXT_CONF_MAP_INT32,
1324         offsetof(nxt_router_app_conf_t, processes),
1325     },
1326 
1327     {
1328         nxt_string("processes"),
1329         NXT_CONF_MAP_PTR,
1330         offsetof(nxt_router_app_conf_t, processes_value),
1331     },
1332 
1333     {
1334         nxt_string("targets"),
1335         NXT_CONF_MAP_PTR,
1336         offsetof(nxt_router_app_conf_t, targets_value),
1337     },
1338 };
1339 
1340 
1341 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1342     {
1343         nxt_string("timeout"),
1344         NXT_CONF_MAP_MSEC,
1345         offsetof(nxt_router_app_conf_t, timeout),
1346     },
1347 };
1348 
1349 
1350 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1351     {
1352         nxt_string("spare"),
1353         NXT_CONF_MAP_INT32,
1354         offsetof(nxt_router_app_conf_t, spare_processes),
1355     },
1356 
1357     {
1358         nxt_string("max"),
1359         NXT_CONF_MAP_INT32,
1360         offsetof(nxt_router_app_conf_t, max_processes),
1361     },
1362 
1363     {
1364         nxt_string("idle_timeout"),
1365         NXT_CONF_MAP_MSEC,
1366         offsetof(nxt_router_app_conf_t, idle_timeout),
1367     },
1368 };
1369 
1370 
1371 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1372     {
1373         nxt_string("pass"),
1374         NXT_CONF_MAP_STR_COPY,
1375         offsetof(nxt_router_listener_conf_t, pass),
1376     },
1377 
1378     {
1379         nxt_string("application"),
1380         NXT_CONF_MAP_STR_COPY,
1381         offsetof(nxt_router_listener_conf_t, application),
1382     },
1383 };
1384 
1385 
1386 static nxt_conf_map_t  nxt_router_http_conf[] = {
1387     {
1388         nxt_string("header_buffer_size"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, header_buffer_size),
1391     },
1392 
1393     {
1394         nxt_string("large_header_buffer_size"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1397     },
1398 
1399     {
1400         nxt_string("large_header_buffers"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, large_header_buffers),
1403     },
1404 
1405     {
1406         nxt_string("body_buffer_size"),
1407         NXT_CONF_MAP_SIZE,
1408         offsetof(nxt_socket_conf_t, body_buffer_size),
1409     },
1410 
1411     {
1412         nxt_string("max_body_size"),
1413         NXT_CONF_MAP_SIZE,
1414         offsetof(nxt_socket_conf_t, max_body_size),
1415     },
1416 
1417     {
1418         nxt_string("idle_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, idle_timeout),
1421     },
1422 
1423     {
1424         nxt_string("header_read_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, header_read_timeout),
1427     },
1428 
1429     {
1430         nxt_string("body_read_timeout"),
1431         NXT_CONF_MAP_MSEC,
1432         offsetof(nxt_socket_conf_t, body_read_timeout),
1433     },
1434 
1435     {
1436         nxt_string("send_timeout"),
1437         NXT_CONF_MAP_MSEC,
1438         offsetof(nxt_socket_conf_t, send_timeout),
1439     },
1440 
1441     {
1442         nxt_string("body_temp_path"),
1443         NXT_CONF_MAP_STR,
1444         offsetof(nxt_socket_conf_t, body_temp_path),
1445     },
1446 
1447     {
1448         nxt_string("discard_unsafe_fields"),
1449         NXT_CONF_MAP_INT8,
1450         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1451     },
1452 };
1453 
1454 
1455 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1456     {
1457         nxt_string("max_frame_size"),
1458         NXT_CONF_MAP_SIZE,
1459         offsetof(nxt_websocket_conf_t, max_frame_size),
1460     },
1461 
1462     {
1463         nxt_string("read_timeout"),
1464         NXT_CONF_MAP_MSEC,
1465         offsetof(nxt_websocket_conf_t, read_timeout),
1466     },
1467 
1468     {
1469         nxt_string("keepalive_interval"),
1470         NXT_CONF_MAP_MSEC,
1471         offsetof(nxt_websocket_conf_t, keepalive_interval),
1472     },
1473 
1474 };
1475 
1476 
1477 static nxt_int_t
1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1479     u_char *start, u_char *end)
1480 {
1481     u_char                      *p;
1482     size_t                      size;
1483     nxt_mp_t                    *mp, *app_mp;
1484     uint32_t                    next, next_target;
1485     nxt_int_t                   ret;
1486     nxt_str_t                   name, path, target;
1487     nxt_app_t                   *app, *prev;
1488     nxt_str_t                   *t, *s, *targets;
1489     nxt_uint_t                  n, i;
1490     nxt_port_t                  *port;
1491     nxt_router_t                *router;
1492     nxt_app_joint_t             *app_joint;
1493 #if (NXT_TLS)
1494     nxt_tls_init_t              *tls_init;
1495     nxt_conf_value_t            *certificate;
1496 #endif
1497     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1498     nxt_conf_value_t            *applications, *application;
1499     nxt_conf_value_t            *listeners, *listener;
1500     nxt_socket_conf_t           *skcf;
1501     nxt_router_conf_t           *rtcf;
1502     nxt_http_routes_t           *routes;
1503     nxt_event_engine_t          *engine;
1504     nxt_app_lang_module_t       *lang;
1505     nxt_router_app_conf_t       apcf;
1506     nxt_router_access_log_t     *access_log;
1507     nxt_router_listener_conf_t  lscf;
1508 
1509     static nxt_str_t  http_path = nxt_string("/settings/http");
1510     static nxt_str_t  applications_path = nxt_string("/applications");
1511     static nxt_str_t  listeners_path = nxt_string("/listeners");
1512     static nxt_str_t  routes_path = nxt_string("/routes");
1513     static nxt_str_t  access_log_path = nxt_string("/access_log");
1514 #if (NXT_TLS)
1515     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1516     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1517     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1518     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1519     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1520 #endif
1521     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1522     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1523     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1524     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1525 
1526     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1527     if (root == NULL) {
1528         nxt_alert(task, "configuration parsing error");
1529         return NXT_ERROR;
1530     }
1531 
1532     rtcf = tmcf->router_conf;
1533     mp = rtcf->mem_pool;
1534 
1535     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1536                               nxt_nitems(nxt_router_conf), rtcf);
1537     if (ret != NXT_OK) {
1538         nxt_alert(task, "root map error");
1539         return NXT_ERROR;
1540     }
1541 
1542     if (rtcf->threads == 0) {
1543         rtcf->threads = nxt_ncpu;
1544     }
1545 
1546     conf = nxt_conf_get_path(root, &static_path);
1547 
1548     ret = nxt_router_conf_process_static(task, rtcf, conf);
1549     if (nxt_slow_path(ret != NXT_OK)) {
1550         return NXT_ERROR;
1551     }
1552 
1553     router = rtcf->router;
1554 
1555     applications = nxt_conf_get_path(root, &applications_path);
1556 
1557     if (applications != NULL) {
1558         next = 0;
1559 
1560         for ( ;; ) {
1561             application = nxt_conf_next_object_member(applications,
1562                                                       &name, &next);
1563             if (application == NULL) {
1564                 break;
1565             }
1566 
1567             nxt_debug(task, "application \"%V\"", &name);
1568 
1569             size = nxt_conf_json_length(application, NULL);
1570 
1571             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1572             if (nxt_slow_path(app_mp == NULL)) {
1573                 goto fail;
1574             }
1575 
1576             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1577             if (app == NULL) {
1578                 goto app_fail;
1579             }
1580 
1581             nxt_memzero(app, sizeof(nxt_app_t));
1582 
1583             app->mem_pool = app_mp;
1584 
1585             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1586             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1587                                                   + name.length);
1588 
1589             p = nxt_conf_json_print(app->conf.start, application, NULL);
1590             app->conf.length = p - app->conf.start;
1591 
1592             nxt_assert(app->conf.length <= size);
1593 
1594             nxt_debug(task, "application conf \"%V\"", &app->conf);
1595 
1596             prev = nxt_router_app_find(&router->apps, &name);
1597 
1598             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1599                 nxt_mp_destroy(app_mp);
1600 
1601                 nxt_queue_remove(&prev->link);
1602                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1603 
1604                 ret = nxt_router_apps_hash_add(rtcf, prev);
1605                 if (nxt_slow_path(ret != NXT_OK)) {
1606                     goto fail;
1607                 }
1608 
1609                 continue;
1610             }
1611 
1612             apcf.processes = 1;
1613             apcf.max_processes = 1;
1614             apcf.spare_processes = 0;
1615             apcf.timeout = 0;
1616             apcf.idle_timeout = 15000;
1617             apcf.limits_value = NULL;
1618             apcf.processes_value = NULL;
1619             apcf.targets_value = NULL;
1620 
1621             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1622             if (nxt_slow_path(app_joint == NULL)) {
1623                 goto app_fail;
1624             }
1625 
1626             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1627 
1628             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1629                                       nxt_nitems(nxt_router_app_conf), &apcf);
1630             if (ret != NXT_OK) {
1631                 nxt_alert(task, "application map error");
1632                 goto app_fail;
1633             }
1634 
1635             if (apcf.limits_value != NULL) {
1636 
1637                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1638                     nxt_alert(task, "application limits is not object");
1639                     goto app_fail;
1640                 }
1641 
1642                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1643                                         nxt_router_app_limits_conf,
1644                                         nxt_nitems(nxt_router_app_limits_conf),
1645                                         &apcf);
1646                 if (ret != NXT_OK) {
1647                     nxt_alert(task, "application limits map error");
1648                     goto app_fail;
1649                 }
1650             }
1651 
1652             if (apcf.processes_value != NULL
1653                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1654             {
1655                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1656                                      nxt_router_app_processes_conf,
1657                                      nxt_nitems(nxt_router_app_processes_conf),
1658                                      &apcf);
1659                 if (ret != NXT_OK) {
1660                     nxt_alert(task, "application processes map error");
1661                     goto app_fail;
1662                 }
1663 
1664             } else {
1665                 apcf.max_processes = apcf.processes;
1666                 apcf.spare_processes = apcf.processes;
1667             }
1668 
1669             if (apcf.targets_value != NULL) {
1670                 n = nxt_conf_object_members_count(apcf.targets_value);
1671 
1672                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1673                 if (nxt_slow_path(targets == NULL)) {
1674                     goto app_fail;
1675                 }
1676 
1677                 next_target = 0;
1678 
1679                 for (i = 0; i < n; i++) {
1680                     (void) nxt_conf_next_object_member(apcf.targets_value,
1681                                                        &target, &next_target);
1682 
1683                     s = nxt_str_dup(app_mp, &targets[i], &target);
1684                     if (nxt_slow_path(s == NULL)) {
1685                         goto app_fail;
1686                     }
1687                 }
1688 
1689             } else {
1690                 targets = NULL;
1691             }
1692 
1693             nxt_debug(task, "application type: %V", &apcf.type);
1694             nxt_debug(task, "application processes: %D", apcf.processes);
1695             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1696 
1697             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1698 
1699             if (lang == NULL) {
1700                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1701                 goto app_fail;
1702             }
1703 
1704             nxt_debug(task, "application language module: \"%s\"", lang->file);
1705 
1706             ret = nxt_thread_mutex_create(&app->mutex);
1707             if (ret != NXT_OK) {
1708                 goto app_fail;
1709             }
1710 
1711             nxt_queue_init(&app->ports);
1712             nxt_queue_init(&app->spare_ports);
1713             nxt_queue_init(&app->idle_ports);
1714             nxt_queue_init(&app->ack_waiting_req);
1715 
1716             app->name.length = name.length;
1717             nxt_memcpy(app->name.start, name.start, name.length);
1718 
1719             app->type = lang->type;
1720             app->max_processes = apcf.max_processes;
1721             app->spare_processes = apcf.spare_processes;
1722             app->max_pending_processes = apcf.spare_processes
1723                                          ? apcf.spare_processes : 1;
1724             app->timeout = apcf.timeout;
1725             app->idle_timeout = apcf.idle_timeout;
1726 
1727             app->targets = targets;
1728 
1729             engine = task->thread->engine;
1730 
1731             app->engine = engine;
1732 
1733             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1734             app->adjust_idle_work.task = &engine->task;
1735             app->adjust_idle_work.obj = app;
1736 
1737             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1738 
1739             ret = nxt_router_apps_hash_add(rtcf, app);
1740             if (nxt_slow_path(ret != NXT_OK)) {
1741                 goto app_fail;
1742             }
1743 
1744             nxt_router_app_use(task, app, 1);
1745 
1746             app->joint = app_joint;
1747 
1748             app_joint->use_count = 1;
1749             app_joint->app = app;
1750 
1751             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1752             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1753             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1754             app_joint->idle_timer.task = &engine->task;
1755             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1756 
1757             app_joint->free_app_work.handler = nxt_router_free_app;
1758             app_joint->free_app_work.task = &engine->task;
1759             app_joint->free_app_work.obj = app_joint;
1760 
1761             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1762                                 NXT_PROCESS_APP);
1763             if (nxt_slow_path(port == NULL)) {
1764                 return NXT_ERROR;
1765             }
1766 
1767             ret = nxt_port_socket_init(task, port, 0);
1768             if (nxt_slow_path(ret != NXT_OK)) {
1769                 nxt_port_use(task, port, -1);
1770                 return NXT_ERROR;
1771             }
1772 
1773             ret = nxt_router_app_queue_init(task, port);
1774             if (nxt_slow_path(ret != NXT_OK)) {
1775                 nxt_port_write_close(port);
1776                 nxt_port_read_close(port);
1777                 nxt_port_use(task, port, -1);
1778                 return NXT_ERROR;
1779             }
1780 
1781             nxt_port_write_enable(task, port);
1782             port->app = app;
1783 
1784             app->shared_port = port;
1785 
1786             nxt_thread_mutex_create(&app->outgoing.mutex);
1787         }
1788     }
1789 
1790     conf = nxt_conf_get_path(root, &routes_path);
1791     if (nxt_fast_path(conf != NULL)) {
1792         routes = nxt_http_routes_create(task, tmcf, conf);
1793         if (nxt_slow_path(routes == NULL)) {
1794             return NXT_ERROR;
1795         }
1796         rtcf->routes = routes;
1797     }
1798 
1799     ret = nxt_upstreams_create(task, tmcf, root);
1800     if (nxt_slow_path(ret != NXT_OK)) {
1801         return ret;
1802     }
1803 
1804     http = nxt_conf_get_path(root, &http_path);
1805 #if 0
1806     if (http == NULL) {
1807         nxt_alert(task, "no \"http\" block");
1808         return NXT_ERROR;
1809     }
1810 #endif
1811 
1812     websocket = nxt_conf_get_path(root, &websocket_path);
1813 
1814     listeners = nxt_conf_get_path(root, &listeners_path);
1815 
1816     if (listeners != NULL) {
1817         next = 0;
1818 
1819         for ( ;; ) {
1820             listener = nxt_conf_next_object_member(listeners, &name, &next);
1821             if (listener == NULL) {
1822                 break;
1823             }
1824 
1825             skcf = nxt_router_socket_conf(task, tmcf, &name);
1826             if (skcf == NULL) {
1827                 goto fail;
1828             }
1829 
1830             nxt_memzero(&lscf, sizeof(lscf));
1831 
1832             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1833                                       nxt_nitems(nxt_router_listener_conf),
1834                                       &lscf);
1835             if (ret != NXT_OK) {
1836                 nxt_alert(task, "listener map error");
1837                 goto fail;
1838             }
1839 
1840             nxt_debug(task, "application: %V", &lscf.application);
1841 
1842             // STUB, default values if http block is not defined.
1843             skcf->header_buffer_size = 2048;
1844             skcf->large_header_buffer_size = 8192;
1845             skcf->large_header_buffers = 4;
1846             skcf->discard_unsafe_fields = 1;
1847             skcf->body_buffer_size = 16 * 1024;
1848             skcf->max_body_size = 8 * 1024 * 1024;
1849             skcf->proxy_header_buffer_size = 64 * 1024;
1850             skcf->proxy_buffer_size = 4096;
1851             skcf->proxy_buffers = 256;
1852             skcf->idle_timeout = 180 * 1000;
1853             skcf->header_read_timeout = 30 * 1000;
1854             skcf->body_read_timeout = 30 * 1000;
1855             skcf->send_timeout = 30 * 1000;
1856             skcf->proxy_timeout = 60 * 1000;
1857             skcf->proxy_send_timeout = 30 * 1000;
1858             skcf->proxy_read_timeout = 30 * 1000;
1859 
1860             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1861             skcf->websocket_conf.read_timeout = 60 * 1000;
1862             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1863 
1864             nxt_str_null(&skcf->body_temp_path);
1865 
1866             if (http != NULL) {
1867                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1868                                           nxt_nitems(nxt_router_http_conf),
1869                                           skcf);
1870                 if (ret != NXT_OK) {
1871                     nxt_alert(task, "http map error");
1872                     goto fail;
1873                 }
1874             }
1875 
1876             if (websocket != NULL) {
1877                 ret = nxt_conf_map_object(mp, websocket,
1878                                           nxt_router_websocket_conf,
1879                                           nxt_nitems(nxt_router_websocket_conf),
1880                                           &skcf->websocket_conf);
1881                 if (ret != NXT_OK) {
1882                     nxt_alert(task, "websocket map error");
1883                     goto fail;
1884                 }
1885             }
1886 
1887             t = &skcf->body_temp_path;
1888 
1889             if (t->length == 0) {
1890                 t->start = (u_char *) task->thread->runtime->tmp;
1891                 t->length = nxt_strlen(t->start);
1892             }
1893 
1894             conf = nxt_conf_get_path(listener, &forwarded_path);
1895 
1896             if (conf != NULL) {
1897                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1898                 if (nxt_slow_path(skcf->forwarded == NULL)) {
1899                     return NXT_ERROR;
1900                 }
1901             }
1902 
1903             conf = nxt_conf_get_path(listener, &client_ip_path);
1904 
1905             if (conf != NULL) {
1906                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1907                 if (nxt_slow_path(skcf->client_ip == NULL)) {
1908                     return NXT_ERROR;
1909                 }
1910             }
1911 
1912 #if (NXT_TLS)
1913             certificate = nxt_conf_get_path(listener, &certificate_path);
1914 
1915             if (certificate != NULL) {
1916                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1917                 if (nxt_slow_path(tls_init == NULL)) {
1918                     return NXT_ERROR;
1919                 }
1920 
1921                 tls_init->cache_size = 0;
1922                 tls_init->timeout = 300;
1923 
1924                 value = nxt_conf_get_path(listener, &conf_cache_path);
1925                 if (value != NULL) {
1926                     tls_init->cache_size = nxt_conf_get_number(value);
1927                 }
1928 
1929                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1930                 if (value != NULL) {
1931                     tls_init->timeout = nxt_conf_get_number(value);
1932                 }
1933 
1934                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1935                                                         &conf_commands_path);
1936 
1937                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1938                                                            &conf_tickets);
1939 
1940                 n = nxt_conf_array_elements_count_or_1(certificate);
1941 
1942                 for (i = 0; i < n; i++) {
1943                     value = nxt_conf_get_array_element_or_itself(certificate,
1944                                                                  i);
1945                     nxt_assert(value != NULL);
1946 
1947                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1948                                                      tls_init, i == 0);
1949                     if (nxt_slow_path(ret != NXT_OK)) {
1950                         goto fail;
1951                     }
1952                 }
1953             }
1954 #endif
1955 
1956             skcf->listen->handler = nxt_http_conn_init;
1957             skcf->router_conf = rtcf;
1958             skcf->router_conf->count++;
1959 
1960             if (lscf.pass.length != 0) {
1961                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1962 
1963             /* COMPATIBILITY: listener application. */
1964             } else if (lscf.application.length > 0) {
1965                 skcf->action = nxt_http_pass_application(task, rtcf,
1966                                                          &lscf.application);
1967             }
1968 
1969             if (nxt_slow_path(skcf->action == NULL)) {
1970                 goto fail;
1971             }
1972         }
1973     }
1974 
1975     ret = nxt_http_routes_resolve(task, tmcf);
1976     if (nxt_slow_path(ret != NXT_OK)) {
1977         goto fail;
1978     }
1979 
1980     value = nxt_conf_get_path(root, &access_log_path);
1981 
1982     if (value != NULL) {
1983         nxt_conf_get_string(value, &path);
1984 
1985         access_log = router->access_log;
1986 
1987         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1988             nxt_router_access_log_use(&router->lock, access_log);
1989 
1990         } else {
1991             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1992                                     + path.length);
1993             if (access_log == NULL) {
1994                 nxt_alert(task, "failed to allocate access log structure");
1995                 goto fail;
1996             }
1997 
1998             access_log->fd = -1;
1999             access_log->handler = &nxt_router_access_log_writer;
2000             access_log->count = 1;
2001 
2002             access_log->path.length = path.length;
2003             access_log->path.start = (u_char *) access_log
2004                                      + sizeof(nxt_router_access_log_t);
2005 
2006             nxt_memcpy(access_log->path.start, path.start, path.length);
2007         }
2008 
2009         rtcf->access_log = access_log;
2010     }
2011 
2012     nxt_queue_add(&deleting_sockets, &router->sockets);
2013     nxt_queue_init(&router->sockets);
2014 
2015     return NXT_OK;
2016 
2017 app_fail:
2018 
2019     nxt_mp_destroy(app_mp);
2020 
2021 fail:
2022 
2023     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2024 
2025         nxt_queue_remove(&app->link);
2026         nxt_thread_mutex_destroy(&app->mutex);
2027         nxt_mp_destroy(app->mem_pool);
2028 
2029     } nxt_queue_loop;
2030 
2031     return NXT_ERROR;
2032 }
2033 
2034 
2035 #if (NXT_TLS)
2036 
2037 static nxt_int_t
2038 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2039     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2040     nxt_tls_init_t *tls_init, nxt_bool_t last)
2041 {
2042     nxt_router_tlssock_t  *tls;
2043 
2044     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2045     if (nxt_slow_path(tls == NULL)) {
2046         return NXT_ERROR;
2047     }
2048 
2049     tls->tls_init = tls_init;
2050     tls->socket_conf = skcf;
2051     tls->temp_conf = tmcf;
2052     tls->last = last;
2053     nxt_conf_get_string(value, &tls->name);
2054 
2055     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2056 
2057     return NXT_OK;
2058 }
2059 
2060 #endif
2061 
2062 
2063 static nxt_int_t
2064 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2065     nxt_conf_value_t *conf)
2066 {
2067     uint32_t          next, i;
2068     nxt_mp_t          *mp;
2069     nxt_str_t         *type, exten, str;
2070     nxt_int_t         ret;
2071     nxt_uint_t        exts;
2072     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2073 
2074     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2075 
2076     mp = rtcf->mem_pool;
2077 
2078     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2079     if (nxt_slow_path(ret != NXT_OK)) {
2080         return NXT_ERROR;
2081     }
2082 
2083     if (conf == NULL) {
2084         return NXT_OK;
2085     }
2086 
2087     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2088 
2089     if (mtypes_conf != NULL) {
2090         next = 0;
2091 
2092         for ( ;; ) {
2093             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2094 
2095             if (ext_conf == NULL) {
2096                 break;
2097             }
2098 
2099             type = nxt_str_dup(mp, NULL, &str);
2100             if (nxt_slow_path(type == NULL)) {
2101                 return NXT_ERROR;
2102             }
2103 
2104             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2105                 nxt_conf_get_string(ext_conf, &str);
2106 
2107                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2108                     return NXT_ERROR;
2109                 }
2110 
2111                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2112                                                       &exten, type);
2113                 if (nxt_slow_path(ret != NXT_OK)) {
2114                     return NXT_ERROR;
2115                 }
2116 
2117                 continue;
2118             }
2119 
2120             exts = nxt_conf_array_elements_count(ext_conf);
2121 
2122             for (i = 0; i < exts; i++) {
2123                 value = nxt_conf_get_array_element(ext_conf, i);
2124 
2125                 nxt_conf_get_string(value, &str);
2126 
2127                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2128                     return NXT_ERROR;
2129                 }
2130 
2131                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2132                                                       &exten, type);
2133                 if (nxt_slow_path(ret != NXT_OK)) {
2134                     return NXT_ERROR;
2135                 }
2136             }
2137         }
2138     }
2139 
2140     return NXT_OK;
2141 }
2142 
2143 
2144 static nxt_http_forward_t *
2145 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2146 {
2147     nxt_int_t                   ret;
2148     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2149     nxt_conf_value_t            *source_conf, *recursive_conf;
2150     nxt_http_forward_t          *forward;
2151     nxt_http_route_addr_rule_t  *source;
2152 
2153     static nxt_str_t  header_path = nxt_string("/header");
2154     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2155     static nxt_str_t  protocol_path = nxt_string("/protocol");
2156     static nxt_str_t  source_path = nxt_string("/source");
2157     static nxt_str_t  recursive_path = nxt_string("/recursive");
2158 
2159     header_conf = nxt_conf_get_path(conf, &header_path);
2160 
2161     if (header_conf != NULL) {
2162         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2163         protocol_conf = NULL;
2164 
2165     } else {
2166         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2167         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2168     }
2169 
2170     source_conf = nxt_conf_get_path(conf, &source_path);
2171     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2172 
2173     if (source_conf == NULL
2174         || (protocol_conf == NULL && client_ip_conf == NULL))
2175     {
2176         return NULL;
2177     }
2178 
2179     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2180     if (nxt_slow_path(forward == NULL)) {
2181         return NULL;
2182     }
2183 
2184     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2185     if (nxt_slow_path(source == NULL)) {
2186         return NULL;
2187     }
2188 
2189     forward->source = source;
2190 
2191     if (recursive_conf != NULL) {
2192         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2193     }
2194 
2195     if (client_ip_conf != NULL) {
2196         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2197                                              &forward->client_ip);
2198         if (nxt_slow_path(ret != NXT_OK)) {
2199             return NULL;
2200         }
2201     }
2202 
2203     if (protocol_conf != NULL) {
2204         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2205                                              &forward->protocol);
2206         if (nxt_slow_path(ret != NXT_OK)) {
2207             return NULL;
2208         }
2209     }
2210 
2211     return forward;
2212 }
2213 
2214 
2215 static nxt_int_t
2216 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2217     nxt_http_forward_header_t *fh)
2218 {
2219     char       c;
2220     size_t     i;
2221     uint32_t   hash;
2222     nxt_str_t  header;
2223 
2224     nxt_conf_get_string(conf, &header);
2225 
2226     fh->header = nxt_str_dup(mp, NULL, &header);
2227     if (nxt_slow_path(fh->header == NULL)) {
2228         return NXT_ERROR;
2229     }
2230 
2231     hash = NXT_HTTP_FIELD_HASH_INIT;
2232 
2233     for (i = 0; i < fh->header->length; i++) {
2234         c = fh->header->start[i];
2235         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2236     }
2237 
2238     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2239 
2240     fh->header_hash = hash;
2241 
2242     return NXT_OK;
2243 }
2244 
2245 
2246 static nxt_app_t *
2247 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2248 {
2249     nxt_app_t  *app;
2250 
2251     nxt_queue_each(app, queue, nxt_app_t, link) {
2252 
2253         if (nxt_strstr_eq(name, &app->name)) {
2254             return app;
2255         }
2256 
2257     } nxt_queue_loop;
2258 
2259     return NULL;
2260 }
2261 
2262 
2263 static nxt_int_t
2264 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2265 {
2266     void       *mem;
2267     nxt_int_t  fd;
2268 
2269     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2270     if (nxt_slow_path(fd == -1)) {
2271         return NXT_ERROR;
2272     }
2273 
2274     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2275                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2276     if (nxt_slow_path(mem == MAP_FAILED)) {
2277         nxt_fd_close(fd);
2278 
2279         return NXT_ERROR;
2280     }
2281 
2282     nxt_app_queue_init(mem);
2283 
2284     port->queue_fd = fd;
2285     port->queue = mem;
2286 
2287     return NXT_OK;
2288 }
2289 
2290 
2291 static nxt_int_t
2292 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2293 {
2294     void       *mem;
2295     nxt_int_t  fd;
2296 
2297     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2298     if (nxt_slow_path(fd == -1)) {
2299         return NXT_ERROR;
2300     }
2301 
2302     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2303                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2304     if (nxt_slow_path(mem == MAP_FAILED)) {
2305         nxt_fd_close(fd);
2306 
2307         return NXT_ERROR;
2308     }
2309 
2310     nxt_port_queue_init(mem);
2311 
2312     port->queue_fd = fd;
2313     port->queue = mem;
2314 
2315     return NXT_OK;
2316 }
2317 
2318 
2319 static nxt_int_t
2320 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2321 {
2322     void  *mem;
2323 
2324     nxt_assert(fd != -1);
2325 
2326     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2327                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2328     if (nxt_slow_path(mem == MAP_FAILED)) {
2329 
2330         return NXT_ERROR;
2331     }
2332 
2333     port->queue = mem;
2334 
2335     return NXT_OK;
2336 }
2337 
2338 
/*
 * Level hash descriptor used for the per-configuration applications hash
 * (rtcf->apps_hash).  Entries are matched by application name through
 * nxt_router_apps_hash_test(); the remaining members supply the
 * allocation and release callbacks.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2345 
2346 
2347 static nxt_int_t
2348 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2349 {
2350     nxt_app_t  *app;
2351 
2352     app = data;
2353 
2354     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2355 }
2356 
2357 
2358 static nxt_int_t
2359 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2360 {
2361     nxt_lvlhsh_query_t  lhq;
2362 
2363     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2364     lhq.replace = 0;
2365     lhq.key = app->name;
2366     lhq.value = app;
2367     lhq.proto = &nxt_router_apps_hash_proto;
2368     lhq.pool = rtcf->mem_pool;
2369 
2370     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2371 
2372     case NXT_OK:
2373         return NXT_OK;
2374 
2375     case NXT_DECLINED:
2376         nxt_thread_log_alert("router app hash adding failed: "
2377                              "\"%V\" is already in hash", &lhq.key);
2378         /* Fall through. */
2379     default:
2380         return NXT_ERROR;
2381     }
2382 }
2383 
2384 
2385 static nxt_app_t *
2386 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2387 {
2388     nxt_lvlhsh_query_t  lhq;
2389 
2390     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2391     lhq.key = *name;
2392     lhq.proto = &nxt_router_apps_hash_proto;
2393 
2394     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2395         return NULL;
2396     }
2397 
2398     return lhq.value;
2399 }
2400 
2401 
2402 static void
2403 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2404 {
2405     nxt_app_t          *app;
2406     nxt_lvlhsh_each_t  lhe;
2407 
2408     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2409 
2410     for ( ;; ) {
2411         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2412 
2413         if (app == NULL) {
2414             break;
2415         }
2416 
2417         nxt_router_app_use(task, app, i);
2418     }
2419 }
2420 
2421 
/*
 * Application binding stored in an HTTP action by
 * nxt_router_application_init().
 */
typedef struct {
    nxt_app_t  *app;    /* application found by name in the apps hash */
    nxt_int_t  target;  /* index in app->targets; 0 if no target was given */
} nxt_http_app_conf_t;
2426 
2427 
2428 nxt_int_t
2429 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2430     nxt_str_t *target, nxt_http_action_t *action)
2431 {
2432     nxt_app_t            *app;
2433     nxt_str_t            *targets;
2434     nxt_uint_t           i;
2435     nxt_http_app_conf_t  *conf;
2436 
2437     app = nxt_router_apps_hash_get(rtcf, name);
2438     if (app == NULL) {
2439         return NXT_DECLINED;
2440     }
2441 
2442     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2443     if (nxt_slow_path(conf == NULL)) {
2444         return NXT_ERROR;
2445     }
2446 
2447     action->handler = nxt_http_application_handler;
2448     action->u.conf = conf;
2449 
2450     conf->app = app;
2451 
2452     if (target != NULL && target->length != 0) {
2453         targets = app->targets;
2454 
2455         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2456 
2457         conf->target = i;
2458 
2459     } else {
2460         conf->target = 0;
2461     }
2462 
2463     return NXT_OK;
2464 }
2465 
2466 
2467 static nxt_socket_conf_t *
2468 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2469     nxt_str_t *name)
2470 {
2471     size_t               size;
2472     nxt_int_t            ret;
2473     nxt_bool_t           wildcard;
2474     nxt_sockaddr_t       *sa;
2475     nxt_socket_conf_t    *skcf;
2476     nxt_listen_socket_t  *ls;
2477 
2478     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2479     if (nxt_slow_path(sa == NULL)) {
2480         nxt_alert(task, "invalid listener \"%V\"", name);
2481         return NULL;
2482     }
2483 
2484     sa->type = SOCK_STREAM;
2485 
2486     nxt_debug(task, "router listener: \"%*s\"",
2487               (size_t) sa->length, nxt_sockaddr_start(sa));
2488 
2489     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2490     if (nxt_slow_path(skcf == NULL)) {
2491         return NULL;
2492     }
2493 
2494     size = nxt_sockaddr_size(sa);
2495 
2496     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2497 
2498     if (ret != NXT_OK) {
2499 
2500         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2501         if (nxt_slow_path(ls == NULL)) {
2502             return NULL;
2503         }
2504 
2505         skcf->listen = ls;
2506 
2507         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2508         nxt_memcpy(ls->sockaddr, sa, size);
2509 
2510         nxt_listen_socket_remote_size(ls);
2511 
2512         ls->socket = -1;
2513         ls->backlog = NXT_LISTEN_BACKLOG;
2514         ls->flags = NXT_NONBLOCK;
2515         ls->read_after_accept = 1;
2516     }
2517 
2518     switch (sa->u.sockaddr.sa_family) {
2519 #if (NXT_HAVE_UNIX_DOMAIN)
2520     case AF_UNIX:
2521         wildcard = 0;
2522         break;
2523 #endif
2524 #if (NXT_INET6)
2525     case AF_INET6:
2526         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2527         break;
2528 #endif
2529     case AF_INET:
2530     default:
2531         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2532         break;
2533     }
2534 
2535     if (!wildcard) {
2536         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2537         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2538             return NULL;
2539         }
2540 
2541         nxt_memcpy(skcf->sockaddr, sa, size);
2542     }
2543 
2544     return skcf;
2545 }
2546 
2547 
2548 static nxt_int_t
2549 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2550     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2551 {
2552     nxt_router_t       *router;
2553     nxt_queue_link_t   *qlk;
2554     nxt_socket_conf_t  *skcf;
2555 
2556     router = tmcf->router_conf->router;
2557 
2558     for (qlk = nxt_queue_first(&router->sockets);
2559          qlk != nxt_queue_tail(&router->sockets);
2560          qlk = nxt_queue_next(qlk))
2561     {
2562         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2563 
2564         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2565             nskcf->listen = skcf->listen;
2566 
2567             nxt_queue_remove(qlk);
2568             nxt_queue_insert_tail(&keeping_sockets, qlk);
2569 
2570             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2571 
2572             return NXT_OK;
2573         }
2574     }
2575 
2576     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2577 
2578     return NXT_DECLINED;
2579 }
2580 
2581 
2582 static void
2583 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2584     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2585 {
2586     size_t            size;
2587     uint32_t          stream;
2588     nxt_int_t         ret;
2589     nxt_buf_t         *b;
2590     nxt_port_t        *main_port, *router_port;
2591     nxt_runtime_t     *rt;
2592     nxt_socket_rpc_t  *rpc;
2593 
2594     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2595     if (rpc == NULL) {
2596         goto fail;
2597     }
2598 
2599     rpc->socket_conf = skcf;
2600     rpc->temp_conf = tmcf;
2601 
2602     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2603 
2604     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2605     if (b == NULL) {
2606         goto fail;
2607     }
2608 
2609     b->completion_handler = nxt_buf_dummy_completion;
2610 
2611     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2612 
2613     rt = task->thread->runtime;
2614     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2615     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2616 
2617     stream = nxt_port_rpc_register_handler(task, router_port,
2618                                            nxt_router_listen_socket_ready,
2619                                            nxt_router_listen_socket_error,
2620                                            main_port->pid, rpc);
2621     if (nxt_slow_path(stream == 0)) {
2622         goto fail;
2623     }
2624 
2625     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2626                                 stream, router_port->id, b);
2627 
2628     if (nxt_slow_path(ret != NXT_OK)) {
2629         nxt_port_rpc_cancel(task, router_port, stream);
2630         goto fail;
2631     }
2632 
2633     return;
2634 
2635 fail:
2636 
2637     nxt_router_conf_error(task, tmcf);
2638 }
2639 
2640 
2641 static void
2642 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2643     void *data)
2644 {
2645     nxt_int_t         ret;
2646     nxt_socket_t      s;
2647     nxt_socket_rpc_t  *rpc;
2648 
2649     rpc = data;
2650 
2651     s = msg->fd[0];
2652 
2653     ret = nxt_socket_nonblocking(task, s);
2654     if (nxt_slow_path(ret != NXT_OK)) {
2655         goto fail;
2656     }
2657 
2658     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2659 
2660     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2661     if (nxt_slow_path(ret != NXT_OK)) {
2662         goto fail;
2663     }
2664 
2665     rpc->socket_conf->listen->socket = s;
2666 
2667     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2668                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2669 
2670     return;
2671 
2672 fail:
2673 
2674     nxt_socket_close(task, s);
2675 
2676     nxt_router_conf_error(task, rpc->temp_conf);
2677 }
2678 
2679 
/*
 * RPC error handler for a listen socket request: the configuration
 * being applied is failed through nxt_router_conf_error().
 *
 * The reply message is currently not inspected; the code that would
 * format a detailed error description from it is compiled out.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

    /*
     * Disabled: would build a "listen socket error: {...}" text from the
     * error code in the first byte of the reply and the message that
     * follows it, and write it to the debug log.
     * NOTE(review): "error" indexes socket_errors[] without a range
     * check - verify before enabling this block.
     */
#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    nxt_router_conf_error(task, tmcf);
}
2739 
2740 
2741 #if (NXT_TLS)
2742 
2743 static void
2744 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2745     void *data)
2746 {
2747     nxt_mp_t                *mp;
2748     nxt_int_t               ret;
2749     nxt_tls_conf_t          *tlscf;
2750     nxt_router_tlssock_t    *tls;
2751     nxt_tls_bundle_conf_t   *bundle;
2752     nxt_router_temp_conf_t  *tmcf;
2753 
2754     nxt_debug(task, "tls rpc handler");
2755 
2756     tls = data;
2757     tmcf = tls->temp_conf;
2758 
2759     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2760         goto fail;
2761     }
2762 
2763     mp = tmcf->router_conf->mem_pool;
2764 
2765     if (tls->socket_conf->tls == NULL){
2766         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2767         if (nxt_slow_path(tlscf == NULL)) {
2768             goto fail;
2769         }
2770 
2771         tlscf->no_wait_shutdown = 1;
2772         tls->socket_conf->tls = tlscf;
2773 
2774     } else {
2775         tlscf = tls->socket_conf->tls;
2776     }
2777 
2778     tls->tls_init->conf = tlscf;
2779 
2780     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2781     if (nxt_slow_path(bundle == NULL)) {
2782         goto fail;
2783     }
2784 
2785     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2786         goto fail;
2787     }
2788 
2789     bundle->chain_file = msg->fd[0];
2790     bundle->next = tlscf->bundle;
2791     tlscf->bundle = bundle;
2792 
2793     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2794                                                   tls->last);
2795     if (nxt_slow_path(ret != NXT_OK)) {
2796         goto fail;
2797     }
2798 
2799     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2800                        nxt_router_conf_apply, task, tmcf, NULL);
2801     return;
2802 
2803 fail:
2804 
2805     nxt_router_conf_error(task, tmcf);
2806 }
2807 
2808 #endif
2809 
2810 
2811 static void
2812 nxt_router_app_rpc_create(nxt_task_t *task,
2813     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2814 {
2815     size_t         size;
2816     uint32_t       stream;
2817     nxt_fd_t       port_fd, queue_fd;
2818     nxt_int_t      ret;
2819     nxt_buf_t      *b;
2820     nxt_port_t     *router_port, *dport;
2821     nxt_runtime_t  *rt;
2822     nxt_app_rpc_t  *rpc;
2823 
2824     rt = task->thread->runtime;
2825 
2826     dport = app->proto_port;
2827 
2828     if (dport == NULL) {
2829         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2830 
2831         size = app->name.length + 1 + app->conf.length;
2832 
2833         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2834         if (nxt_slow_path(b == NULL)) {
2835             goto fail;
2836         }
2837 
2838         b->completion_handler = nxt_buf_dummy_completion;
2839 
2840         nxt_buf_cpystr(b, &app->name);
2841         *b->mem.free++ = '\0';
2842         nxt_buf_cpystr(b, &app->conf);
2843 
2844         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2845 
2846         port_fd = app->shared_port->pair[0];
2847         queue_fd = app->shared_port->queue_fd;
2848 
2849     } else {
2850         nxt_debug(task, "app '%V' prefork", &app->name);
2851 
2852         b = NULL;
2853         port_fd = -1;
2854         queue_fd = -1;
2855     }
2856 
2857     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2858 
2859     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2860                                            nxt_router_app_prefork_ready,
2861                                            nxt_router_app_prefork_error,
2862                                            sizeof(nxt_app_rpc_t));
2863     if (nxt_slow_path(rpc == NULL)) {
2864         goto fail;
2865     }
2866 
2867     rpc->app = app;
2868     rpc->temp_conf = tmcf;
2869     rpc->proto = (b != NULL);
2870 
2871     stream = nxt_port_rpc_ex_stream(rpc);
2872 
2873     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2874                                  port_fd, queue_fd, stream, router_port->id, b);
2875     if (nxt_slow_path(ret != NXT_OK)) {
2876         nxt_port_rpc_cancel(task, router_port, stream);
2877         goto fail;
2878     }
2879 
2880     if (b == NULL) {
2881         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2882 
2883         app->pending_processes++;
2884     }
2885 
2886     return;
2887 
2888 fail:
2889 
2890     nxt_router_conf_error(task, tmcf);
2891 }
2892 
2893 
2894 static void
2895 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2896     void *data)
2897 {
2898     nxt_app_t           *app;
2899     nxt_port_t          *port;
2900     nxt_app_rpc_t       *rpc;
2901     nxt_event_engine_t  *engine;
2902 
2903     rpc = data;
2904     app = rpc->app;
2905 
2906     port = msg->u.new_port;
2907 
2908     nxt_assert(port != NULL);
2909     nxt_assert(port->id == 0);
2910 
2911     if (rpc->proto) {
2912         nxt_assert(app->proto_port == NULL);
2913         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2914 
2915         nxt_port_inc_use(port);
2916 
2917         app->proto_port = port;
2918         port->app = app;
2919 
2920         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2921 
2922         return;
2923     }
2924 
2925     nxt_assert(port->type == NXT_PROCESS_APP);
2926 
2927     port->app = app;
2928     port->main_app_port = port;
2929 
2930     app->pending_processes--;
2931     app->processes++;
2932     app->idle_processes++;
2933 
2934     engine = task->thread->engine;
2935 
2936     nxt_queue_insert_tail(&app->ports, &port->app_link);
2937     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2938 
2939     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2940               &app->name, port->pid, port->id);
2941 
2942     nxt_port_hash_add(&app->port_hash, port);
2943     app->port_hash_count++;
2944 
2945     port->idle_start = 0;
2946 
2947     nxt_port_inc_use(port);
2948 
2949     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2950 
2951     nxt_work_queue_add(&engine->fast_work_queue,
2952                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2953 }
2954 
2955 
2956 static void
2957 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2958     void *data)
2959 {
2960     nxt_app_t               *app;
2961     nxt_app_rpc_t           *rpc;
2962     nxt_router_temp_conf_t  *tmcf;
2963 
2964     rpc = data;
2965     app = rpc->app;
2966     tmcf = rpc->temp_conf;
2967 
2968     if (rpc->proto) {
2969         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2970                 &app->name);
2971 
2972     } else {
2973         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2974                 &app->name);
2975 
2976         app->pending_processes--;
2977     }
2978 
2979     nxt_router_conf_error(task, tmcf);
2980 }
2981 
2982 
2983 static nxt_int_t
2984 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2985     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2986 {
2987     nxt_int_t                 ret;
2988     nxt_uint_t                n, threads;
2989     nxt_queue_link_t          *qlk;
2990     nxt_router_engine_conf_t  *recf;
2991 
2992     threads = tmcf->router_conf->threads;
2993 
2994     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2995                                      sizeof(nxt_router_engine_conf_t));
2996     if (nxt_slow_path(tmcf->engines == NULL)) {
2997         return NXT_ERROR;
2998     }
2999 
3000     n = 0;
3001 
3002     for (qlk = nxt_queue_first(&router->engines);
3003          qlk != nxt_queue_tail(&router->engines);
3004          qlk = nxt_queue_next(qlk))
3005     {
3006         recf = nxt_array_zero_add(tmcf->engines);
3007         if (nxt_slow_path(recf == NULL)) {
3008             return NXT_ERROR;
3009         }
3010 
3011         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3012 
3013         if (n < threads) {
3014             recf->action = NXT_ROUTER_ENGINE_KEEP;
3015             ret = nxt_router_engine_conf_update(tmcf, recf);
3016 
3017         } else {
3018             recf->action = NXT_ROUTER_ENGINE_DELETE;
3019             ret = nxt_router_engine_conf_delete(tmcf, recf);
3020         }
3021 
3022         if (nxt_slow_path(ret != NXT_OK)) {
3023             return ret;
3024         }
3025 
3026         n++;
3027     }
3028 
3029     tmcf->new_threads = n;
3030 
3031     while (n < threads) {
3032         recf = nxt_array_zero_add(tmcf->engines);
3033         if (nxt_slow_path(recf == NULL)) {
3034             return NXT_ERROR;
3035         }
3036 
3037         recf->action = NXT_ROUTER_ENGINE_ADD;
3038 
3039         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3040         if (nxt_slow_path(recf->engine == NULL)) {
3041             return NXT_ERROR;
3042         }
3043 
3044         ret = nxt_router_engine_conf_create(tmcf, recf);
3045         if (nxt_slow_path(ret != NXT_OK)) {
3046             return ret;
3047         }
3048 
3049         n++;
3050     }
3051 
3052     return NXT_OK;
3053 }
3054 
3055 
3056 static nxt_int_t
3057 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3058     nxt_router_engine_conf_t *recf)
3059 {
3060     nxt_int_t  ret;
3061 
3062     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3063                                           nxt_router_listen_socket_create);
3064     if (nxt_slow_path(ret != NXT_OK)) {
3065         return ret;
3066     }
3067 
3068     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3069                                           nxt_router_listen_socket_create);
3070     if (nxt_slow_path(ret != NXT_OK)) {
3071         return ret;
3072     }
3073 
3074     return ret;
3075 }
3076 
3077 
3078 static nxt_int_t
3079 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3080     nxt_router_engine_conf_t *recf)
3081 {
3082     nxt_int_t  ret;
3083 
3084     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3085                                           nxt_router_listen_socket_create);
3086     if (nxt_slow_path(ret != NXT_OK)) {
3087         return ret;
3088     }
3089 
3090     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3091                                           nxt_router_listen_socket_update);
3092     if (nxt_slow_path(ret != NXT_OK)) {
3093         return ret;
3094     }
3095 
3096     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3097     if (nxt_slow_path(ret != NXT_OK)) {
3098         return ret;
3099     }
3100 
3101     return ret;
3102 }
3103 
3104 
3105 static nxt_int_t
3106 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3107     nxt_router_engine_conf_t *recf)
3108 {
3109     nxt_int_t  ret;
3110 
3111     ret = nxt_router_engine_quit(tmcf, recf);
3112     if (nxt_slow_path(ret != NXT_OK)) {
3113         return ret;
3114     }
3115 
3116     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3117     if (nxt_slow_path(ret != NXT_OK)) {
3118         return ret;
3119     }
3120 
3121     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3122 }
3123 
3124 
3125 static nxt_int_t
3126 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3127     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3128     nxt_work_handler_t handler)
3129 {
3130     nxt_int_t                ret;
3131     nxt_joint_job_t          *job;
3132     nxt_queue_link_t         *qlk;
3133     nxt_socket_conf_t        *skcf;
3134     nxt_socket_conf_joint_t  *joint;
3135 
3136     for (qlk = nxt_queue_first(sockets);
3137          qlk != nxt_queue_tail(sockets);
3138          qlk = nxt_queue_next(qlk))
3139     {
3140         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3141         if (nxt_slow_path(job == NULL)) {
3142             return NXT_ERROR;
3143         }
3144 
3145         job->work.next = recf->jobs;
3146         recf->jobs = &job->work;
3147 
3148         job->task = tmcf->engine->task;
3149         job->work.handler = handler;
3150         job->work.task = &job->task;
3151         job->work.obj = job;
3152         job->tmcf = tmcf;
3153 
3154         tmcf->count++;
3155 
3156         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3157                              sizeof(nxt_socket_conf_joint_t));
3158         if (nxt_slow_path(joint == NULL)) {
3159             return NXT_ERROR;
3160         }
3161 
3162         job->work.data = joint;
3163 
3164         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3165         if (nxt_slow_path(ret != NXT_OK)) {
3166             return ret;
3167         }
3168 
3169         joint->count = 1;
3170 
3171         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3172         skcf->count++;
3173         joint->socket_conf = skcf;
3174 
3175         joint->engine = recf->engine;
3176     }
3177 
3178     return NXT_OK;
3179 }
3180 
3181 
3182 static nxt_int_t
3183 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3184     nxt_router_engine_conf_t *recf)
3185 {
3186     nxt_joint_job_t  *job;
3187 
3188     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3189     if (nxt_slow_path(job == NULL)) {
3190         return NXT_ERROR;
3191     }
3192 
3193     job->work.next = recf->jobs;
3194     recf->jobs = &job->work;
3195 
3196     job->task = tmcf->engine->task;
3197     job->work.handler = nxt_router_worker_thread_quit;
3198     job->work.task = &job->task;
3199     job->work.obj = NULL;
3200     job->work.data = NULL;
3201     job->tmcf = NULL;
3202 
3203     return NXT_OK;
3204 }
3205 
3206 
3207 static nxt_int_t
3208 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3209     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3210 {
3211     nxt_joint_job_t   *job;
3212     nxt_queue_link_t  *qlk;
3213 
3214     for (qlk = nxt_queue_first(sockets);
3215          qlk != nxt_queue_tail(sockets);
3216          qlk = nxt_queue_next(qlk))
3217     {
3218         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3219         if (nxt_slow_path(job == NULL)) {
3220             return NXT_ERROR;
3221         }
3222 
3223         job->work.next = recf->jobs;
3224         recf->jobs = &job->work;
3225 
3226         job->task = tmcf->engine->task;
3227         job->work.handler = nxt_router_listen_socket_delete;
3228         job->work.task = &job->task;
3229         job->work.obj = job;
3230         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3231         job->tmcf = tmcf;
3232 
3233         tmcf->count++;
3234     }
3235 
3236     return NXT_OK;
3237 }
3238 
3239 
3240 static nxt_int_t
3241 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3242     nxt_router_temp_conf_t *tmcf)
3243 {
3244     nxt_int_t                 ret;
3245     nxt_uint_t                i, threads;
3246     nxt_router_engine_conf_t  *recf;
3247 
3248     recf = tmcf->engines->elts;
3249     threads = tmcf->router_conf->threads;
3250 
3251     for (i = tmcf->new_threads; i < threads; i++) {
3252         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3253         if (nxt_slow_path(ret != NXT_OK)) {
3254             return ret;
3255         }
3256     }
3257 
3258     return NXT_OK;
3259 }
3260 
3261 
3262 static nxt_int_t
3263 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3264     nxt_event_engine_t *engine)
3265 {
3266     nxt_int_t            ret;
3267     nxt_thread_link_t    *link;
3268     nxt_thread_handle_t  handle;
3269 
3270     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3271 
3272     if (nxt_slow_path(link == NULL)) {
3273         return NXT_ERROR;
3274     }
3275 
3276     link->start = nxt_router_thread_start;
3277     link->engine = engine;
3278     link->work.handler = nxt_router_thread_exit_handler;
3279     link->work.task = task;
3280     link->work.data = link;
3281 
3282     nxt_queue_insert_tail(&rt->engines, &engine->link);
3283 
3284     ret = nxt_thread_create(&handle, link);
3285 
3286     if (nxt_slow_path(ret != NXT_OK)) {
3287         nxt_queue_remove(&engine->link);
3288     }
3289 
3290     return ret;
3291 }
3292 
3293 
3294 static void
3295 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3296     nxt_router_temp_conf_t *tmcf)
3297 {
3298     nxt_app_t  *app;
3299 
3300     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3301 
3302         nxt_router_app_unlink(task, app);
3303 
3304     } nxt_queue_loop;
3305 
3306     nxt_queue_add(&router->apps, &tmcf->previous);
3307     nxt_queue_add(&router->apps, &tmcf->apps);
3308 }
3309 
3310 
3311 static void
3312 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3313 {
3314     nxt_uint_t                n;
3315     nxt_event_engine_t        *engine;
3316     nxt_router_engine_conf_t  *recf;
3317 
3318     recf = tmcf->engines->elts;
3319 
3320     for (n = tmcf->engines->nelts; n != 0; n--) {
3321         engine = recf->engine;
3322 
3323         switch (recf->action) {
3324 
3325         case NXT_ROUTER_ENGINE_KEEP:
3326             break;
3327 
3328         case NXT_ROUTER_ENGINE_ADD:
3329             nxt_queue_insert_tail(&router->engines, &engine->link0);
3330             break;
3331 
3332         case NXT_ROUTER_ENGINE_DELETE:
3333             nxt_queue_remove(&engine->link0);
3334             break;
3335         }
3336 
3337         nxt_router_engine_post(engine, recf->jobs);
3338 
3339         recf++;
3340     }
3341 }
3342 
3343 
3344 static void
3345 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3346 {
3347     nxt_work_t  *work, *next;
3348 
3349     for (work = jobs; work != NULL; work = next) {
3350         next = work->next;
3351         work->next = NULL;
3352 
3353         nxt_event_engine_post(engine, work);
3354     }
3355 }
3356 
3357 
3358 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3359     .rpc_error       = nxt_port_rpc_handler,
3360     .mmap            = nxt_port_mmap_handler,
3361     .data            = nxt_port_rpc_handler,
3362     .oosm            = nxt_router_oosm_handler,
3363     .req_headers_ack = nxt_port_rpc_handler,
3364 };
3365 
3366 
/*
 * Start routine of a router worker thread; it is installed as link->start
 * by nxt_router_thread_create().  "data" is the nxt_thread_link_t that
 * describes the thread.
 *
 * The routine binds the engine to the calling thread, creates the engine
 * memory pool and a NXT_PROCESS_ROUTER port served by
 * nxt_router_app_port_handlers, posts a work item to the engine of
 * link->work.task's thread so that nxt_router_rt_add_port() registers the
 * port there, and finally runs the engine with nxt_event_engine_start().
 *
 * If any setup step fails the routine returns without starting the engine.
 */
static void
nxt_router_thread_start(void *data)
{
    nxt_int_t           ret;
    nxt_port_t          *port;
    nxt_task_t          *task;
    nxt_work_t          *work;
    nxt_thread_t        *thread;
    nxt_thread_link_t   *link;
    nxt_event_engine_t  *engine;

    link = data;
    engine = link->engine;
    task = &engine->task;

    thread = nxt_thread();

    nxt_event_engine_thread_adopt(engine);

    /* STUB */
    /* The runtime is inherited from the thread the engine task pointed to. */
    thread->runtime = engine->task.thread->runtime;

    /* Cross-link the engine, its task and the current thread. */
    engine->task.thread = thread;
    engine->task.log = thread->log;
    thread->engine = engine;
    thread->task = &engine->task;
#if 0
    thread->fiber = &engine->fibers->fiber;
#endif

    engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
    if (nxt_slow_path(engine->mem_pool == NULL)) {
        return;
    }

    /*
     * NOTE(review): engine->mem_pool is not destroyed on the failure paths
     * below -- confirm whether it is reclaimed elsewhere.
     */

    port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
                        NXT_PROCESS_ROUTER);
    if (nxt_slow_path(port == NULL)) {
        return;
    }

    ret = nxt_port_socket_init(task, port, 0);
    if (nxt_slow_path(ret != NXT_OK)) {
        /* Drop the reference on the port created above. */
        nxt_port_use(task, port, -1);
        return;
    }

    ret = nxt_router_port_queue_init(task, port);
    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_port_use(task, port, -1);
        return;
    }

    engine->port = port;

    nxt_port_enable(task, port, &nxt_router_app_port_handlers);

    /* The work item is released by nxt_router_rt_add_port() (work->obj). */
    work = nxt_zalloc(sizeof(nxt_work_t));
    if (nxt_slow_path(work == NULL)) {
        return;
    }

    work->handler = nxt_router_rt_add_port;
    work->task = link->work.task;
    work->obj = work;
    work->data = port;

    /* Run the registration on the engine of link->work.task's thread. */
    nxt_event_engine_post(link->work.task->thread->engine, work);

    nxt_event_engine_start(engine);
}
3438 
3439 
3440 static void
3441 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3442 {
3443     nxt_int_t      res;
3444     nxt_port_t     *port;
3445     nxt_runtime_t  *rt;
3446 
3447     rt = task->thread->runtime;
3448     port = data;
3449 
3450     nxt_free(obj);
3451 
3452     res = nxt_port_hash_add(&rt->ports, port);
3453 
3454     if (nxt_fast_path(res == NXT_OK)) {
3455         nxt_port_use(task, port, 1);
3456     }
3457 }
3458 
3459 
3460 static void
3461 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3462 {
3463     nxt_joint_job_t          *job;
3464     nxt_socket_conf_t        *skcf;
3465     nxt_listen_event_t       *lev;
3466     nxt_listen_socket_t      *ls;
3467     nxt_thread_spinlock_t    *lock;
3468     nxt_socket_conf_joint_t  *joint;
3469 
3470     job = obj;
3471     joint = data;
3472 
3473     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3474 
3475     skcf = joint->socket_conf;
3476     ls = skcf->listen;
3477 
3478     lev = nxt_listen_event(task, ls);
3479     if (nxt_slow_path(lev == NULL)) {
3480         nxt_router_listen_socket_release(task, skcf);
3481         return;
3482     }
3483 
3484     lev->socket.data = joint;
3485 
3486     lock = &skcf->router_conf->router->lock;
3487 
3488     nxt_thread_spin_lock(lock);
3489     ls->count++;
3490     nxt_thread_spin_unlock(lock);
3491 
3492     job->work.next = NULL;
3493     job->work.handler = nxt_router_conf_wait;
3494 
3495     nxt_event_engine_post(job->tmcf->engine, &job->work);
3496 }
3497 
3498 
3499 nxt_inline nxt_listen_event_t *
3500 nxt_router_listen_event(nxt_queue_t *listen_connections,
3501     nxt_socket_conf_t *skcf)
3502 {
3503     nxt_socket_t        fd;
3504     nxt_queue_link_t    *qlk;
3505     nxt_listen_event_t  *lev;
3506 
3507     fd = skcf->listen->socket;
3508 
3509     for (qlk = nxt_queue_first(listen_connections);
3510          qlk != nxt_queue_tail(listen_connections);
3511          qlk = nxt_queue_next(qlk))
3512     {
3513         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3514 
3515         if (fd == lev->socket.fd) {
3516             return lev;
3517         }
3518     }
3519 
3520     return NULL;
3521 }
3522 
3523 
3524 static void
3525 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3526 {
3527     nxt_joint_job_t          *job;
3528     nxt_event_engine_t       *engine;
3529     nxt_listen_event_t       *lev;
3530     nxt_socket_conf_joint_t  *joint, *old;
3531 
3532     job = obj;
3533     joint = data;
3534 
3535     engine = task->thread->engine;
3536 
3537     nxt_queue_insert_tail(&engine->joints, &joint->link);
3538 
3539     lev = nxt_router_listen_event(&engine->listen_connections,
3540                                   joint->socket_conf);
3541 
3542     old = lev->socket.data;
3543     lev->socket.data = joint;
3544     lev->listen = joint->socket_conf->listen;
3545 
3546     job->work.next = NULL;
3547     job->work.handler = nxt_router_conf_wait;
3548 
3549     nxt_event_engine_post(job->tmcf->engine, &job->work);
3550 
3551     /*
3552      * The task is allocated from configuration temporary
3553      * memory pool so it can be freed after engine post operation.
3554      */
3555 
3556     nxt_router_conf_release(&engine->task, old);
3557 }
3558 
3559 
3560 static void
3561 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3562 {
3563     nxt_socket_conf_t        *skcf;
3564     nxt_listen_event_t       *lev;
3565     nxt_event_engine_t       *engine;
3566     nxt_socket_conf_joint_t  *joint;
3567 
3568     skcf = data;
3569 
3570     engine = task->thread->engine;
3571 
3572     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3573 
3574     nxt_fd_event_delete(engine, &lev->socket);
3575 
3576     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3577               lev->socket.fd);
3578 
3579     joint = lev->socket.data;
3580     joint->close_job = obj;
3581 
3582     lev->timer.handler = nxt_router_listen_socket_close;
3583     lev->timer.work_queue = &engine->fast_work_queue;
3584 
3585     nxt_timer_add(engine, &lev->timer, 0);
3586 }
3587 
3588 
3589 static void
3590 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3591 {
3592     nxt_event_engine_t  *engine;
3593 
3594     nxt_debug(task, "router worker thread quit");
3595 
3596     engine = task->thread->engine;
3597 
3598     engine->shutdown = 1;
3599 
3600     if (nxt_queue_is_empty(&engine->joints)) {
3601         nxt_thread_exit(task->thread);
3602     }
3603 }
3604 
3605 
3606 static void
3607 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3608 {
3609     nxt_timer_t              *timer;
3610     nxt_joint_job_t          *job;
3611     nxt_listen_event_t       *lev;
3612     nxt_socket_conf_joint_t  *joint;
3613 
3614     timer = obj;
3615     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3616 
3617     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3618               lev->socket.fd);
3619 
3620     nxt_queue_remove(&lev->link);
3621 
3622     joint = lev->socket.data;
3623     lev->socket.data = NULL;
3624 
3625     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3626     task = &task->thread->engine->task;
3627 
3628     nxt_router_listen_socket_release(task, joint->socket_conf);
3629 
3630     job = joint->close_job;
3631     job->work.next = NULL;
3632     job->work.handler = nxt_router_conf_wait;
3633 
3634     nxt_event_engine_post(job->tmcf->engine, &job->work);
3635 
3636     nxt_router_listen_event_release(task, lev, joint);
3637 }
3638 
3639 
3640 static void
3641 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3642 {
3643     nxt_listen_socket_t    *ls;
3644     nxt_thread_spinlock_t  *lock;
3645 
3646     ls = skcf->listen;
3647     lock = &skcf->router_conf->router->lock;
3648 
3649     nxt_thread_spin_lock(lock);
3650 
3651     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3652               task->thread->engine, ls->count);
3653 
3654     if (--ls->count != 0) {
3655         ls = NULL;
3656     }
3657 
3658     nxt_thread_spin_unlock(lock);
3659 
3660     if (ls != NULL) {
3661         nxt_socket_close(task, ls->socket);
3662         nxt_free(ls);
3663     }
3664 }
3665 
3666 
3667 void
3668 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3669     nxt_socket_conf_joint_t *joint)
3670 {
3671     nxt_event_engine_t  *engine;
3672 
3673     nxt_debug(task, "listen event count: %D", lev->count);
3674 
3675     engine = task->thread->engine;
3676 
3677     if (--lev->count == 0) {
3678         if (lev->next != NULL) {
3679             nxt_sockaddr_cache_free(engine, lev->next);
3680 
3681             nxt_conn_free(task, lev->next);
3682         }
3683 
3684         nxt_free(lev);
3685     }
3686 
3687     if (joint != NULL) {
3688         nxt_router_conf_release(task, joint);
3689     }
3690 
3691     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3692         nxt_thread_exit(task->thread);
3693     }
3694 }
3695 
3696 
/*
 * Drops one reference to a socket configuration joint.
 *
 * Nothing else happens while the joint is still referenced.  When its count
 * reaches zero the joint is unlinked and, under the router spinlock, the
 * reference counts of its socket configuration and, if that reaches zero,
 * of the owning router configuration are decremented in turn:
 *
 *  - a socket configuration whose count reaches zero is removed from its
 *    queue and, with NXT_TLS, its TLS server configuration is freed;
 *  - a router configuration whose count reaches zero has its apps hash and
 *    access log references released and its memory pool destroyed.
 */
void
nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
{
    nxt_socket_conf_t      *skcf;
    nxt_router_conf_t      *rtcf;
    nxt_thread_spinlock_t  *lock;

    nxt_debug(task, "conf joint %p count: %D", joint, joint->count);

    if (--joint->count != 0) {
        return;
    }

    nxt_queue_remove(&joint->link);

    /*
     * The joint content can not be safely used after the critical
     * section protected by the spinlock because its memory pool may
     * be already destroyed by another thread.
     */
    skcf = joint->socket_conf;
    rtcf = skcf->router_conf;
    lock = &rtcf->router->lock;

    nxt_thread_spin_lock(lock);

    nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
              rtcf, rtcf->count);

    /*
     * After this block a non-NULL skcf/rtcf means that this call dropped
     * the last reference to the respective object.
     */
    if (--skcf->count != 0) {
        skcf = NULL;
        rtcf = NULL;

    } else {
        nxt_queue_remove(&skcf->link);

        if (--rtcf->count != 0) {
            rtcf = NULL;
        }
    }

    nxt_thread_spin_unlock(lock);

#if (NXT_TLS)
    if (skcf != NULL && skcf->tls != NULL) {
        task->thread->runtime->tls->server_free(task, skcf->tls);
    }
#endif

    /* TODO remove engine->port */

    if (rtcf != NULL) {
        nxt_debug(task, "old router conf is destroyed");

        nxt_router_apps_hash_use(task, rtcf, -1);

        nxt_router_access_log_release(task, lock, rtcf->access_log);

        /* Adopt the pool for the current thread before destroying it. */
        nxt_mp_thread_adopt(rtcf->mem_pool);

        nxt_mp_destroy(rtcf->mem_pool);
    }
}
3760 
3761 
/*
 * Formats one access log line for request "r" and writes it to
 * access_log->fd.  The line has the form
 *
 *   ADDR - - [DATE] "METHOD TARGET VERSION" STATUS BYTES "REFERER" "AGENT"\n
 *
 * where a missing request line, referer or user agent is written as "-".
 * The line is assembled in a buffer taken from the request memory pool;
 * if the buffer cannot be allocated nothing is written.  The result of
 * nxt_fd_write() is not checked.
 */
static void
nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
    nxt_router_access_log_t *access_log)
{
    size_t     size;
    u_char     *buf, *p;
    nxt_off_t  bytes;

    /* Date string settings; formatted by nxt_router_access_log_date(). */
    static nxt_time_string_t  date_cache = {
        (nxt_atomic_uint_t) -1,
        nxt_router_access_log_date,
        "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
        nxt_length("31/Dec/1986:19:40:00 +0300"),
        NXT_THREAD_TIME_LOCAL,
        NXT_THREAD_TIME_SEC,
    };

    /*
     * Upper bound of the line length.  Every term corresponds to one piece
     * written below; optional pieces reserve at least the one byte needed
     * for the "-" placeholder.
     */
    size = r->remote->address_length
           + 6                  /* ' - - [' */
           + date_cache.size
           + 3                  /* '] "' */
           + r->method->length
           + 1                  /* space */
           + r->target.length
           + 1                  /* space */
           + r->version.length
           + 2                  /* '" ' */
           + 3                  /* status */
           + 1                  /* space */
           + NXT_OFF_T_LEN
           + 2                  /* ' "' */
           + (r->referer != NULL ? r->referer->value_length : 1)
           + 3                  /* '" "' */
           + (r->user_agent != NULL ? r->user_agent->value_length : 1)
           + 2                  /* '"\n' */
    ;

    buf = nxt_mp_nget(r->mem_pool, size);
    if (nxt_slow_path(buf == NULL)) {
        return;
    }

    p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
                   r->remote->address_length);

    p = nxt_cpymem(p, " - - [", 6);

    p = nxt_thread_time_string(task->thread, &date_cache, p);

    p = nxt_cpymem(p, "] \"", 3);

    /*
     * Request line: target and version are only written when the preceding
     * part is present; an empty method yields "-".
     */
    if (r->method->length != 0) {
        p = nxt_cpymem(p, r->method->start, r->method->length);

        if (r->target.length != 0) {
            *p++ = ' ';
            p = nxt_cpymem(p, r->target.start, r->target.length);

            if (r->version.length != 0) {
                *p++ = ' ';
                p = nxt_cpymem(p, r->version.start, r->version.length);
            }
        }

    } else {
        *p++ = '-';
    }

    p = nxt_cpymem(p, "\" ", 2);

    /* The status is limited to the three bytes reserved for it above. */
    p = nxt_sprintf(p, p + 3, "%03d", r->status);

    *p++ = ' ';

    bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);

    p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);

    p = nxt_cpymem(p, " \"", 2);

    if (r->referer != NULL) {
        p = nxt_cpymem(p, r->referer->value, r->referer->value_length);

    } else {
        *p++ = '-';
    }

    p = nxt_cpymem(p, "\" \"", 3);

    if (r->user_agent != NULL) {
        p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);

    } else {
        *p++ = '-';
    }

    p = nxt_cpymem(p, "\"\n", 2);

    /* Only the bytes actually produced are written. */
    nxt_fd_write(access_log->fd, buf, p - buf);
}
3862 
3863 
3864 static u_char *
3865 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3866     size_t size, const char *format)
3867 {
3868     u_char  sign;
3869     time_t  gmtoff;
3870 
3871     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3872                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3873 
3874     gmtoff = nxt_timezone(tm) / 60;
3875 
3876     if (gmtoff < 0) {
3877         gmtoff = -gmtoff;
3878         sign = '-';
3879 
3880     } else {
3881         sign = '+';
3882     }
3883 
3884     return nxt_sprintf(buf, buf + size, format,
3885                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3886                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3887                        sign, gmtoff / 60, gmtoff % 60);
3888 }
3889 
3890 
3891 static void
3892 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3893 {
3894     uint32_t                 stream;
3895     nxt_int_t                ret;
3896     nxt_buf_t                *b;
3897     nxt_port_t               *main_port, *router_port;
3898     nxt_runtime_t            *rt;
3899     nxt_router_access_log_t  *access_log;
3900 
3901     access_log = tmcf->router_conf->access_log;
3902 
3903     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3904     if (nxt_slow_path(b == NULL)) {
3905         goto fail;
3906     }
3907 
3908     b->completion_handler = nxt_buf_dummy_completion;
3909 
3910     nxt_buf_cpystr(b, &access_log->path);
3911     *b->mem.free++ = '\0';
3912 
3913     rt = task->thread->runtime;
3914     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3915     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3916 
3917     stream = nxt_port_rpc_register_handler(task, router_port,
3918                                            nxt_router_access_log_ready,
3919                                            nxt_router_access_log_error,
3920                                            -1, tmcf);
3921     if (nxt_slow_path(stream == 0)) {
3922         goto fail;
3923     }
3924 
3925     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3926                                 stream, router_port->id, b);
3927 
3928     if (nxt_slow_path(ret != NXT_OK)) {
3929         nxt_port_rpc_cancel(task, router_port, stream);
3930         goto fail;
3931     }
3932 
3933     return;
3934 
3935 fail:
3936 
3937     nxt_router_conf_error(task, tmcf);
3938 }
3939 
3940 
3941 static void
3942 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3943     void *data)
3944 {
3945     nxt_router_temp_conf_t   *tmcf;
3946     nxt_router_access_log_t  *access_log;
3947 
3948     tmcf = data;
3949 
3950     access_log = tmcf->router_conf->access_log;
3951 
3952     access_log->fd = msg->fd[0];
3953 
3954     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3955                        nxt_router_conf_apply, task, tmcf, NULL);
3956 }
3957 
3958 
3959 static void
3960 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3961     void *data)
3962 {
3963     nxt_router_temp_conf_t  *tmcf;
3964 
3965     tmcf = data;
3966 
3967     nxt_router_conf_error(task, tmcf);
3968 }
3969 
3970 
3971 static void
3972 nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
3973     nxt_router_access_log_t *access_log)
3974 {
3975     if (access_log == NULL) {
3976         return;
3977     }
3978 
3979     nxt_thread_spin_lock(lock);
3980 
3981     access_log->count++;
3982 
3983     nxt_thread_spin_unlock(lock);
3984 }
3985 
3986 
3987 static void
3988 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3989     nxt_router_access_log_t *access_log)
3990 {
3991     if (access_log == NULL) {
3992         return;
3993     }
3994 
3995     nxt_thread_spin_lock(lock);
3996 
3997     if (--access_log->count != 0) {
3998         access_log = NULL;
3999     }
4000 
4001     nxt_thread_spin_unlock(lock);
4002 
4003     if (access_log != NULL) {
4004 
4005         if (access_log->fd != -1) {
4006             nxt_fd_close(access_log->fd);
4007         }
4008 
4009         nxt_free(access_log);
4010     }
4011 }
4012 
4013 
/*
 * Context of one access log reopen request; it is passed as the RPC data
 * to nxt_router_access_log_reopen_ready() and
 * nxt_router_access_log_reopen_error().
 */
typedef struct {
    /* Pool the context and the request buffer are allocated from. */
    nxt_mp_t                 *mem_pool;
    /* Access log that was current when the reopen was requested. */
    nxt_router_access_log_t  *access_log;
} nxt_router_access_log_reopen_t;
4018 
4019 
4020 static void
4021 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
4022 {
4023     nxt_mp_t                        *mp;
4024     uint32_t                        stream;
4025     nxt_int_t                       ret;
4026     nxt_buf_t                       *b;
4027     nxt_port_t                      *main_port, *router_port;
4028     nxt_runtime_t                   *rt;
4029     nxt_router_access_log_t         *access_log;
4030     nxt_router_access_log_reopen_t  *reopen;
4031 
4032     access_log = nxt_router->access_log;
4033 
4034     if (access_log == NULL) {
4035         return;
4036     }
4037 
4038     mp = nxt_mp_create(1024, 128, 256, 32);
4039     if (nxt_slow_path(mp == NULL)) {
4040         return;
4041     }
4042 
4043     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
4044     if (nxt_slow_path(reopen == NULL)) {
4045         goto fail;
4046     }
4047 
4048     reopen->mem_pool = mp;
4049     reopen->access_log = access_log;
4050 
4051     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
4052     if (nxt_slow_path(b == NULL)) {
4053         goto fail;
4054     }
4055 
4056     b->completion_handler = nxt_router_access_log_reopen_completion;
4057 
4058     nxt_buf_cpystr(b, &access_log->path);
4059     *b->mem.free++ = '\0';
4060 
4061     rt = task->thread->runtime;
4062     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
4063     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
4064 
4065     stream = nxt_port_rpc_register_handler(task, router_port,
4066                                            nxt_router_access_log_reopen_ready,
4067                                            nxt_router_access_log_reopen_error,
4068                                            -1, reopen);
4069     if (nxt_slow_path(stream == 0)) {
4070         goto fail;
4071     }
4072 
4073     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
4074                                 stream, router_port->id, b);
4075 
4076     if (nxt_slow_path(ret != NXT_OK)) {
4077         nxt_port_rpc_cancel(task, router_port, stream);
4078         goto fail;
4079     }
4080 
4081     nxt_mp_retain(mp);
4082 
4083     return;
4084 
4085 fail:
4086 
4087     nxt_mp_destroy(mp);
4088 }
4089 
4090 
4091 static void
4092 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
4093 {
4094     nxt_mp_t   *mp;
4095     nxt_buf_t  *b;
4096 
4097     b = obj;
4098     mp = b->data;
4099 
4100     nxt_mp_release(mp);
4101 }
4102 
4103 
4104 static void
4105 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4106     void *data)
4107 {
4108     nxt_router_access_log_t         *access_log;
4109     nxt_router_access_log_reopen_t  *reopen;
4110 
4111     reopen = data;
4112 
4113     access_log = reopen->access_log;
4114 
4115     if (access_log == nxt_router->access_log) {
4116 
4117         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
4118             nxt_alert(task, "dup2(%FD, %FD) failed %E",
4119                       msg->fd[0], access_log->fd, nxt_errno);
4120         }
4121     }
4122 
4123     nxt_fd_close(msg->fd[0]);
4124     nxt_mp_release(reopen->mem_pool);
4125 }
4126 
4127 
4128 static void
4129 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4130     void *data)
4131 {
4132     nxt_router_access_log_reopen_t  *reopen;
4133 
4134     reopen = data;
4135 
4136     nxt_mp_release(reopen->mem_pool);
4137 }
4138 
4139 
4140 static void
4141 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4142 {
4143     nxt_port_t           *port;
4144     nxt_thread_link_t    *link;
4145     nxt_event_engine_t   *engine;
4146     nxt_thread_handle_t  handle;
4147 
4148     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4149     link = data;
4150 
4151     nxt_thread_wait(handle);
4152 
4153     engine = link->engine;
4154 
4155     nxt_queue_remove(&engine->link);
4156 
4157     port = engine->port;
4158 
4159     // TODO notify all apps
4160 
4161     port->engine = task->thread->engine;
4162     nxt_mp_thread_adopt(port->mem_pool);
4163     nxt_port_use(task, port, -1);
4164 
4165     nxt_mp_thread_adopt(engine->mem_pool);
4166     nxt_mp_destroy(engine->mem_pool);
4167 
4168     nxt_event_engine_free(engine);
4169 
4170     nxt_free(link);
4171 }
4172 
4173 
/*
 * Handles a port message that belongs to an application request; "data" is
 * the request's nxt_request_rpc_data_t.
 *
 * A _NXT_PORT_MSG_REQ_HEADERS_ACK message is passed on to
 * nxt_router_req_headers_ack_handler().  Any other message is treated as
 * response data: until r->header_sent is set, the buffer must start with an
 * nxt_unit_response_t followed by its field array; afterwards buffers are
 * appended to the response output as is.  Every "goto fail" below fails the
 * request with NXT_HTTP_SERVICE_UNAVAILABLE and unlinks the RPC data.
 */
4174 static void
4175 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4176     void *data)
4177 {
4178     size_t                  b_size, count;
4179     nxt_int_t               ret;
4180     nxt_app_t               *app;
4181     nxt_buf_t               *b, *next;
4182     nxt_port_t              *app_port;
4183     nxt_unit_field_t        *f;
4184     nxt_http_field_t        *field;
4185     nxt_http_request_t      *r;
4186     nxt_unit_response_t     *resp;
4187     nxt_request_rpc_data_t  *req_rpc_data;
4188 
4189     req_rpc_data = data;
4190 
         /* No request is attached to the RPC data any more: ignore the message. */
4191     r = req_rpc_data->request;
4192     if (nxt_slow_path(r == NULL)) {
4193         return;
4194     }
4195 
         /* The request is already in error state: just detach the RPC data. */
4196     if (r->error) {
4197         nxt_request_rpc_data_unlink(task, req_rpc_data);
4198         return;
4199     }
4200 
4201     app = req_rpc_data->app;
4202     nxt_assert(app != NULL);
4203 
4204     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4205         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4206 
4207         return;
4208     }
4209 
         /* A message of zero size contributes no buffer. */
4210     b = (msg->size == 0) ? NULL : msg->buf;
4211 
4212     if (msg->port_msg.last != 0) {
4213         nxt_debug(task, "router data create last buf");
4214 
             /* Final message: terminate the chain and detach the RPC data. */
4215         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4216 
4217         req_rpc_data->rpc_cancel = 0;
4218 
4219         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4220             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4221         }
4222 
4223         nxt_request_rpc_data_unlink(task, req_rpc_data);
4224 
4225     } else {
             /* More messages are expected: (re)arm the application timeout. */
4226         if (app->timeout != 0) {
4227             r->timer.handler = nxt_router_app_timeout;
4228             r->timer_data = req_rpc_data;
4229             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4230         }
4231     }
4232 
4233     if (b == NULL) {
4234         return;
4235     }
4236 
4237     if (msg->buf == b) {
4238         /* Disable instant buffer completion/re-using by port. */
4239         msg->buf = NULL;
4240     }
4241 
4242     if (r->header_sent) {
             /* Headers were sent earlier: the whole chain is output data. */
4243         nxt_buf_chain_add(&r->out, b);
4244         nxt_http_request_send_body(task, r, NULL);
4245 
4246     } else {
             /* First response buffer: validate its size before parsing it. */
4247         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4248 
4249         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4250             nxt_alert(task, "response buffer too small: %z", b_size);
4251             goto fail;
4252         }
4253 
4254         resp = (void *) b->mem.pos;
             /* How many nxt_unit_field_t entries fit after the response header. */
4255         count = (b_size - sizeof(nxt_unit_response_t))
4256                     / sizeof(nxt_unit_field_t);
4257 
4258         if (nxt_slow_path(count < resp->fields_count)) {
4259             nxt_alert(task, "response buffer too small for fields count: %D",
4260                       resp->fields_count);
4261             goto fail;
4262         }
4263 
4264         field = NULL;
4265 
             /* Copy every non-skipped field into the request's response fields. */
4266         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4267             if (f->skip) {
4268                 continue;
4269             }
4270 
4271             field = nxt_list_add(r->resp.fields);
4272 
4273             if (nxt_slow_path(field == NULL)) {
4274                 goto fail;
4275             }
4276 
4277             field->hash = f->hash;
4278             field->skip = 0;
4279             field->hopbyhop = 0;
4280 
4281             field->name_length = f->name_length;
4282             field->value_length = f->value_length;
4283             field->name = nxt_unit_sptr_get(&f->name);
4284             field->value = nxt_unit_sptr_get(&f->value);
4285 
4286             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4287             if (nxt_slow_path(ret != NXT_OK)) {
4288                 goto fail;
4289             }
4290 
4291             nxt_debug(task, "header%s: %*s: %*s",
4292                       (field->skip ? " skipped" : ""),
4293                       (size_t) field->name_length, field->name,
4294                       (size_t) field->value_length, field->value);
4295 
                 /* Processing flagged the field as skipped: drop the new element. */
4296             if (field->skip) {
4297                 r->resp.fields->last->nelts--;
4298             }
4299         }
4300 
4301         r->status = resp->status;
4302 
             /*
              * Narrow the buffer to the content that came together with the
              * headers, or make it empty when there is none.
              */
4303         if (resp->piggyback_content_length != 0) {
4304             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4305             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4306 
4307         } else {
4308             b->mem.pos = b->mem.free;
4309         }
4310 
             /*
              * Nothing left to send from the first buffer: queue its completion
              * handler and continue with the rest of the chain.
              */
4311         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4312             next = b->next;
4313             b->next = NULL;
4314 
4315             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4316                                b->completion_handler, task, b, b->parent);
4317 
4318             b = next;
4319         }
4320 
4321         if (b != NULL) {
4322             nxt_buf_chain_add(&r->out, b);
4323         }
4324 
4325         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4326 
4327         if (r->websocket_handshake
4328             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4329         {
                 /*
                  * WebSocket upgrade: count the connection on the main app port,
                  * release the port with NXT_APR_UPGRADE, and leave
                  * NXT_APR_CLOSE as the action recorded in the RPC data.
                  */
4330             app_port = req_rpc_data->app_port;
4331             if (nxt_slow_path(app_port == NULL)) {
4332                 goto fail;
4333             }
4334 
4335             nxt_thread_mutex_lock(&app->mutex);
4336 
4337             app_port->main_app_port->active_websockets++;
4338 
4339             nxt_thread_mutex_unlock(&app->mutex);
4340 
4341             nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4342             req_rpc_data->apr_action = NXT_APR_CLOSE;
4343 
4344             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4345 
4346             r->state = &nxt_http_websocket;
4347 
4348         } else {
4349             r->state = &nxt_http_request_send_state;
4350         }
4351     }
4352 
4353     return;
4354 
4355 fail:
4356 
4357     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4358 
4359     nxt_request_rpc_data_unlink(task, req_rpc_data);
4360 }
4361 
4362 
/*
 * Handles the acknowledgement of request headers (called from
 * nxt_router_response_ready_handler() for _NXT_PORT_MSG_REQ_HEADERS_ACK).
 *
 * The request is removed from the queue it is linked into via app_link, the
 * port that sent the acknowledgement (looked up by pid and reply_port) takes
 * the place of the port stored in the RPC data, idle/spare accounting of
 * that port's main port is updated, and the remaining request body (extra
 * buffers and/or the body file descriptor) is sent to the port.  Finally the
 * application timeout is armed if one is configured.
 */
4363 static void
4364 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4365     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4366 {
4367     int                 res;
4368     nxt_app_t           *app;
4369     nxt_buf_t           *b;
4370     nxt_bool_t          start_process, unlinked;
4371     nxt_port_t          *app_port, *main_app_port, *idle_port;
4372     nxt_queue_link_t    *idle_lnk;
4373     nxt_http_request_t  *r;
4374 
4375     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4376               req_rpc_data->stream,
4377               msg->port_msg.pid, msg->port_msg.reply_port);
4378 
         /* Record the acknowledging pid as the peer of this RPC registration. */
4379     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4380                              msg->port_msg.pid);
4381 
4382     app = req_rpc_data->app;
4383     r = req_rpc_data->request;
4384 
4385     start_process = 0;
4386     unlinked = 0;
4387 
4388     nxt_thread_mutex_lock(&app->mutex);
4389 
         /*
          * The request is still queued via app_link: unqueue it.  The matching
          * nxt_mp_release() on the request pool is done after unlocking.
          */
4390     if (r->app_link.next != NULL) {
4391         nxt_queue_remove(&r->app_link);
4392         r->app_link.next = NULL;
4393 
4394         unlinked = 1;
4395     }
4396 
         /* Find the application port the acknowledgement came from. */
4397     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4398                                   msg->port_msg.reply_port);
4399     if (nxt_slow_path(app_port == NULL)) {
4400         nxt_thread_mutex_unlock(&app->mutex);
4401 
4402         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4403 
4404         if (unlinked) {
4405             nxt_mp_release(r->mem_pool);
4406         }
4407 
4408         return;
4409     }
4410 
4411     main_app_port = app_port->main_app_port;
4412 
         /* The main port was queued as idle or spare: it is taken out now. */
4413     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4414         app->idle_processes--;
4415 
4416         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4417                   &app->name, main_app_port->pid, main_app_port->id,
4418                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4419 
4420         /* Check port was in 'spare_ports' using idle_start field. */
4421         if (main_app_port->idle_start == 0
4422             && app->idle_processes >= app->spare_processes)
4423         {
4424             /*
4425              * If there is a vacant space in spare ports,
4426              * move the last idle to spare_ports.
4427              */
4428             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4429 
4430             idle_lnk = nxt_queue_last(&app->idle_ports);
4431             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4432             nxt_queue_remove(idle_lnk);
4433 
4434             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4435 
4436             idle_port->idle_start = 0;
4437 
4438             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4439                       "to spare_ports",
4440                       &app->name, idle_port->pid, idle_port->id);
4441         }
4442 
             /* Account for a new process now; it is started after unlocking. */
4443         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4444             app->pending_processes++;
4445             start_process = 1;
4446         }
4447     }
4448 
4449     main_app_port->active_requests++;
4450 
         /* Reference for storing app_port in req_rpc_data below. */
4451     nxt_port_inc_use(app_port);
4452 
4453     nxt_thread_mutex_unlock(&app->mutex);
4454 
4455     if (unlinked) {
4456         nxt_mp_release(r->mem_pool);
4457     }
4458 
4459     if (start_process) {
4460         nxt_router_start_app_process(task, app);
4461     }
4462 
         /* Replace the port held by the RPC data with the acknowledging one. */
4463     nxt_port_use(task, req_rpc_data->app_port, -1);
4464 
4465     req_rpc_data->app_port = app_port;
4466 
4467     b = req_rpc_data->msg_info.buf;
4468 
4469     if (b != NULL) {
4470         /* First buffer is already sent.  Start from second. */
4471         b = b->next;
4472 
4473         req_rpc_data->msg_info.buf->next = NULL;
4474     }
4475 
4476     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4477         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4478                   req_rpc_data->msg_info.body_fd);
4479 
             /* Rewind the body file before its descriptor is passed on. */
4480         if (req_rpc_data->msg_info.body_fd != -1) {
4481             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4482         }
4483 
4484         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4485                                     req_rpc_data->msg_info.body_fd,
4486                                     req_rpc_data->stream,
4487                                     task->thread->engine->port->id, b);
4488 
4489         if (nxt_slow_path(res != NXT_OK)) {
4490             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4491         }
4492     }
4493 
4494     if (app->timeout != 0) {
4495         r->timer.handler = nxt_router_app_timeout;
4496         r->timer_data = req_rpc_data;
4497         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4498     }
4499 }
4500 
4501 
/*
 * Request state assigned by nxt_router_response_ready_handler() after the
 * response headers have been passed to nxt_http_request_header_send() for a
 * non-WebSocket response.  Only an error handler is set.
 */
4502 static const nxt_http_request_state_t  nxt_http_request_send_state
4503     nxt_aligned(64) =
4504 {
4505     .error_handler = nxt_http_request_error_handler,
4506 };
4507 
4508 
4509 static void
4510 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4511 {
4512     nxt_buf_t           *out;
4513     nxt_http_request_t  *r;
4514 
4515     r = obj;
4516 
4517     out = r->out;
4518 
4519     if (out != NULL) {
4520         r->out = NULL;
4521         nxt_http_request_send(task, r, out);
4522     }
4523 }
4524 
4525 
4526 static void
4527 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4528     void *data)
4529 {
4530     nxt_request_rpc_data_t  *req_rpc_data;
4531 
4532     req_rpc_data = data;
4533 
4534     req_rpc_data->rpc_cancel = 0;
4535 
4536     /* TODO cancel message and return if cancelled. */
4537     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4538 
4539     if (req_rpc_data->request != NULL) {
4540         nxt_http_request_error(task, req_rpc_data->request,
4541                                NXT_HTTP_SERVICE_UNAVAILABLE);
4542     }
4543 
4544     nxt_request_rpc_data_unlink(task, req_rpc_data);
4545 }
4546 
4547 
/*
 * Handles the reply that delivers the port of a newly started process
 * (msg->u.new_port); "data" is the nxt_app_joint_rpc_t of the start request.
 *
 * The joint reference held by the request is dropped first.  If the
 * application is gone, or its generation differs from the one recorded in
 * the request, the new process is told to QUIT.  A prototype port
 * (app_joint_rpc->proto) is stored in app->proto_port and the process start
 * requests counted in app->proto_port_requests are replayed.  A regular
 * application port is added to the application's port hash, acknowledged
 * with NXT_PORT_MSG_PORT_ACK and released as NXT_APR_NEW_PORT.
 */
4548 static void
4549 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4550     void *data)
4551 {
4552     uint32_t             n;
4553     nxt_app_t            *app;
4554     nxt_bool_t           start_process, restarted;
4555     nxt_port_t           *port;
4556     nxt_app_joint_t      *app_joint;
4557     nxt_app_joint_rpc_t  *app_joint_rpc;
4558 
4559     nxt_assert(data != NULL);
4560 
4561     app_joint_rpc = data;
4562     app_joint = app_joint_rpc->app_joint;
4563     port = msg->u.new_port;
4564 
4565     nxt_assert(app_joint != NULL);
4566     nxt_assert(port != NULL);
4567     nxt_assert(port->id == 0);
4568 
         /* Read the app pointer before the joint reference is dropped. */
4569     app = app_joint->app;
4570 
4571     nxt_router_app_joint_use(task, app_joint, -1);
4572 
4573     if (nxt_slow_path(app == NULL)) {
4574         nxt_debug(task, "new port ready for released app, send QUIT");
4575 
4576         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4577 
4578         return;
4579     }
4580 
4581     nxt_thread_mutex_lock(&app->mutex);
4582 
         /* The generation changed since the start request was issued. */
4583     restarted = (app->generation != app_joint_rpc->generation);
4584 
4585     if (app_joint_rpc->proto) {
4586         nxt_assert(app->proto_port == NULL);
4587         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4588 
             /* Take over the count of starts that waited for the prototype. */
4589         n = app->proto_port_requests;
4590         app->proto_port_requests = 0;
4591 
4592         if (nxt_slow_path(restarted)) {
4593             nxt_thread_mutex_unlock(&app->mutex);
4594 
4595             nxt_debug(task, "proto port ready for restarted app, send QUIT");
4596 
4597             nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4598                                   NULL);
4599 
4600         } else {
4601             port->app = app;
4602             app->proto_port = port;
4603 
4604             nxt_thread_mutex_unlock(&app->mutex);
4605 
                 /* Reference for the pointer stored in app->proto_port. */
4606             nxt_port_use(task, port, 1);
4607         }
4608 
4609         port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4610 
             /*
              * Replay the deferred starts (in both branches above); an app
              * reference is taken before each handler call.
              */
4611         while (n > 0) {
4612             nxt_router_app_use(task, app, 1);
4613 
4614             nxt_router_start_app_process_handler(task, port, app);
4615 
4616             n--;
4617         }
4618 
4619         return;
4620     }
4621 
4622     nxt_assert(port->type == NXT_PROCESS_APP);
4623     nxt_assert(app->pending_processes != 0);
4624 
4625     app->pending_processes--;
4626 
4627     if (nxt_slow_path(restarted)) {
4628         nxt_debug(task, "new port ready for restarted app, send QUIT");
4629 
             /* Decide under the mutex whether a replacement must be started. */
4630         start_process = !task->thread->engine->shutdown
4631                         && nxt_router_app_can_start(app)
4632                         && nxt_router_app_need_start(app);
4633 
4634         if (start_process) {
4635             app->pending_processes++;
4636         }
4637 
4638         nxt_thread_mutex_unlock(&app->mutex);
4639 
4640         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4641 
4642         if (start_process) {
4643             nxt_router_start_app_process(task, app);
4644         }
4645 
4646         return;
4647     }
4648 
         /* Register the port as a running process of the application. */
4649     port->app = app;
4650     port->main_app_port = port;
4651 
4652     app->processes++;
4653     nxt_port_hash_add(&app->port_hash, port);
4654     app->port_hash_count++;
4655 
4656     nxt_thread_mutex_unlock(&app->mutex);
4657 
4658     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4659               &app->name, port->pid, app->processes, app->pending_processes);
4660 
4661     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4662 
4663     nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4664 }
4665 
4666 
4667 static void
4668 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4669     void *data)
4670 {
4671     nxt_app_t            *app;
4672     nxt_app_joint_t      *app_joint;
4673     nxt_queue_link_t     *link;
4674     nxt_http_request_t   *r;
4675     nxt_app_joint_rpc_t  *app_joint_rpc;
4676 
4677     nxt_assert(data != NULL);
4678 
4679     app_joint_rpc = data;
4680     app_joint = app_joint_rpc->app_joint;
4681 
4682     nxt_assert(app_joint != NULL);
4683 
4684     app = app_joint->app;
4685 
4686     nxt_router_app_joint_use(task, app_joint, -1);
4687 
4688     if (nxt_slow_path(app == NULL)) {
4689         nxt_debug(task, "start error for released app");
4690 
4691         return;
4692     }
4693 
4694     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4695 
4696     link = NULL;
4697 
4698     nxt_thread_mutex_lock(&app->mutex);
4699 
4700     nxt_assert(app->pending_processes != 0);
4701 
4702     app->pending_processes--;
4703 
4704     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4705         link = nxt_queue_first(&app->ack_waiting_req);
4706 
4707         nxt_queue_remove(link);
4708         link->next = NULL;
4709     }
4710 
4711     nxt_thread_mutex_unlock(&app->mutex);
4712 
4713     while (link != NULL) {
4714         r = nxt_container_of(link, nxt_http_request_t, app_link);
4715 
4716         nxt_event_engine_post(r->engine, &r->err_work);
4717 
4718         link = NULL;
4719 
4720         nxt_thread_mutex_lock(&app->mutex);
4721 
4722         if (app->processes == 0 && app->pending_processes == 0
4723             && !nxt_queue_is_empty(&app->ack_waiting_req))
4724         {
4725             link = nxt_queue_first(&app->ack_waiting_req);
4726 
4727             nxt_queue_remove(link);
4728             link->next = NULL;
4729         }
4730 
4731         nxt_thread_mutex_unlock(&app->mutex);
4732     }
4733 }
4734 
4735 
4736 nxt_inline nxt_port_t *
4737 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4738 {
4739     nxt_port_t  *port;
4740 
4741     port = NULL;
4742 
4743     nxt_thread_mutex_lock(&app->mutex);
4744 
4745     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4746 
4747         /* Caller is responsible to decrease port use count. */
4748         nxt_queue_chk_remove(&port->app_link);
4749 
4750         if (nxt_queue_chk_remove(&port->idle_link)) {
4751             app->idle_processes--;
4752 
4753             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4754                       &app->name, port->pid, port->id,
4755                       (port->idle_start ? "idle_ports" : "spare_ports"));
4756         }
4757 
4758         nxt_port_hash_remove(&app->port_hash, port);
4759         app->port_hash_count--;
4760 
4761         port->app = NULL;
4762         app->processes--;
4763 
4764         break;
4765 
4766     } nxt_queue_loop;
4767 
4768     nxt_thread_mutex_unlock(&app->mutex);
4769 
4770     return port;
4771 }
4772 
4773 
4774 static void
4775 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4776 {
4777     int  c;
4778 
4779     c = nxt_atomic_fetch_add(&app->use_count, i);
4780 
4781     if (i < 0 && c == -i) {
4782 
4783         if (task->thread->engine != app->engine) {
4784             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4785 
4786         } else {
4787             nxt_router_free_app(task, app->joint, NULL);
4788         }
4789     }
4790 }
4791 
4792 
4793 static void
4794 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4795 {
4796     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4797 
4798     nxt_queue_remove(&app->link);
4799 
4800     nxt_router_app_use(task, app, -1);
4801 }
4802 
4803 
/*
 * Updates request and idle accounting for an application port and adjusts
 * the port's use count according to "action":
 *
 *   NXT_APR_NEW_PORT        no counters change, use count unchanged;
 *   NXT_APR_REQUEST_FAILED  one active request less, use count - 1;
 *   NXT_APR_GOT_RESPONSE    one active request less, use count - 1;
 *   NXT_APR_UPGRADE         one active request less, use count unchanged;
 *   NXT_APR_CLOSE           no counters change, use count - 1.
 *
 * For the shared port (id NXT_SHARED_PORT_ID) only app->active_requests is
 * adjusted.  Otherwise the main port is linked into app->ports if needed
 * and, once it has no active requests or websockets, queued as spare or
 * idle.
 */
4804 static void
4805 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4806     nxt_apr_action_t action)
4807 {
4808     int         inc_use;
4809     uint32_t    got_response, dec_requests;
4810     nxt_bool_t  adjust_idle_timer;
4811     nxt_port_t  *main_app_port;
4812 
4813     nxt_assert(port != NULL);
4814 
4815     inc_use = 0;
4816     got_response = 0;
4817     dec_requests = 0;
4818 
4819     switch (action) {
4820     case NXT_APR_NEW_PORT:
4821         break;
4822     case NXT_APR_REQUEST_FAILED:
4823         dec_requests = 1;
4824         inc_use = -1;
4825         break;
4826     case NXT_APR_GOT_RESPONSE:
4827         got_response = 1;
4828         inc_use = -1;
4829         break;
4830     case NXT_APR_UPGRADE:
4831         got_response = 1;
4832         break;
4833     case NXT_APR_CLOSE:
4834         inc_use = -1;
4835         break;
4836     }
4837 
4838     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4839               port->pid, port->id,
4840               (int) inc_use, (int) got_response);
4841 
         /* Shared port: only the application-wide counter is maintained. */
4842     if (port->id == NXT_SHARED_PORT_ID) {
4843         nxt_thread_mutex_lock(&app->mutex);
4844 
4845         app->active_requests -= got_response + dec_requests;
4846 
4847         nxt_thread_mutex_unlock(&app->mutex);
4848 
4849         goto adjust_use;
4850     }
4851 
4852     main_app_port = port->main_app_port;
4853 
4854     nxt_thread_mutex_lock(&app->mutex);
4855 
4856     main_app_port->active_requests -= got_response + dec_requests;
4857     app->active_requests -= got_response + dec_requests;
4858 
         /*
          * The main port is not in app->ports yet: link it and take the
          * reference that goes with the list membership.
          */
4859     if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4860         nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4861 
4862         nxt_port_inc_use(main_app_port);
4863     }
4864 
4865     adjust_idle_timer = 0;
4866 
         /* The process has nothing to do and is not queued yet: park it. */
4867     if (main_app_port->pair[1] != -1
4868         && main_app_port->active_requests == 0
4869         && main_app_port->active_websockets == 0
4870         && main_app_port->idle_link.next == NULL)
4871     {
             /*
              * The idle count is about to exceed the spare count: request an
              * idle timer adjustment unless one is already queued
              * (adjust_idle_work.data is non-NULL while queued).
              */
4872         if (app->idle_processes == app->spare_processes
4873             && app->adjust_idle_work.data == NULL)
4874         {
4875             adjust_idle_timer = 1;
4876             app->adjust_idle_work.data = app;
4877             app->adjust_idle_work.next = NULL;
4878         }
4879 
4880         if (app->idle_processes < app->spare_processes) {
4881             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4882 
4883             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4884                       &app->name, main_app_port->pid, main_app_port->id);
4885         } else {
4886             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4887 
4888             main_app_port->idle_start = task->thread->engine->timers.now;
4889 
4890             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4891                       &app->name, main_app_port->pid, main_app_port->id);
4892         }
4893 
4894         app->idle_processes++;
4895     }
4896 
4897     nxt_thread_mutex_unlock(&app->mutex);
4898 
         /* The queued work holds an app reference until it has run. */
4899     if (adjust_idle_timer) {
4900         nxt_router_app_use(task, app, 1);
4901         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4902     }
4903 
4904     /* ? */
4905     if (main_app_port->pair[1] == -1) {
4906         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4907                   &app->name, app, main_app_port, main_app_port->pid);
4908 
4909         goto adjust_use;
4910     }
4911 
4912     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4913               &app->name, app);
4914 
4915 adjust_use:
4916 
4917     nxt_port_use(task, port, inc_use);
4918 }
4919 
4920 
/*
 * Detaches a closed port from its application; port->app must be set.
 *
 * The prototype port is simply cleared from app->proto_port and loses one
 * reference.  Any other port is removed from the application's port hash;
 * for ports with a non-zero id nothing else is done.  For a port with id 0
 * the port is also removed from app->ports and from the idle/spare queue,
 * the process counter is decremented and, unless the engine is shutting
 * down, a replacement process is started when the application can and needs
 * to start one.
 */
4921 void
4922 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4923 {
4924     nxt_app_t         *app;
4925     nxt_bool_t        unchain, start_process;
4926     nxt_port_t        *idle_port;
4927     nxt_queue_link_t  *idle_lnk;
4928 
4929     app = port->app;
4930 
4931     nxt_assert(app != NULL);
4932 
4933     nxt_thread_mutex_lock(&app->mutex);
4934 
4935     if (port == app->proto_port) {
4936         app->proto_port = NULL;
4937         port->app = NULL;
4938 
4939         nxt_thread_mutex_unlock(&app->mutex);
4940 
4941         nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4942                   port->pid);
4943 
4944         nxt_port_use(task, port, -1);
4945 
4946         return;
4947     }
4948 
4949     nxt_port_hash_remove(&app->port_hash, port);
4950     app->port_hash_count--;
4951 
         /* Only a port with id 0 takes part in the process accounting below. */
4952     if (port->id != 0) {
4953         nxt_thread_mutex_unlock(&app->mutex);
4954 
4955         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4956                   port->pid, port->id);
4957 
4958         return;
4959     }
4960 
         /* Remember whether the port was in app->ports; its reference is dropped below. */
4961     unchain = nxt_queue_chk_remove(&port->app_link);
4962 
4963     if (nxt_queue_chk_remove(&port->idle_link)) {
4964         app->idle_processes--;
4965 
4966         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4967                   &app->name, port->pid, port->id,
4968                   (port->idle_start ? "idle_ports" : "spare_ports"));
4969 
             /*
              * The port was a spare one (idle_start == 0) and enough idle
              * processes remain: promote the last idle port to spare.
              */
4970         if (port->idle_start == 0
4971             && app->idle_processes >= app->spare_processes)
4972         {
4973             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4974 
4975             idle_lnk = nxt_queue_last(&app->idle_ports);
4976             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4977             nxt_queue_remove(idle_lnk);
4978 
4979             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4980 
4981             idle_port->idle_start = 0;
4982 
4983             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4984                       "to spare_ports",
4985                       &app->name, idle_port->pid, idle_port->id);
4986         }
4987     }
4988 
4989     app->processes--;
4990 
         /* Decide under the mutex; the process itself is started after unlocking. */
4991     start_process = !task->thread->engine->shutdown
4992                     && nxt_router_app_can_start(app)
4993                     && nxt_router_app_need_start(app);
4994 
4995     if (start_process) {
4996         app->pending_processes++;
4997     }
4998 
4999     nxt_thread_mutex_unlock(&app->mutex);
5000 
5001     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
5002 
5003     if (unchain) {
5004         nxt_port_use(task, port, -1);
5005     }
5006 
5007     if (start_process) {
5008         nxt_router_start_app_process(task, app);
5009     }
5010 }
5011 
5012 
/*
 * Stops idle application processes whose idle timeout has expired and
 * re-arms or disables the idle timer of the application joint.
 *
 * "obj" is the application.  "data" equal to the application means the call
 * comes from the queued adjust_idle_work: the pending mark is cleared and the
 * application reference taken when the work was posted is released at the
 * end.  Must run on the application's engine (asserted).
 */
5013 static void
5014 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
5015 {
5016     nxt_app_t           *app;
5017     nxt_bool_t          queued;
5018     nxt_port_t          *port;
5019     nxt_msec_t          timeout, threshold;
5020     nxt_queue_link_t    *lnk;
5021     nxt_event_engine_t  *engine;
5022 
5023     app = obj;
5024     queued = (data == app);
5025 
5026     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
5027               &app->name, queued);
5028 
5029     engine = task->thread->engine;
5030 
5031     nxt_assert(app->engine == engine);
5032 
         /* Ports whose deadline is not later than this are stopped now. */
5033     threshold = engine->timers.now + app->joint->idle_timer.bias;
5034     timeout = 0;
5035 
5036     nxt_thread_mutex_lock(&app->mutex);
5037 
5038     if (queued) {
5039         app->adjust_idle_work.data = NULL;
5040     }
5041 
5042     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
5043               &app->name,
5044               (int) app->idle_processes, (int) app->spare_processes);
5045 
5046     while (app->idle_processes > app->spare_processes) {
5047 
5048         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
5049 
5050         lnk = nxt_queue_first(&app->idle_ports);
5051         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
5052 
5053         timeout = port->idle_start + app->idle_timeout;
5054 
5055         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
5056                   &app->name, port->pid,
5057                   port->idle_start, timeout, threshold);
5058 
             /* The first idle port has not expired yet: stop scanning. */
5059         if (timeout > threshold) {
5060             break;
5061         }
5062 
5063         nxt_queue_remove(lnk);
5064         lnk->next = NULL;
5065 
5066         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
5067                   &app->name, port->pid, port->id);
5068 
5069         nxt_queue_chk_remove(&port->app_link);
5070 
5071         nxt_port_hash_remove(&app->port_hash, port);
5072         app->port_hash_count--;
5073 
5074         app->idle_processes--;
5075         app->processes--;
5076         port->app = NULL;
5077 
             /* The mutex is not held while QUIT is sent and the port released. */
5078         nxt_thread_mutex_unlock(&app->mutex);
5079 
5080         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
5081                   &app->name, port->pid);
5082 
5083         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5084 
5085         nxt_port_use(task, port, -1);
5086 
5087         nxt_thread_mutex_lock(&app->mutex);
5088     }
5089 
5090     nxt_thread_mutex_unlock(&app->mutex);
5091 
         /* Re-arm for the earliest pending deadline, or stop the timer. */
5092     if (timeout > threshold) {
5093         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
5094 
5095     } else {
5096         nxt_timer_disable(engine, &app->joint->idle_timer);
5097     }
5098 
5099     if (queued) {
5100         nxt_router_app_use(task, app, -1);
5101     }
5102 }
5103 
5104 
5105 static void
5106 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5107 {
5108     nxt_timer_t      *timer;
5109     nxt_app_joint_t  *app_joint;
5110 
5111     timer = obj;
5112     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5113 
5114     if (nxt_fast_path(app_joint->app != NULL)) {
5115         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5116     }
5117 }
5118 
5119 
5120 static void
5121 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5122 {
5123     nxt_timer_t      *timer;
5124     nxt_app_joint_t  *app_joint;
5125 
5126     timer = obj;
5127     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5128 
5129     nxt_router_app_joint_use(task, app_joint, -1);
5130 }
5131 
5132 
5133 static void
5134 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5135 {
5136     nxt_app_t        *app;
5137     nxt_port_t       *port, *proto_port;
5138     nxt_app_joint_t  *app_joint;
5139 
5140     app_joint = obj;
5141     app = app_joint->app;
5142 
5143     for ( ;; ) {
5144         port = nxt_router_app_get_port_for_quit(task, app);
5145         if (port == NULL) {
5146             break;
5147         }
5148 
5149         nxt_port_use(task, port, -1);
5150     }
5151 
5152     nxt_thread_mutex_lock(&app->mutex);
5153 
5154     for ( ;; ) {
5155         port = nxt_port_hash_retrieve(&app->port_hash);
5156         if (port == NULL) {
5157             break;
5158         }
5159 
5160         app->port_hash_count--;
5161 
5162         port->app = NULL;
5163 
5164         nxt_port_close(task, port);
5165 
5166         nxt_port_use(task, port, -1);
5167     }
5168 
5169     proto_port = app->proto_port;
5170 
5171     if (proto_port != NULL) {
5172         nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5173                   proto_port->pid);
5174 
5175         app->proto_port = NULL;
5176         proto_port->app = NULL;
5177     }
5178 
5179     nxt_thread_mutex_unlock(&app->mutex);
5180 
5181     if (proto_port != NULL) {
5182         nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5183                               -1, 0, 0, NULL);
5184 
5185         nxt_port_close(task, proto_port);
5186 
5187         nxt_port_use(task, proto_port, -1);
5188     }
5189 
5190     nxt_assert(app->proto_port == NULL);
5191     nxt_assert(app->processes == 0);
5192     nxt_assert(app->active_requests == 0);
5193     nxt_assert(app->port_hash_count == 0);
5194     nxt_assert(app->idle_processes == 0);
5195     nxt_assert(nxt_queue_is_empty(&app->ports));
5196     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5197     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5198 
5199     nxt_port_mmaps_destroy(&app->outgoing, 1);
5200 
5201     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5202 
5203     if (app->shared_port != NULL) {
5204         app->shared_port->app = NULL;
5205         nxt_port_close(task, app->shared_port);
5206         nxt_port_use(task, app->shared_port, -1);
5207 
5208         app->shared_port = NULL;
5209     }
5210 
5211     nxt_thread_mutex_destroy(&app->mutex);
5212     nxt_mp_destroy(app->mem_pool);
5213 
5214     app_joint->app = NULL;
5215 
5216     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5217         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5218         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5219 
5220     } else {
5221         nxt_router_app_joint_use(task, app_joint, -1);
5222     }
5223 }
5224 
5225 
5226 static void
5227 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5228     nxt_request_rpc_data_t *req_rpc_data)
5229 {
5230     nxt_bool_t          start_process;
5231     nxt_port_t          *port;
5232     nxt_http_request_t  *r;
5233 
5234     start_process = 0;
5235 
5236     nxt_thread_mutex_lock(&app->mutex);
5237 
5238     port = app->shared_port;
5239     nxt_port_inc_use(port);
5240 
5241     app->active_requests++;
5242 
5243     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5244         app->pending_processes++;
5245         start_process = 1;
5246     }
5247 
5248     r = req_rpc_data->request;
5249 
5250     /*
5251      * Put request into application-wide list to be able to cancel request
5252      * if something goes wrong with application processes.
5253      */
5254     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5255 
5256     nxt_thread_mutex_unlock(&app->mutex);
5257 
5258     /*
5259      * Retain request memory pool while request is linked in ack_waiting_req
5260      * to guarantee request structure memory is accessble.
5261      */
5262     nxt_mp_retain(r->mem_pool);
5263 
5264     req_rpc_data->app_port = port;
5265     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5266 
5267     if (start_process) {
5268         nxt_router_start_app_process(task, app);
5269     }
5270 }
5271 
5272 
5273 void
5274 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5275     nxt_http_action_t *action)
5276 {
5277     nxt_event_engine_t      *engine;
5278     nxt_http_app_conf_t     *conf;
5279     nxt_request_rpc_data_t  *req_rpc_data;
5280 
5281     conf = action->u.conf;
5282     engine = task->thread->engine;
5283 
5284     r->app_target = conf->target;
5285 
5286     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5287                                           nxt_router_response_ready_handler,
5288                                           nxt_router_response_error_handler,
5289                                           sizeof(nxt_request_rpc_data_t));
5290     if (nxt_slow_path(req_rpc_data == NULL)) {
5291         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5292         return;
5293     }
5294 
5295     /*
5296      * At this point we have request req_rpc_data allocated and registered
5297      * in port handlers.  Need to fixup request memory pool.  Counterpart
5298      * release will be called via following call chain:
5299      *    nxt_request_rpc_data_unlink() ->
5300      *        nxt_router_http_request_release_post() ->
5301      *            nxt_router_http_request_release()
5302      */
5303     nxt_mp_retain(r->mem_pool);
5304 
5305     r->timer.task = &engine->task;
5306     r->timer.work_queue = &engine->fast_work_queue;
5307     r->timer.log = engine->task.log;
5308     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5309 
5310     r->engine = engine;
5311     r->err_work.handler = nxt_router_http_request_error;
5312     r->err_work.task = task;
5313     r->err_work.obj = r;
5314 
5315     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5316     req_rpc_data->app = conf->app;
5317     req_rpc_data->msg_info.body_fd = -1;
5318     req_rpc_data->rpc_cancel = 1;
5319 
5320     nxt_router_app_use(task, conf->app, 1);
5321 
5322     req_rpc_data->request = r;
5323     r->req_rpc_data = req_rpc_data;
5324 
5325     if (r->last != NULL) {
5326         r->last->completion_handler = nxt_router_http_request_done;
5327     }
5328 
5329     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5330     nxt_router_app_prepare_request(task, req_rpc_data);
5331 }
5332 
5333 
5334 static void
5335 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5336 {
5337     nxt_http_request_t  *r;
5338 
5339     r = obj;
5340 
5341     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5342 
5343     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5344 
5345     if (r->req_rpc_data != NULL) {
5346         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5347     }
5348 
5349     nxt_mp_release(r->mem_pool);
5350 }
5351 
5352 
5353 static void
5354 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5355 {
5356     nxt_http_request_t  *r;
5357 
5358     r = data;
5359 
5360     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5361 
5362     if (r->req_rpc_data != NULL) {
5363         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5364     }
5365 
5366     nxt_http_request_close_handler(task, r, r->proto.any);
5367 }
5368 
5369 
5370 static void
5371 nxt_router_app_prepare_request(nxt_task_t *task,
5372     nxt_request_rpc_data_t *req_rpc_data)
5373 {
5374     nxt_app_t         *app;
5375     nxt_buf_t         *buf, *body;
5376     nxt_int_t         res;
5377     nxt_port_t        *port, *reply_port;
5378 
5379     int                   notify;
5380     struct {
5381         nxt_port_msg_t       pm;
5382         nxt_port_mmap_msg_t  mm;
5383     } msg;
5384 
5385 
5386     app = req_rpc_data->app;
5387 
5388     nxt_assert(app != NULL);
5389 
5390     port = req_rpc_data->app_port;
5391 
5392     nxt_assert(port != NULL);
5393     nxt_assert(port->queue != NULL);
5394 
5395     reply_port = task->thread->engine->port;
5396 
5397     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5398                                  nxt_app_msg_prefix[app->type]);
5399     if (nxt_slow_path(buf == NULL)) {
5400         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5401                   req_rpc_data->stream, &app->name);
5402 
5403         nxt_http_request_error(task, req_rpc_data->request,
5404                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5405 
5406         return;
5407     }
5408 
5409     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5410                     nxt_buf_used_size(buf),
5411                     port->socket.fd);
5412 
5413     req_rpc_data->msg_info.buf = buf;
5414 
5415     body = req_rpc_data->request->body;
5416 
5417     if (body != NULL && nxt_buf_is_file(body)) {
5418         req_rpc_data->msg_info.body_fd = body->file->fd;
5419 
5420         body->file->fd = -1;
5421 
5422     } else {
5423         req_rpc_data->msg_info.body_fd = -1;
5424     }
5425 
5426     msg.pm.stream = req_rpc_data->stream;
5427     msg.pm.pid = reply_port->pid;
5428     msg.pm.reply_port = reply_port->id;
5429     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5430     msg.pm.last = 0;
5431     msg.pm.mmap = 1;
5432     msg.pm.nf = 0;
5433     msg.pm.mf = 0;
5434 
5435     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5436     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5437 
5438     msg.mm.mmap_id = hdr->id;
5439     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5440     msg.mm.size = nxt_buf_used_size(buf);
5441 
5442     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5443                              req_rpc_data->stream, &notify,
5444                              &req_rpc_data->msg_info.tracking_cookie);
5445     if (nxt_fast_path(res == NXT_OK)) {
5446         if (notify != 0) {
5447             (void) nxt_port_socket_write(task, port,
5448                                          NXT_PORT_MSG_READ_QUEUE,
5449                                          -1, req_rpc_data->stream,
5450                                          reply_port->id, NULL);
5451 
5452         } else {
5453             nxt_debug(task, "queue is not empty");
5454         }
5455 
5456         buf->is_port_mmap_sent = 1;
5457         buf->mem.pos = buf->mem.free;
5458 
5459     } else {
5460         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5461                   req_rpc_data->stream, &app->name);
5462 
5463         nxt_http_request_error(task, req_rpc_data->request,
5464                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5465     }
5466 }
5467 
5468 
5469 struct nxt_fields_iter_s {
5470     nxt_list_part_t   *part;
5471     nxt_http_field_t  *field;
5472 };
5473 
5474 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5475 
5476 
5477 static nxt_http_field_t *
5478 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5479 {
5480     if (part == NULL) {
5481         return NULL;
5482     }
5483 
5484     while (part->nelts == 0) {
5485         part = part->next;
5486         if (part == NULL) {
5487             return NULL;
5488         }
5489     }
5490 
5491     i->part = part;
5492     i->field = nxt_list_data(i->part);
5493 
5494     return i->field;
5495 }
5496 
5497 
5498 static nxt_http_field_t *
5499 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5500 {
5501     return nxt_fields_part_first(nxt_list_part(fields), i);
5502 }
5503 
5504 
5505 static nxt_http_field_t *
5506 nxt_fields_next(nxt_fields_iter_t *i)
5507 {
5508     nxt_http_field_t  *end = nxt_list_data(i->part);
5509 
5510     end += i->part->nelts;
5511     i->field++;
5512 
5513     if (i->field < end) {
5514         return i->field;
5515     }
5516 
5517     return nxt_fields_part_first(i->part->next, i);
5518 }
5519 
5520 
5521 static nxt_buf_t *
5522 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5523     nxt_app_t *app, const nxt_str_t *prefix)
5524 {
5525     void                *target_pos, *query_pos;
5526     u_char              *pos, *end, *p, c;
5527     size_t              fields_count, req_size, size, free_size;
5528     size_t              copy_size;
5529     nxt_off_t           content_length;
5530     nxt_buf_t           *b, *buf, *out, **tail;
5531     nxt_http_field_t    *field, *dup;
5532     nxt_unit_field_t    *dst_field;
5533     nxt_fields_iter_t   iter, dup_iter;
5534     nxt_unit_request_t  *req;
5535 
5536     req_size = sizeof(nxt_unit_request_t)
5537                + r->method->length + 1
5538                + r->version.length + 1
5539                + r->remote->length + 1
5540                + r->local->length + 1
5541                + r->server_name.length + 1
5542                + r->target.length + 1
5543                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5544 
5545     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5546     fields_count = 0;
5547 
5548     nxt_list_each(field, r->fields) {
5549         fields_count++;
5550 
5551         req_size += field->name_length + prefix->length + 1
5552                     + field->value_length + 1;
5553     } nxt_list_loop;
5554 
5555     req_size += fields_count * sizeof(nxt_unit_field_t);
5556 
5557     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5558         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5559                   (int) req_size);
5560 
5561         return NULL;
5562     }
5563 
5564     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5565               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5566     if (nxt_slow_path(out == NULL)) {
5567         return NULL;
5568     }
5569 
5570     req = (nxt_unit_request_t *) out->mem.free;
5571     out->mem.free += req_size;
5572 
5573     req->app_target = r->app_target;
5574 
5575     req->content_length = content_length;
5576 
5577     p = (u_char *) (req->fields + fields_count);
5578 
5579     nxt_debug(task, "fields_count=%d", (int) fields_count);
5580 
5581     req->method_length = r->method->length;
5582     nxt_unit_sptr_set(&req->method, p);
5583     p = nxt_cpymem(p, r->method->start, r->method->length);
5584     *p++ = '\0';
5585 
5586     req->version_length = r->version.length;
5587     nxt_unit_sptr_set(&req->version, p);
5588     p = nxt_cpymem(p, r->version.start, r->version.length);
5589     *p++ = '\0';
5590 
5591     req->remote_length = r->remote->address_length;
5592     nxt_unit_sptr_set(&req->remote, p);
5593     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5594                    r->remote->address_length);
5595     *p++ = '\0';
5596 
5597     req->local_length = r->local->address_length;
5598     nxt_unit_sptr_set(&req->local, p);
5599     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5600     *p++ = '\0';
5601 
5602     req->tls = r->tls;
5603     req->websocket_handshake = r->websocket_handshake;
5604 
5605     req->server_name_length = r->server_name.length;
5606     nxt_unit_sptr_set(&req->server_name, p);
5607     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5608     *p++ = '\0';
5609 
5610     target_pos = p;
5611     req->target_length = (uint32_t) r->target.length;
5612     nxt_unit_sptr_set(&req->target, p);
5613     p = nxt_cpymem(p, r->target.start, r->target.length);
5614     *p++ = '\0';
5615 
5616     req->path_length = (uint32_t) r->path->length;
5617     if (r->path->start == r->target.start) {
5618         nxt_unit_sptr_set(&req->path, target_pos);
5619 
5620     } else {
5621         nxt_unit_sptr_set(&req->path, p);
5622         p = nxt_cpymem(p, r->path->start, r->path->length);
5623         *p++ = '\0';
5624     }
5625 
5626     req->query_length = (uint32_t) r->args->length;
5627     if (r->args->start != NULL) {
5628         query_pos = nxt_pointer_to(target_pos,
5629                                    r->args->start - r->target.start);
5630 
5631         nxt_unit_sptr_set(&req->query, query_pos);
5632 
5633     } else {
5634         req->query.offset = 0;
5635     }
5636 
5637     req->content_length_field = NXT_UNIT_NONE_FIELD;
5638     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5639     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5640     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5641 
5642     dst_field = req->fields;
5643 
5644     for (field = nxt_fields_first(r->fields, &iter);
5645          field != NULL;
5646          field = nxt_fields_next(&iter))
5647     {
5648         if (field->skip) {
5649             continue;
5650         }
5651 
5652         dst_field->hash = field->hash;
5653         dst_field->skip = 0;
5654         dst_field->name_length = field->name_length + prefix->length;
5655         dst_field->value_length = field->value_length;
5656 
5657         if (field == r->content_length) {
5658             req->content_length_field = dst_field - req->fields;
5659 
5660         } else if (field == r->content_type) {
5661             req->content_type_field = dst_field - req->fields;
5662 
5663         } else if (field == r->cookie) {
5664             req->cookie_field = dst_field - req->fields;
5665 
5666         } else if (field == r->authorization) {
5667             req->authorization_field = dst_field - req->fields;
5668         }
5669 
5670         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5671                   (int) field->hash, (int) field->skip,
5672                   (int) field->name_length, field->name,
5673                   (int) field->value_length, field->value);
5674 
5675         if (prefix->length != 0) {
5676             nxt_unit_sptr_set(&dst_field->name, p);
5677             p = nxt_cpymem(p, prefix->start, prefix->length);
5678 
5679             end = field->name + field->name_length;
5680             for (pos = field->name; pos < end; pos++) {
5681                 c = *pos;
5682 
5683                 if (c >= 'a' && c <= 'z') {
5684                     *p++ = (c & ~0x20);
5685                     continue;
5686                 }
5687 
5688                 if (c == '-') {
5689                     *p++ = '_';
5690                     continue;
5691                 }
5692 
5693                 *p++ = c;
5694             }
5695 
5696         } else {
5697             nxt_unit_sptr_set(&dst_field->name, p);
5698             p = nxt_cpymem(p, field->name, field->name_length);
5699         }
5700 
5701         *p++ = '\0';
5702 
5703         nxt_unit_sptr_set(&dst_field->value, p);
5704         p = nxt_cpymem(p, field->value, field->value_length);
5705 
5706         if (prefix->length != 0) {
5707             dup_iter = iter;
5708 
5709             for (dup = nxt_fields_next(&dup_iter);
5710                  dup != NULL;
5711                  dup = nxt_fields_next(&dup_iter))
5712             {
5713                 if (dup->name_length != field->name_length
5714                     || dup->skip
5715                     || dup->hash != field->hash
5716                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5717                 {
5718                     continue;
5719                 }
5720 
5721                 p = nxt_cpymem(p, ", ", 2);
5722                 p = nxt_cpymem(p, dup->value, dup->value_length);
5723 
5724                 dst_field->value_length += 2 + dup->value_length;
5725 
5726                 dup->skip = 1;
5727             }
5728         }
5729 
5730         *p++ = '\0';
5731 
5732         dst_field++;
5733     }
5734 
5735     req->fields_count = (uint32_t) (dst_field - req->fields);
5736 
5737     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5738 
5739     buf = out;
5740     tail = &buf->next;
5741 
5742     for (b = r->body; b != NULL; b = b->next) {
5743         size = nxt_buf_mem_used_size(&b->mem);
5744         pos = b->mem.pos;
5745 
5746         while (size > 0) {
5747             if (buf == NULL) {
5748                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5749 
5750                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5751                 if (nxt_slow_path(buf == NULL)) {
5752                     while (out != NULL) {
5753                         buf = out->next;
5754                         out->next = NULL;
5755                         out->completion_handler(task, out, out->parent);
5756                         out = buf;
5757                     }
5758                     return NULL;
5759                 }
5760 
5761                 *tail = buf;
5762                 tail = &buf->next;
5763 
5764             } else {
5765                 free_size = nxt_buf_mem_free_size(&buf->mem);
5766                 if (free_size < size
5767                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5768                        == NXT_OK)
5769                 {
5770                     free_size = nxt_buf_mem_free_size(&buf->mem);
5771                 }
5772             }
5773 
5774             if (free_size > 0) {
5775                 copy_size = nxt_min(free_size, size);
5776 
5777                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5778 
5779                 size -= copy_size;
5780                 pos += copy_size;
5781 
5782                 if (size == 0) {
5783                     break;
5784                 }
5785             }
5786 
5787             buf = NULL;
5788         }
5789     }
5790 
5791     return out;
5792 }
5793 
5794 
5795 static void
5796 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5797 {
5798     nxt_timer_t              *timer;
5799     nxt_http_request_t       *r;
5800     nxt_request_rpc_data_t   *req_rpc_data;
5801 
5802     timer = obj;
5803 
5804     nxt_debug(task, "router app timeout");
5805 
5806     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5807     req_rpc_data = r->timer_data;
5808 
5809     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5810 
5811     nxt_request_rpc_data_unlink(task, req_rpc_data);
5812 }
5813 
5814 
5815 static void
5816 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5817 {
5818     r->timer.handler = nxt_router_http_request_release;
5819     nxt_timer_add(task->thread->engine, &r->timer, 0);
5820 }
5821 
5822 
5823 static void
5824 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5825 {
5826     nxt_http_request_t  *r;
5827 
5828     nxt_debug(task, "http request pool release");
5829 
5830     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5831 
5832     nxt_mp_release(r->mem_pool);
5833 }
5834 
5835 
5836 static void
5837 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5838 {
5839     size_t                   mi;
5840     uint32_t                 i;
5841     nxt_bool_t               ack;
5842     nxt_process_t            *process;
5843     nxt_free_map_t           *m;
5844     nxt_port_mmap_handler_t  *mmap_handler;
5845 
5846     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5847 
5848     process = nxt_runtime_process_find(task->thread->runtime,
5849                                        msg->port_msg.pid);
5850     if (nxt_slow_path(process == NULL)) {
5851         return;
5852     }
5853 
5854     ack = 0;
5855 
5856     /*
5857      * To mitigate possible racing condition (when OOSM message received
5858      * after some of the memory was already freed), need to try to find
5859      * first free segment in shared memory and send ACK if found.
5860      */
5861 
5862     nxt_thread_mutex_lock(&process->incoming.mutex);
5863 
5864     for (i = 0; i < process->incoming.size; i++) {
5865         mmap_handler = process->incoming.elts[i].mmap_handler;
5866 
5867         if (nxt_slow_path(mmap_handler == NULL)) {
5868             continue;
5869         }
5870 
5871         m = mmap_handler->hdr->free_map;
5872 
5873         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5874             if (m[mi] != 0) {
5875                 ack = 1;
5876 
5877                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5878                           i, mi, m[mi]);
5879 
5880                 break;
5881             }
5882         }
5883     }
5884 
5885     nxt_thread_mutex_unlock(&process->incoming.mutex);
5886 
5887     if (ack) {
5888         nxt_process_broadcast_shm_ack(task, process);
5889     }
5890 }
5891 
5892 
5893 static void
5894 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5895 {
5896     nxt_fd_t                 fd;
5897     nxt_port_t               *port;
5898     nxt_runtime_t            *rt;
5899     nxt_port_mmaps_t         *mmaps;
5900     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5901     nxt_port_mmap_handler_t  *mmap_handler;
5902 
5903     rt = task->thread->runtime;
5904 
5905     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5906                                  msg->port_msg.reply_port);
5907     if (nxt_slow_path(port == NULL)) {
5908         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5909                   msg->port_msg.pid, msg->port_msg.reply_port);
5910 
5911         return;
5912     }
5913 
5914     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5915                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5916     {
5917         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5918                   (int) nxt_buf_used_size(msg->buf));
5919 
5920         return;
5921     }
5922 
5923     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5924 
5925     nxt_assert(port->type == NXT_PROCESS_APP);
5926 
5927     if (nxt_slow_path(port->app == NULL)) {
5928         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5929                   port->pid, port->id);
5930 
5931         // FIXME
5932         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5933                               -1, msg->port_msg.stream, 0, NULL);
5934 
5935         return;
5936     }
5937 
5938     mmaps = &port->app->outgoing;
5939     nxt_thread_mutex_lock(&mmaps->mutex);
5940 
5941     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5942         nxt_thread_mutex_unlock(&mmaps->mutex);
5943 
5944         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5945                   (int) get_mmap_msg->id);
5946 
5947         // FIXME
5948         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5949                               -1, msg->port_msg.stream, 0, NULL);
5950         return;
5951     }
5952 
5953     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5954 
5955     fd = mmap_handler->fd;
5956 
5957     nxt_thread_mutex_unlock(&mmaps->mutex);
5958 
5959     nxt_debug(task, "get mmap %PI:%d found",
5960               msg->port_msg.pid, (int) get_mmap_msg->id);
5961 
5962     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5963 }
5964 
5965 
5966 static void
5967 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5968 {
5969     nxt_port_t               *port, *reply_port;
5970     nxt_runtime_t            *rt;
5971     nxt_port_msg_get_port_t  *get_port_msg;
5972 
5973     rt = task->thread->runtime;
5974 
5975     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5976                                        msg->port_msg.reply_port);
5977     if (nxt_slow_path(reply_port == NULL)) {
5978         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5979                   msg->port_msg.pid, msg->port_msg.reply_port);
5980 
5981         return;
5982     }
5983 
5984     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5985                       < (int) sizeof(nxt_port_msg_get_port_t)))
5986     {
5987         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5988                   (int) nxt_buf_used_size(msg->buf));
5989 
5990         return;
5991     }
5992 
5993     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5994 
5995     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5996     if (nxt_slow_path(port == NULL)) {
5997         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5998                   get_port_msg->pid, get_port_msg->id);
5999 
6000         return;
6001     }
6002 
6003     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
6004               get_port_msg->id);
6005 
6006     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
6007 }
6008