xref: /unit/src/nxt_router.c (revision 2017:c1617684637c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
/*
 * Port id used for an application's shared port: it is the id passed to
 * nxt_port_new() in nxt_router_app_restart_handler(), and
 * nxt_router_msg_cancel() compares a request's app port id against it to
 * decide whether the message may be cancelled in the app queue.
 */
#define NXT_SHARED_PORT_ID  0xFFFFu
22 
/*
 * Per-application settings gathered into one structure.
 *
 * NOTE(review): the code that fills and reads this structure is not in
 * this part of the file; presumably it is the destination of a
 * conf-to-struct mapping performed while an application object is parsed.
 * Confirm against nxt_router_conf_create().
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    /* Unparsed configuration sub-values (names suggest the conf keys). */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
34 
35 
/*
 * Two string settings of a listener.
 *
 * NOTE(review): usage is outside this part of the file; presumably the
 * "pass" and "application" members of a listener configuration object.
 */
typedef struct {
    nxt_str_t         pass;
    nxt_str_t         application;
} nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
/*
 * Per-certificate TLS bookkeeping entry (only built with NXT_TLS).
 * It ties a name to the socket configuration, the temporary configuration
 * and the TLS init data it belongs to.
 *
 * NOTE(review): nxt_router_temp_conf() initializes a "tls" queue on the
 * temporary configuration (tmcf->tls); confirm whether "link" is queued
 * there or on nxt_socket_conf_t as the field comment says.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
} nxt_router_tlssock_t;
53 
54 #endif
55 
56 
/*
 * Context attached to a listen-socket RPC.
 *
 * NOTE(review): presumably the "data" argument received by
 * nxt_router_listen_socket_ready() / nxt_router_listen_socket_error();
 * the code using it is outside this part of the file.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
63 
64 
/*
 * Context attached to an application RPC issued on behalf of a temporary
 * configuration.
 *
 * NOTE(review): presumably consumed by nxt_router_app_prefork_ready() /
 * nxt_router_app_prefork_error(); confirm, the users are not in view here.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
    uint8_t                 proto;  /* 1 bit */
} nxt_app_rpc_t;
70 
71 
/*
 * RPC context registered by nxt_router_start_app_process_handler() for a
 * START_PROCESS request.  That function stores the application's joint,
 * the application's generation at the time of the request, and sets
 * "proto" to 1 when the request carried a configuration buffer (i.e. it
 * asked for a prototype process) and to 0 otherwise.
 */
typedef struct {
    nxt_app_joint_t         *app_joint;
    uint32_t                generation;
    uint8_t                 proto;  /* 1 bit */
} nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static void nxt_router_app_port_error(nxt_task_t *task,
227     nxt_port_recv_msg_t *msg, void *data);
228 
229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
231 
232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
233     nxt_port_t *port, nxt_apr_action_t action);
234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
235     nxt_request_rpc_data_t *req_rpc_data);
236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
237     void *data);
238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
239     void *data);
240 
241 static void nxt_router_app_prepare_request(nxt_task_t *task,
242     nxt_request_rpc_data_t *req_rpc_data);
243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
244     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
245 
246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
254 
255 static const nxt_http_request_state_t  nxt_http_request_send_state;
256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
257 
258 static void nxt_router_app_joint_use(nxt_task_t *task,
259     nxt_app_joint_t *app_joint, int i);
260 
261 static void nxt_router_http_request_release_post(nxt_task_t *task,
262     nxt_http_request_t *r);
263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
264     void *data);
265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_port_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_mmap_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 
/* Defined in another translation unit. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/* The single router instance; allocated and set in nxt_router_start(). */
static nxt_router_t  *nxt_router;

static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Table of six prefix strings, each either "" or "HTTP_".
 *
 * NOTE(review): the indexing scheme is not visible in this part of the
 * file; presumably the index is the application type and the value is the
 * prefix handed to nxt_router_prepare_msg() -- confirm, and keep the entry
 * order in sync with whatever enum is used.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
286 
287 
/*
 * Port message handlers of the router process, referenced from
 * nxt_router_process.port_handlers.  Both rpc_ready and rpc_error are
 * routed to the same generic nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
303 
304 
/*
 * Process descriptor of the router (external linkage).  It names the
 * process, wires nxt_router_prefork() and nxt_router_start() as the
 * prefork and start hooks, and installs the router's port handlers.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
315 
316 
/*
 * Queues of nxt_socket_conf_t.
 *
 * These are file-level objects with external linkage (not "static") and
 * are re-initialized by every call to nxt_router_temp_conf().
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
323 
324 
325 static nxt_int_t
326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328     nxt_runtime_stop_app_processes(task, task->thread->runtime);
329 
330     return NXT_OK;
331 }
332 
333 
334 static nxt_int_t
335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337     nxt_int_t      ret;
338     nxt_port_t     *controller_port;
339     nxt_router_t   *router;
340     nxt_runtime_t  *rt;
341 
342     rt = task->thread->runtime;
343 
344     nxt_log(task, NXT_LOG_INFO, "router started");
345 
346 #if (NXT_TLS)
347     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348     if (nxt_slow_path(rt->tls == NULL)) {
349         return NXT_ERROR;
350     }
351 
352     ret = rt->tls->library_init(task);
353     if (nxt_slow_path(ret != NXT_OK)) {
354         return ret;
355     }
356 #endif
357 
358     ret = nxt_http_init(task);
359     if (nxt_slow_path(ret != NXT_OK)) {
360         return ret;
361     }
362 
363     router = nxt_zalloc(sizeof(nxt_router_t));
364     if (nxt_slow_path(router == NULL)) {
365         return NXT_ERROR;
366     }
367 
368     nxt_queue_init(&router->engines);
369     nxt_queue_init(&router->sockets);
370     nxt_queue_init(&router->apps);
371 
372     nxt_router = router;
373 
374     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375     if (controller_port != NULL) {
376         nxt_router_greet_controller(task, controller_port);
377     }
378 
379     return NXT_OK;
380 }
381 
382 
383 static void
384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387                           -1, 0, 0, NULL);
388 }
389 
390 
/*
 * Port-posted handler that asks for a new process for an application.
 * It is posted by nxt_router_start_app_process(), which takes an
 * application reference beforehand; that reference is dropped at "skip".
 *
 *   task  current task;
 *   port  the port this handler runs on; RPC handlers are registered on
 *         it and its id is sent as the reply port;
 *   data  the nxt_app_t to start a process for.
 *
 * Three cases:
 *   - the application has a prototype port: START_PROCESS is sent to it
 *     with no buffer and no descriptors;
 *   - there is no prototype port but proto_port_requests is already
 *     positive: the counter is incremented and nothing is sent;
 *   - otherwise START_PROCESS is sent to the main process port with a
 *     "<name>\0<conf>" buffer and the shared port's pair[0] and queue_fd
 *     descriptors.
 */
static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
    void *data)
{
    size_t               size;
    uint32_t             stream;
    nxt_fd_t             port_fd, queue_fd;
    nxt_int_t            ret;
    nxt_app_t            *app;
    nxt_buf_t            *b;
    nxt_port_t           *dport;
    nxt_runtime_t        *rt;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    app = data;

    /* Only the read of proto_port is done under the application mutex. */
    nxt_thread_mutex_lock(&app->mutex);

    dport = app->proto_port;

    nxt_thread_mutex_unlock(&app->mutex);

    if (dport != NULL) {
        nxt_debug(task, "app '%V' %p start process", &app->name, app);

        b = NULL;
        port_fd = -1;
        queue_fd = -1;

    } else {
        if (app->proto_port_requests > 0) {
            nxt_debug(task, "app '%V' %p wait for prototype process",
                      &app->name, app);

            app->proto_port_requests++;

            goto skip;
        }

        nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);

        rt = task->thread->runtime;
        dport = rt->port_by_type[NXT_PROCESS_MAIN];

        /* Name, one NUL separator, then the configuration text. */
        size = app->name.length + 1 + app->conf.length;

        b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
        if (nxt_slow_path(b == NULL)) {
            goto failed;
        }

        nxt_buf_cpystr(b, &app->name);
        *b->mem.free++ = '\0';
        nxt_buf_cpystr(b, &app->conf);

        port_fd = app->shared_port->pair[0];
        queue_fd = app->shared_port->queue_fd;
    }

    app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
                                                     nxt_router_app_port_ready,
                                                     nxt_router_app_port_error,
                                                   sizeof(nxt_app_joint_rpc_t));
    if (nxt_slow_path(app_joint_rpc == NULL)) {
        goto failed;
    }

    stream = nxt_port_rpc_ex_stream(app_joint_rpc);

    ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
                                 port_fd, queue_fd, stream, port->id, b);
    if (nxt_slow_path(ret != NXT_OK)) {
        /* Undo the registration made above before bailing out. */
        nxt_port_rpc_cancel(task, port, stream);

        goto failed;
    }

    app_joint_rpc->app_joint = app->joint;
    app_joint_rpc->generation = app->generation;
    /* A buffer is only built for the prototype request. */
    app_joint_rpc->proto = (b != NULL);

    if (b != NULL) {
        app->proto_port_requests++;

        /*
         * The buffer was handed to the write above; clearing the pointer
         * keeps the fall-through into "failed" from freeing it.
         */
        b = NULL;
    }

    nxt_router_app_joint_use(task, app->joint, 1);

    /* The success path intentionally falls through both labels. */

failed:

    /*
     * NOTE(review): if nxt_buf_mem_alloc() also retains the memory pool,
     * this path may need a matching nxt_mp_release() -- confirm against
     * nxt_buf.c.
     */
    if (b != NULL) {
        nxt_mp_free(b->data, b);
    }

skip:

    nxt_router_app_use(task, app, -1);
}
490 
491 
492 static void
493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
494 {
495     app_joint->use_count += i;
496 
497     if (app_joint->use_count == 0) {
498         nxt_assert(app_joint->app == NULL);
499 
500         nxt_free(app_joint);
501     }
502 }
503 
504 
505 static nxt_int_t
506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
507 {
508     nxt_int_t      res;
509     nxt_port_t     *router_port;
510     nxt_runtime_t  *rt;
511 
512     nxt_debug(task, "app '%V' start process", &app->name);
513 
514     rt = task->thread->runtime;
515     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
516 
517     nxt_router_app_use(task, app, 1);
518 
519     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
520                         app);
521 
522     if (res == NXT_OK) {
523         return res;
524     }
525 
526     nxt_thread_mutex_lock(&app->mutex);
527 
528     app->pending_processes--;
529 
530     nxt_thread_mutex_unlock(&app->mutex);
531 
532     nxt_router_app_use(task, app, -1);
533 
534     return NXT_ERROR;
535 }
536 
537 
538 nxt_inline nxt_bool_t
539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
540 {
541     nxt_buf_t       *b, *next;
542     nxt_bool_t      cancelled;
543     nxt_port_t      *app_port;
544     nxt_msg_info_t  *msg_info;
545 
546     msg_info = &req_rpc_data->msg_info;
547 
548     if (msg_info->buf == NULL) {
549         return 0;
550     }
551 
552     app_port = req_rpc_data->app_port;
553 
554     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
555         cancelled = nxt_app_queue_cancel(app_port->queue,
556                                          msg_info->tracking_cookie,
557                                          req_rpc_data->stream);
558 
559         if (cancelled) {
560             nxt_debug(task, "stream #%uD: cancelled by router",
561                       req_rpc_data->stream);
562         }
563 
564     } else {
565         cancelled = 0;
566     }
567 
568     for (b = msg_info->buf; b != NULL; b = next) {
569         next = b->next;
570         b->next = NULL;
571 
572         if (b->is_port_mmap_sent) {
573             b->is_port_mmap_sent = cancelled == 0;
574         }
575 
576         b->completion_handler(task, b, b->parent);
577     }
578 
579     msg_info->buf = NULL;
580 
581     return cancelled;
582 }
583 
584 
585 nxt_inline nxt_bool_t
586 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
587 {
588     if (lnk->next != NULL) {
589         nxt_queue_remove(lnk);
590 
591         lnk->next = NULL;
592 
593         return 1;
594     }
595 
596     return 0;
597 }
598 
599 
600 nxt_inline void
601 nxt_request_rpc_data_unlink(nxt_task_t *task,
602     nxt_request_rpc_data_t *req_rpc_data)
603 {
604     nxt_app_t           *app;
605     nxt_bool_t          unlinked;
606     nxt_http_request_t  *r;
607 
608     nxt_router_msg_cancel(task, req_rpc_data);
609 
610     app = req_rpc_data->app;
611 
612     if (req_rpc_data->app_port != NULL) {
613         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
614                                     req_rpc_data->apr_action);
615 
616         req_rpc_data->app_port = NULL;
617     }
618 
619     r = req_rpc_data->request;
620 
621     if (r != NULL) {
622         r->timer_data = NULL;
623 
624         nxt_router_http_request_release_post(task, r);
625 
626         r->req_rpc_data = NULL;
627         req_rpc_data->request = NULL;
628 
629         if (app != NULL) {
630             unlinked = 0;
631 
632             nxt_thread_mutex_lock(&app->mutex);
633 
634             if (r->app_link.next != NULL) {
635                 nxt_queue_remove(&r->app_link);
636                 r->app_link.next = NULL;
637 
638                 unlinked = 1;
639             }
640 
641             nxt_thread_mutex_unlock(&app->mutex);
642 
643             if (unlinked) {
644                 nxt_mp_release(r->mem_pool);
645             }
646         }
647     }
648 
649     if (app != NULL) {
650         nxt_router_app_use(task, app, -1);
651 
652         req_rpc_data->app = NULL;
653     }
654 
655     if (req_rpc_data->msg_info.body_fd != -1) {
656         nxt_fd_close(req_rpc_data->msg_info.body_fd);
657 
658         req_rpc_data->msg_info.body_fd = -1;
659     }
660 
661     if (req_rpc_data->rpc_cancel) {
662         req_rpc_data->rpc_cancel = 0;
663 
664         nxt_port_rpc_cancel(task, task->thread->engine->port,
665                             req_rpc_data->stream);
666     }
667 }
668 
669 
/*
 * NEW_PORT message handler of the router process.
 *
 * After the generic nxt_port_new_port_handler() has run, the new port
 * (msg->u.new_port, possibly NULL) is handled by type:
 *   - controller port: the controller is greeted; processing continues;
 *   - prototype port: the message goes to the RPC handler and we return;
 *   - NULL or non-application port: ignored when the stream is 0,
 *     otherwise the message is turned into an RPC error;
 *   - application port: its queue descriptor (msg->fd[1]), if any, is
 *     mapped and closed.
 * A message with a non-zero stream is then dispatched to the RPC handler.
 * What remains is an application port announced with stream 0: it is
 * added to the owning application's port hash and acknowledged with
 * PORT_ACK.
 */
static void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_int_t      res;
    nxt_app_t      *app;
    nxt_port_t     *port, *main_app_port;
    nxt_runtime_t  *rt;

    nxt_port_new_port_handler(task, msg);

    port = msg->u.new_port;

    if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
        nxt_router_greet_controller(task, msg->u.new_port);
    }

    if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
        nxt_port_rpc_handler(task, msg);

        return;
    }

    if (port == NULL || port->type != NXT_PROCESS_APP) {

        if (msg->port_msg.stream == 0) {
            return;
        }

        /* Make the RPC dispatch below deliver this as an error. */
        msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;

    } else {
        if (msg->fd[1] != -1) {
            res = nxt_router_port_queue_map(task, port, msg->fd[1]);
            if (nxt_slow_path(res != NXT_OK)) {
                /*
                 * NOTE(review): this return neither closes msg->fd[1] nor
                 * dispatches to the RPC handler; presumably the caller
                 * closes leftover descriptors -- confirm.
                 */
                return;
            }

            nxt_fd_close(msg->fd[1]);
            msg->fd[1] = -1;
        }
    }

    if (msg->port_msg.stream != 0) {
        nxt_port_rpc_handler(task, msg);
        return;
    }

    /*
     * Only an application port with stream 0 gets here: a NULL or
     * non-application port either returned above (stream 0) or was
     * dispatched to the RPC handler (stream != 0).
     */
    nxt_debug(task, "new port id %d (%d)", port->id, port->type);

    /*
     * Port with "id == 0" is application 'main' port and it always
     * should come with non-zero stream.
     */
    nxt_assert(port->id != 0);

    /* Find 'main' app port and get app reference. */
    rt = task->thread->runtime;

    /*
     * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
     * sent to main port (with id == 0) and processed in main thread.
     */
    main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
    nxt_assert(main_app_port != NULL);

    app = main_app_port->app;

    if (nxt_fast_path(app != NULL)) {
        nxt_thread_mutex_lock(&app->mutex);

        /* TODO here should be find-and-add code because there can be
           port waiters in port_hash */
        nxt_port_hash_add(&app->port_hash, port);
        app->port_hash_count++;

        nxt_thread_mutex_unlock(&app->mutex);

        port->app = app;
    }

    port->main_app_port = main_app_port;

    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
}
754 
755 
756 static void
757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
758 {
759     void                    *p;
760     size_t                  size;
761     nxt_int_t               ret;
762     nxt_port_t              *port;
763     nxt_router_temp_conf_t  *tmcf;
764 
765     port = nxt_runtime_port_find(task->thread->runtime,
766                                  msg->port_msg.pid,
767                                  msg->port_msg.reply_port);
768     if (nxt_slow_path(port == NULL)) {
769         nxt_alert(task, "conf_data_handler: reply port not found");
770         return;
771     }
772 
773     p = MAP_FAILED;
774 
775     /*
776      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
777      * initialized in 'cleanup' section.
778      */
779     size = 0;
780 
781     tmcf = nxt_router_temp_conf(task);
782     if (nxt_slow_path(tmcf == NULL)) {
783         goto fail;
784     }
785 
786     if (nxt_slow_path(msg->fd[0] == -1)) {
787         nxt_alert(task, "conf_data_handler: invalid shm fd");
788         goto fail;
789     }
790 
791     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
792         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
793                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
794         goto fail;
795     }
796 
797     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
798 
799     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
800 
801     nxt_fd_close(msg->fd[0]);
802     msg->fd[0] = -1;
803 
804     if (nxt_slow_path(p == MAP_FAILED)) {
805         goto fail;
806     }
807 
808     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
809 
810     tmcf->router_conf->router = nxt_router;
811     tmcf->stream = msg->port_msg.stream;
812     tmcf->port = port;
813 
814     nxt_port_use(task, tmcf->port, 1);
815 
816     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
817 
818     if (nxt_fast_path(ret == NXT_OK)) {
819         nxt_router_conf_apply(task, tmcf, NULL);
820 
821     } else {
822         nxt_router_conf_error(task, tmcf);
823     }
824 
825     goto cleanup;
826 
827 fail:
828 
829     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
830                           msg->port_msg.stream, 0, NULL);
831 
832     if (tmcf != NULL) {
833         nxt_mp_release(tmcf->mem_pool);
834     }
835 
836 cleanup:
837 
838     if (p != MAP_FAILED) {
839         nxt_mem_munmap(p, size);
840     }
841 
842     if (msg->fd[0] != -1) {
843         nxt_fd_close(msg->fd[0]);
844         msg->fd[0] = -1;
845     }
846 }
847 
848 
849 static void
850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
851 {
852     nxt_app_t            *app;
853     nxt_int_t            ret;
854     nxt_str_t            app_name;
855     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
856     nxt_port_t           *proto_port;
857     nxt_port_msg_type_t  reply;
858 
859     reply_port = nxt_runtime_port_find(task->thread->runtime,
860                                        msg->port_msg.pid,
861                                        msg->port_msg.reply_port);
862     if (nxt_slow_path(reply_port == NULL)) {
863         nxt_alert(task, "app_restart_handler: reply port not found");
864         return;
865     }
866 
867     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
868     app_name.start = msg->buf->mem.pos;
869 
870     nxt_debug(task, "app_restart_handler: %V", &app_name);
871 
872     app = nxt_router_app_find(&nxt_router->apps, &app_name);
873 
874     if (nxt_fast_path(app != NULL)) {
875         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
876                                    NXT_PROCESS_APP);
877         if (nxt_slow_path(shared_port == NULL)) {
878             goto fail;
879         }
880 
881         ret = nxt_port_socket_init(task, shared_port, 0);
882         if (nxt_slow_path(ret != NXT_OK)) {
883             nxt_port_use(task, shared_port, -1);
884             goto fail;
885         }
886 
887         ret = nxt_router_app_queue_init(task, shared_port);
888         if (nxt_slow_path(ret != NXT_OK)) {
889             nxt_port_write_close(shared_port);
890             nxt_port_read_close(shared_port);
891             nxt_port_use(task, shared_port, -1);
892             goto fail;
893         }
894 
895         nxt_port_write_enable(task, shared_port);
896 
897         nxt_thread_mutex_lock(&app->mutex);
898 
899         proto_port = app->proto_port;
900 
901         if (proto_port != NULL) {
902             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
903                       proto_port->pid);
904 
905             app->proto_port = NULL;
906             proto_port->app = NULL;
907         }
908 
909         app->generation++;
910 
911         shared_port->app = app;
912 
913         old_shared_port = app->shared_port;
914         old_shared_port->app = NULL;
915 
916         app->shared_port = shared_port;
917 
918         nxt_thread_mutex_unlock(&app->mutex);
919 
920         nxt_port_close(task, old_shared_port);
921         nxt_port_use(task, old_shared_port, -1);
922 
923         if (proto_port != NULL) {
924             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
925                                          -1, 0, 0, NULL);
926 
927             nxt_port_close(task, proto_port);
928 
929             nxt_port_use(task, proto_port, -1);
930         }
931 
932         reply = NXT_PORT_MSG_RPC_READY_LAST;
933 
934     } else {
935 
936 fail:
937 
938         reply = NXT_PORT_MSG_RPC_ERROR;
939     }
940 
941     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
942                           0, NULL);
943 }
944 
945 
946 static void
947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
948     void *data)
949 {
950     union {
951         nxt_pid_t  removed_pid;
952         void       *data;
953     } u;
954 
955     u.data = data;
956 
957     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
958 }
959 
960 
961 static void
962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
963 {
964     nxt_event_engine_t  *engine;
965 
966     nxt_port_remove_pid_handler(task, msg);
967 
968     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
969     {
970         if (nxt_fast_path(engine->port != NULL)) {
971             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
972                           msg->u.data);
973         }
974     }
975     nxt_queue_loop;
976 
977     if (msg->port_msg.stream == 0) {
978         return;
979     }
980 
981     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
982 
983     nxt_port_rpc_handler(task, msg);
984 }
985 
986 
987 static nxt_router_temp_conf_t *
988 nxt_router_temp_conf(nxt_task_t *task)
989 {
990     nxt_mp_t                *mp, *tmp;
991     nxt_router_conf_t       *rtcf;
992     nxt_router_temp_conf_t  *tmcf;
993 
994     mp = nxt_mp_create(1024, 128, 256, 32);
995     if (nxt_slow_path(mp == NULL)) {
996         return NULL;
997     }
998 
999     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1000     if (nxt_slow_path(rtcf == NULL)) {
1001         goto fail;
1002     }
1003 
1004     rtcf->mem_pool = mp;
1005 
1006     tmp = nxt_mp_create(1024, 128, 256, 32);
1007     if (nxt_slow_path(tmp == NULL)) {
1008         goto fail;
1009     }
1010 
1011     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1012     if (nxt_slow_path(tmcf == NULL)) {
1013         goto temp_fail;
1014     }
1015 
1016     tmcf->mem_pool = tmp;
1017     tmcf->router_conf = rtcf;
1018     tmcf->count = 1;
1019     tmcf->engine = task->thread->engine;
1020 
1021     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1022                                      sizeof(nxt_router_engine_conf_t));
1023     if (nxt_slow_path(tmcf->engines == NULL)) {
1024         goto temp_fail;
1025     }
1026 
1027     nxt_queue_init(&creating_sockets);
1028     nxt_queue_init(&pending_sockets);
1029     nxt_queue_init(&updating_sockets);
1030     nxt_queue_init(&keeping_sockets);
1031     nxt_queue_init(&deleting_sockets);
1032 
1033 #if (NXT_TLS)
1034     nxt_queue_init(&tmcf->tls);
1035 #endif
1036 
1037     nxt_queue_init(&tmcf->apps);
1038     nxt_queue_init(&tmcf->previous);
1039 
1040     return tmcf;
1041 
1042 temp_fail:
1043 
1044     nxt_mp_destroy(tmp);
1045 
1046 fail:
1047 
1048     nxt_mp_destroy(mp);
1049 
1050     return NULL;
1051 }
1052 
1053 
1054 nxt_inline nxt_bool_t
1055 nxt_router_app_can_start(nxt_app_t *app)
1056 {
1057     return app->processes + app->pending_processes < app->max_processes
1058             && app->pending_processes < app->max_pending_processes;
1059 }
1060 
1061 
1062 nxt_inline nxt_bool_t
1063 nxt_router_app_need_start(nxt_app_t *app)
1064 {
1065     return (app->active_requests
1066               > app->port_hash_count + app->pending_processes)
1067            || (app->spare_processes
1068                 > app->idle_processes + app->pending_processes);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1074 {
1075     nxt_int_t                    ret;
1076     nxt_app_t                    *app;
1077     nxt_router_t                 *router;
1078     nxt_runtime_t                *rt;
1079     nxt_queue_link_t             *qlk;
1080     nxt_socket_conf_t            *skcf;
1081     nxt_router_conf_t            *rtcf;
1082     nxt_router_temp_conf_t       *tmcf;
1083     const nxt_event_interface_t  *interface;
1084 #if (NXT_TLS)
1085     nxt_router_tlssock_t         *tls;
1086 #endif
1087 
1088     tmcf = obj;
1089 
1090     qlk = nxt_queue_first(&pending_sockets);
1091 
1092     if (qlk != nxt_queue_tail(&pending_sockets)) {
1093         nxt_queue_remove(qlk);
1094         nxt_queue_insert_tail(&creating_sockets, qlk);
1095 
1096         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1097 
1098         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1099 
1100         return;
1101     }
1102 
1103 #if (NXT_TLS)
1104     qlk = nxt_queue_last(&tmcf->tls);
1105 
1106     if (qlk != nxt_queue_head(&tmcf->tls)) {
1107         nxt_queue_remove(qlk);
1108 
1109         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1110 
1111         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1112                            nxt_router_tls_rpc_handler, tls);
1113         return;
1114     }
1115 #endif
1116 
1117     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1118 
1119         if (nxt_router_app_need_start(app)) {
1120             nxt_router_app_rpc_create(task, tmcf, app);
1121             return;
1122         }
1123 
1124     } nxt_queue_loop;
1125 
1126     rtcf = tmcf->router_conf;
1127 
1128     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1129         nxt_router_access_log_open(task, tmcf);
1130         return;
1131     }
1132 
1133     rt = task->thread->runtime;
1134 
1135     interface = nxt_service_get(rt->services, "engine", NULL);
1136 
1137     router = rtcf->router;
1138 
1139     ret = nxt_router_engines_create(task, router, tmcf, interface);
1140     if (nxt_slow_path(ret != NXT_OK)) {
1141         goto fail;
1142     }
1143 
1144     ret = nxt_router_threads_create(task, rt, tmcf);
1145     if (nxt_slow_path(ret != NXT_OK)) {
1146         goto fail;
1147     }
1148 
1149     nxt_router_apps_sort(task, router, tmcf);
1150 
1151     nxt_router_apps_hash_use(task, rtcf, 1);
1152 
1153     nxt_router_engines_post(router, tmcf);
1154 
1155     nxt_queue_add(&router->sockets, &updating_sockets);
1156     nxt_queue_add(&router->sockets, &creating_sockets);
1157 
1158     if (router->access_log != rtcf->access_log) {
1159         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1160 
1161         nxt_router_access_log_release(task, &router->lock, router->access_log);
1162 
1163         router->access_log = rtcf->access_log;
1164     }
1165 
1166     nxt_router_conf_ready(task, tmcf);
1167 
1168     return;
1169 
1170 fail:
1171 
1172     nxt_router_conf_error(task, tmcf);
1173 
1174     return;
1175 }
1176 
1177 
1178 static void
1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1180 {
1181     nxt_joint_job_t  *job;
1182 
1183     job = obj;
1184 
1185     nxt_router_conf_ready(task, job->tmcf);
1186 }
1187 
1188 
1189 static void
1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191 {
1192     uint32_t               count;
1193     nxt_router_conf_t      *rtcf;
1194     nxt_thread_spinlock_t  *lock;
1195 
1196     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1197 
1198     if (--tmcf->count > 0) {
1199         return;
1200     }
1201 
1202     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1203 
1204     rtcf = tmcf->router_conf;
1205 
1206     lock = &rtcf->router->lock;
1207 
1208     nxt_thread_spin_lock(lock);
1209 
1210     count = rtcf->count;
1211 
1212     nxt_thread_spin_unlock(lock);
1213 
1214     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1215 
1216     if (count == 0) {
1217         nxt_router_apps_hash_use(task, rtcf, -1);
1218 
1219         nxt_router_access_log_release(task, lock, rtcf->access_log);
1220 
1221         nxt_mp_destroy(rtcf->mem_pool);
1222     }
1223 
1224     nxt_mp_release(tmcf->mem_pool);
1225 }
1226 
1227 
1228 static void
1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1230 {
1231     nxt_app_t          *app;
1232     nxt_queue_t        new_socket_confs;
1233     nxt_socket_t       s;
1234     nxt_router_t       *router;
1235     nxt_queue_link_t   *qlk;
1236     nxt_socket_conf_t  *skcf;
1237     nxt_router_conf_t  *rtcf;
1238 
1239     nxt_alert(task, "failed to apply new conf");
1240 
1241     for (qlk = nxt_queue_first(&creating_sockets);
1242          qlk != nxt_queue_tail(&creating_sockets);
1243          qlk = nxt_queue_next(qlk))
1244     {
1245         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246         s = skcf->listen->socket;
1247 
1248         if (s != -1) {
1249             nxt_socket_close(task, s);
1250         }
1251 
1252         nxt_free(skcf->listen);
1253     }
1254 
1255     nxt_queue_init(&new_socket_confs);
1256     nxt_queue_add(&new_socket_confs, &updating_sockets);
1257     nxt_queue_add(&new_socket_confs, &pending_sockets);
1258     nxt_queue_add(&new_socket_confs, &creating_sockets);
1259 
1260     rtcf = tmcf->router_conf;
1261 
1262     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1263 
1264         nxt_router_app_unlink(task, app);
1265 
1266     } nxt_queue_loop;
1267 
1268     router = rtcf->router;
1269 
1270     nxt_queue_add(&router->sockets, &keeping_sockets);
1271     nxt_queue_add(&router->sockets, &deleting_sockets);
1272 
1273     nxt_queue_add(&router->apps, &tmcf->previous);
1274 
1275     // TODO: new engines and threads
1276 
1277     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1278 
1279     nxt_mp_destroy(rtcf->mem_pool);
1280 
1281     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1282 
1283     nxt_mp_release(tmcf->mem_pool);
1284 }
1285 
1286 
1287 static void
1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1289     nxt_port_msg_type_t type)
1290 {
1291     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1292 
1293     nxt_port_use(task, tmcf->port, -1);
1294 
1295     tmcf->port = NULL;
1296 }
1297 
1298 
1299 static nxt_conf_map_t  nxt_router_conf[] = {
1300     {
1301         nxt_string("listeners_threads"),
1302         NXT_CONF_MAP_INT32,
1303         offsetof(nxt_router_conf_t, threads),
1304     },
1305 };
1306 
1307 
1308 static nxt_conf_map_t  nxt_router_app_conf[] = {
1309     {
1310         nxt_string("type"),
1311         NXT_CONF_MAP_STR,
1312         offsetof(nxt_router_app_conf_t, type),
1313     },
1314 
1315     {
1316         nxt_string("limits"),
1317         NXT_CONF_MAP_PTR,
1318         offsetof(nxt_router_app_conf_t, limits_value),
1319     },
1320 
1321     {
1322         nxt_string("processes"),
1323         NXT_CONF_MAP_INT32,
1324         offsetof(nxt_router_app_conf_t, processes),
1325     },
1326 
1327     {
1328         nxt_string("processes"),
1329         NXT_CONF_MAP_PTR,
1330         offsetof(nxt_router_app_conf_t, processes_value),
1331     },
1332 
1333     {
1334         nxt_string("targets"),
1335         NXT_CONF_MAP_PTR,
1336         offsetof(nxt_router_app_conf_t, targets_value),
1337     },
1338 };
1339 
1340 
1341 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1342     {
1343         nxt_string("timeout"),
1344         NXT_CONF_MAP_MSEC,
1345         offsetof(nxt_router_app_conf_t, timeout),
1346     },
1347 };
1348 
1349 
1350 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1351     {
1352         nxt_string("spare"),
1353         NXT_CONF_MAP_INT32,
1354         offsetof(nxt_router_app_conf_t, spare_processes),
1355     },
1356 
1357     {
1358         nxt_string("max"),
1359         NXT_CONF_MAP_INT32,
1360         offsetof(nxt_router_app_conf_t, max_processes),
1361     },
1362 
1363     {
1364         nxt_string("idle_timeout"),
1365         NXT_CONF_MAP_MSEC,
1366         offsetof(nxt_router_app_conf_t, idle_timeout),
1367     },
1368 };
1369 
1370 
1371 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1372     {
1373         nxt_string("pass"),
1374         NXT_CONF_MAP_STR_COPY,
1375         offsetof(nxt_router_listener_conf_t, pass),
1376     },
1377 
1378     {
1379         nxt_string("application"),
1380         NXT_CONF_MAP_STR_COPY,
1381         offsetof(nxt_router_listener_conf_t, application),
1382     },
1383 };
1384 
1385 
1386 static nxt_conf_map_t  nxt_router_http_conf[] = {
1387     {
1388         nxt_string("header_buffer_size"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, header_buffer_size),
1391     },
1392 
1393     {
1394         nxt_string("large_header_buffer_size"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1397     },
1398 
1399     {
1400         nxt_string("large_header_buffers"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, large_header_buffers),
1403     },
1404 
1405     {
1406         nxt_string("body_buffer_size"),
1407         NXT_CONF_MAP_SIZE,
1408         offsetof(nxt_socket_conf_t, body_buffer_size),
1409     },
1410 
1411     {
1412         nxt_string("max_body_size"),
1413         NXT_CONF_MAP_SIZE,
1414         offsetof(nxt_socket_conf_t, max_body_size),
1415     },
1416 
1417     {
1418         nxt_string("idle_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, idle_timeout),
1421     },
1422 
1423     {
1424         nxt_string("header_read_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, header_read_timeout),
1427     },
1428 
1429     {
1430         nxt_string("body_read_timeout"),
1431         NXT_CONF_MAP_MSEC,
1432         offsetof(nxt_socket_conf_t, body_read_timeout),
1433     },
1434 
1435     {
1436         nxt_string("send_timeout"),
1437         NXT_CONF_MAP_MSEC,
1438         offsetof(nxt_socket_conf_t, send_timeout),
1439     },
1440 
1441     {
1442         nxt_string("body_temp_path"),
1443         NXT_CONF_MAP_STR,
1444         offsetof(nxt_socket_conf_t, body_temp_path),
1445     },
1446 
1447     {
1448         nxt_string("discard_unsafe_fields"),
1449         NXT_CONF_MAP_INT8,
1450         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1451     },
1452 };
1453 
1454 
1455 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1456     {
1457         nxt_string("max_frame_size"),
1458         NXT_CONF_MAP_SIZE,
1459         offsetof(nxt_websocket_conf_t, max_frame_size),
1460     },
1461 
1462     {
1463         nxt_string("read_timeout"),
1464         NXT_CONF_MAP_MSEC,
1465         offsetof(nxt_websocket_conf_t, read_timeout),
1466     },
1467 
1468     {
1469         nxt_string("keepalive_interval"),
1470         NXT_CONF_MAP_MSEC,
1471         offsetof(nxt_websocket_conf_t, keepalive_interval),
1472     },
1473 
1474 };
1475 
1476 
1477 static nxt_int_t
1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1479     u_char *start, u_char *end)
1480 {
1481     u_char                      *p;
1482     size_t                      size;
1483     nxt_mp_t                    *mp, *app_mp;
1484     uint32_t                    next, next_target;
1485     nxt_int_t                   ret;
1486     nxt_str_t                   name, path, target;
1487     nxt_app_t                   *app, *prev;
1488     nxt_str_t                   *t, *s, *targets;
1489     nxt_uint_t                  n, i;
1490     nxt_port_t                  *port;
1491     nxt_router_t                *router;
1492     nxt_app_joint_t             *app_joint;
1493 #if (NXT_TLS)
1494     nxt_tls_init_t              *tls_init;
1495     nxt_conf_value_t            *certificate;
1496 #endif
1497     nxt_conf_value_t            *conf, *http, *value, *websocket;
1498     nxt_conf_value_t            *applications, *application;
1499     nxt_conf_value_t            *listeners, *listener;
1500     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1501     nxt_socket_conf_t           *skcf;
1502     nxt_http_routes_t           *routes;
1503     nxt_event_engine_t          *engine;
1504     nxt_app_lang_module_t       *lang;
1505     nxt_router_app_conf_t       apcf;
1506     nxt_router_access_log_t     *access_log;
1507     nxt_router_listener_conf_t  lscf;
1508 
1509     static nxt_str_t  http_path = nxt_string("/settings/http");
1510     static nxt_str_t  applications_path = nxt_string("/applications");
1511     static nxt_str_t  listeners_path = nxt_string("/listeners");
1512     static nxt_str_t  routes_path = nxt_string("/routes");
1513     static nxt_str_t  access_log_path = nxt_string("/access_log");
1514 #if (NXT_TLS)
1515     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1516     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1517     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1518     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1519     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1520 #endif
1521     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1522     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1523     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1524 
1525     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1526     if (conf == NULL) {
1527         nxt_alert(task, "configuration parsing error");
1528         return NXT_ERROR;
1529     }
1530 
1531     mp = tmcf->router_conf->mem_pool;
1532 
1533     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1534                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1535     if (ret != NXT_OK) {
1536         nxt_alert(task, "root map error");
1537         return NXT_ERROR;
1538     }
1539 
1540     if (tmcf->router_conf->threads == 0) {
1541         tmcf->router_conf->threads = nxt_ncpu;
1542     }
1543 
1544     static_conf = nxt_conf_get_path(conf, &static_path);
1545 
1546     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1547     if (nxt_slow_path(ret != NXT_OK)) {
1548         return NXT_ERROR;
1549     }
1550 
1551     router = tmcf->router_conf->router;
1552 
1553     applications = nxt_conf_get_path(conf, &applications_path);
1554 
1555     if (applications != NULL) {
1556         next = 0;
1557 
1558         for ( ;; ) {
1559             application = nxt_conf_next_object_member(applications,
1560                                                       &name, &next);
1561             if (application == NULL) {
1562                 break;
1563             }
1564 
1565             nxt_debug(task, "application \"%V\"", &name);
1566 
1567             size = nxt_conf_json_length(application, NULL);
1568 
1569             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1570             if (nxt_slow_path(app_mp == NULL)) {
1571                 goto fail;
1572             }
1573 
1574             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1575             if (app == NULL) {
1576                 goto app_fail;
1577             }
1578 
1579             nxt_memzero(app, sizeof(nxt_app_t));
1580 
1581             app->mem_pool = app_mp;
1582 
1583             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1584             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1585                                                   + name.length);
1586 
1587             p = nxt_conf_json_print(app->conf.start, application, NULL);
1588             app->conf.length = p - app->conf.start;
1589 
1590             nxt_assert(app->conf.length <= size);
1591 
1592             nxt_debug(task, "application conf \"%V\"", &app->conf);
1593 
1594             prev = nxt_router_app_find(&router->apps, &name);
1595 
1596             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1597                 nxt_mp_destroy(app_mp);
1598 
1599                 nxt_queue_remove(&prev->link);
1600                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1601 
1602                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1603                 if (nxt_slow_path(ret != NXT_OK)) {
1604                     goto fail;
1605                 }
1606 
1607                 continue;
1608             }
1609 
1610             apcf.processes = 1;
1611             apcf.max_processes = 1;
1612             apcf.spare_processes = 0;
1613             apcf.timeout = 0;
1614             apcf.idle_timeout = 15000;
1615             apcf.limits_value = NULL;
1616             apcf.processes_value = NULL;
1617             apcf.targets_value = NULL;
1618 
1619             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1620             if (nxt_slow_path(app_joint == NULL)) {
1621                 goto app_fail;
1622             }
1623 
1624             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1625 
1626             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1627                                       nxt_nitems(nxt_router_app_conf), &apcf);
1628             if (ret != NXT_OK) {
1629                 nxt_alert(task, "application map error");
1630                 goto app_fail;
1631             }
1632 
1633             if (apcf.limits_value != NULL) {
1634 
1635                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1636                     nxt_alert(task, "application limits is not object");
1637                     goto app_fail;
1638                 }
1639 
1640                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1641                                         nxt_router_app_limits_conf,
1642                                         nxt_nitems(nxt_router_app_limits_conf),
1643                                         &apcf);
1644                 if (ret != NXT_OK) {
1645                     nxt_alert(task, "application limits map error");
1646                     goto app_fail;
1647                 }
1648             }
1649 
1650             if (apcf.processes_value != NULL
1651                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1652             {
1653                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1654                                      nxt_router_app_processes_conf,
1655                                      nxt_nitems(nxt_router_app_processes_conf),
1656                                      &apcf);
1657                 if (ret != NXT_OK) {
1658                     nxt_alert(task, "application processes map error");
1659                     goto app_fail;
1660                 }
1661 
1662             } else {
1663                 apcf.max_processes = apcf.processes;
1664                 apcf.spare_processes = apcf.processes;
1665             }
1666 
1667             if (apcf.targets_value != NULL) {
1668                 n = nxt_conf_object_members_count(apcf.targets_value);
1669 
1670                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1671                 if (nxt_slow_path(targets == NULL)) {
1672                     goto app_fail;
1673                 }
1674 
1675                 next_target = 0;
1676 
1677                 for (i = 0; i < n; i++) {
1678                     (void) nxt_conf_next_object_member(apcf.targets_value,
1679                                                        &target, &next_target);
1680 
1681                     s = nxt_str_dup(app_mp, &targets[i], &target);
1682                     if (nxt_slow_path(s == NULL)) {
1683                         goto app_fail;
1684                     }
1685                 }
1686 
1687             } else {
1688                 targets = NULL;
1689             }
1690 
1691             nxt_debug(task, "application type: %V", &apcf.type);
1692             nxt_debug(task, "application processes: %D", apcf.processes);
1693             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1694 
1695             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1696 
1697             if (lang == NULL) {
1698                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1699                 goto app_fail;
1700             }
1701 
1702             nxt_debug(task, "application language module: \"%s\"", lang->file);
1703 
1704             ret = nxt_thread_mutex_create(&app->mutex);
1705             if (ret != NXT_OK) {
1706                 goto app_fail;
1707             }
1708 
1709             nxt_queue_init(&app->ports);
1710             nxt_queue_init(&app->spare_ports);
1711             nxt_queue_init(&app->idle_ports);
1712             nxt_queue_init(&app->ack_waiting_req);
1713 
1714             app->name.length = name.length;
1715             nxt_memcpy(app->name.start, name.start, name.length);
1716 
1717             app->type = lang->type;
1718             app->max_processes = apcf.max_processes;
1719             app->spare_processes = apcf.spare_processes;
1720             app->max_pending_processes = apcf.spare_processes
1721                                          ? apcf.spare_processes : 1;
1722             app->timeout = apcf.timeout;
1723             app->idle_timeout = apcf.idle_timeout;
1724 
1725             app->targets = targets;
1726 
1727             engine = task->thread->engine;
1728 
1729             app->engine = engine;
1730 
1731             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1732             app->adjust_idle_work.task = &engine->task;
1733             app->adjust_idle_work.obj = app;
1734 
1735             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1736 
1737             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1738             if (nxt_slow_path(ret != NXT_OK)) {
1739                 goto app_fail;
1740             }
1741 
1742             nxt_router_app_use(task, app, 1);
1743 
1744             app->joint = app_joint;
1745 
1746             app_joint->use_count = 1;
1747             app_joint->app = app;
1748 
1749             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1750             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1751             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1752             app_joint->idle_timer.task = &engine->task;
1753             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1754 
1755             app_joint->free_app_work.handler = nxt_router_free_app;
1756             app_joint->free_app_work.task = &engine->task;
1757             app_joint->free_app_work.obj = app_joint;
1758 
1759             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1760                                 NXT_PROCESS_APP);
1761             if (nxt_slow_path(port == NULL)) {
1762                 return NXT_ERROR;
1763             }
1764 
1765             ret = nxt_port_socket_init(task, port, 0);
1766             if (nxt_slow_path(ret != NXT_OK)) {
1767                 nxt_port_use(task, port, -1);
1768                 return NXT_ERROR;
1769             }
1770 
1771             ret = nxt_router_app_queue_init(task, port);
1772             if (nxt_slow_path(ret != NXT_OK)) {
1773                 nxt_port_write_close(port);
1774                 nxt_port_read_close(port);
1775                 nxt_port_use(task, port, -1);
1776                 return NXT_ERROR;
1777             }
1778 
1779             nxt_port_write_enable(task, port);
1780             port->app = app;
1781 
1782             app->shared_port = port;
1783 
1784             nxt_thread_mutex_create(&app->outgoing.mutex);
1785         }
1786     }
1787 
1788     routes_conf = nxt_conf_get_path(conf, &routes_path);
1789     if (nxt_fast_path(routes_conf != NULL)) {
1790         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1791         if (nxt_slow_path(routes == NULL)) {
1792             return NXT_ERROR;
1793         }
1794         tmcf->router_conf->routes = routes;
1795     }
1796 
1797     ret = nxt_upstreams_create(task, tmcf, conf);
1798     if (nxt_slow_path(ret != NXT_OK)) {
1799         return ret;
1800     }
1801 
1802     http = nxt_conf_get_path(conf, &http_path);
1803 #if 0
1804     if (http == NULL) {
1805         nxt_alert(task, "no \"http\" block");
1806         return NXT_ERROR;
1807     }
1808 #endif
1809 
1810     websocket = nxt_conf_get_path(conf, &websocket_path);
1811 
1812     listeners = nxt_conf_get_path(conf, &listeners_path);
1813 
1814     if (listeners != NULL) {
1815         next = 0;
1816 
1817         for ( ;; ) {
1818             listener = nxt_conf_next_object_member(listeners, &name, &next);
1819             if (listener == NULL) {
1820                 break;
1821             }
1822 
1823             skcf = nxt_router_socket_conf(task, tmcf, &name);
1824             if (skcf == NULL) {
1825                 goto fail;
1826             }
1827 
1828             nxt_memzero(&lscf, sizeof(lscf));
1829 
1830             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1831                                       nxt_nitems(nxt_router_listener_conf),
1832                                       &lscf);
1833             if (ret != NXT_OK) {
1834                 nxt_alert(task, "listener map error");
1835                 goto fail;
1836             }
1837 
1838             nxt_debug(task, "application: %V", &lscf.application);
1839 
1840             // STUB, default values if http block is not defined.
1841             skcf->header_buffer_size = 2048;
1842             skcf->large_header_buffer_size = 8192;
1843             skcf->large_header_buffers = 4;
1844             skcf->discard_unsafe_fields = 1;
1845             skcf->body_buffer_size = 16 * 1024;
1846             skcf->max_body_size = 8 * 1024 * 1024;
1847             skcf->proxy_header_buffer_size = 64 * 1024;
1848             skcf->proxy_buffer_size = 4096;
1849             skcf->proxy_buffers = 256;
1850             skcf->idle_timeout = 180 * 1000;
1851             skcf->header_read_timeout = 30 * 1000;
1852             skcf->body_read_timeout = 30 * 1000;
1853             skcf->send_timeout = 30 * 1000;
1854             skcf->proxy_timeout = 60 * 1000;
1855             skcf->proxy_send_timeout = 30 * 1000;
1856             skcf->proxy_read_timeout = 30 * 1000;
1857 
1858             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1859             skcf->websocket_conf.read_timeout = 60 * 1000;
1860             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1861 
1862             nxt_str_null(&skcf->body_temp_path);
1863 
1864             if (http != NULL) {
1865                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1866                                           nxt_nitems(nxt_router_http_conf),
1867                                           skcf);
1868                 if (ret != NXT_OK) {
1869                     nxt_alert(task, "http map error");
1870                     goto fail;
1871                 }
1872             }
1873 
1874             if (websocket != NULL) {
1875                 ret = nxt_conf_map_object(mp, websocket,
1876                                           nxt_router_websocket_conf,
1877                                           nxt_nitems(nxt_router_websocket_conf),
1878                                           &skcf->websocket_conf);
1879                 if (ret != NXT_OK) {
1880                     nxt_alert(task, "websocket map error");
1881                     goto fail;
1882                 }
1883             }
1884 
1885             t = &skcf->body_temp_path;
1886 
1887             if (t->length == 0) {
1888                 t->start = (u_char *) task->thread->runtime->tmp;
1889                 t->length = nxt_strlen(t->start);
1890             }
1891 
1892             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1893             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1894                                                     client_ip_conf);
1895             if (nxt_slow_path(ret != NXT_OK)) {
1896                 return NXT_ERROR;
1897             }
1898 
1899 #if (NXT_TLS)
1900             certificate = nxt_conf_get_path(listener, &certificate_path);
1901 
1902             if (certificate != NULL) {
1903                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1904                 if (nxt_slow_path(tls_init == NULL)) {
1905                     return NXT_ERROR;
1906                 }
1907 
1908                 tls_init->cache_size = 0;
1909                 tls_init->timeout = 300;
1910 
1911                 value = nxt_conf_get_path(listener, &conf_cache_path);
1912                 if (value != NULL) {
1913                     tls_init->cache_size = nxt_conf_get_number(value);
1914                 }
1915 
1916                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1917                 if (value != NULL) {
1918                     tls_init->timeout = nxt_conf_get_number(value);
1919                 }
1920 
1921                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1922                                                         &conf_commands_path);
1923 
1924                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1925                                                            &conf_tickets);
1926 
1927                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1928                     n = nxt_conf_array_elements_count(certificate);
1929 
1930                     for (i = 0; i < n; i++) {
1931                         value = nxt_conf_get_array_element(certificate, i);
1932 
1933                         nxt_assert(value != NULL);
1934 
1935                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1936                                                          tls_init, i == 0);
1937                         if (nxt_slow_path(ret != NXT_OK)) {
1938                             goto fail;
1939                         }
1940                     }
1941 
1942                 } else {
1943                     /* NXT_CONF_STRING */
1944                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1945                                                      tls_init, 1);
1946                     if (nxt_slow_path(ret != NXT_OK)) {
1947                         goto fail;
1948                     }
1949                 }
1950             }
1951 #endif
1952 
1953             skcf->listen->handler = nxt_http_conn_init;
1954             skcf->router_conf = tmcf->router_conf;
1955             skcf->router_conf->count++;
1956 
1957             if (lscf.pass.length != 0) {
1958                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1959 
1960             /* COMPATIBILITY: listener application. */
1961             } else if (lscf.application.length > 0) {
1962                 skcf->action = nxt_http_pass_application(task,
1963                                                          tmcf->router_conf,
1964                                                          &lscf.application);
1965             }
1966 
1967             if (nxt_slow_path(skcf->action == NULL)) {
1968                 goto fail;
1969             }
1970         }
1971     }
1972 
1973     ret = nxt_http_routes_resolve(task, tmcf);
1974     if (nxt_slow_path(ret != NXT_OK)) {
1975         goto fail;
1976     }
1977 
1978     value = nxt_conf_get_path(conf, &access_log_path);
1979 
1980     if (value != NULL) {
1981         nxt_conf_get_string(value, &path);
1982 
1983         access_log = router->access_log;
1984 
1985         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1986             nxt_router_access_log_use(&router->lock, access_log);
1987 
1988         } else {
1989             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1990                                     + path.length);
1991             if (access_log == NULL) {
1992                 nxt_alert(task, "failed to allocate access log structure");
1993                 goto fail;
1994             }
1995 
1996             access_log->fd = -1;
1997             access_log->handler = &nxt_router_access_log_writer;
1998             access_log->count = 1;
1999 
2000             access_log->path.length = path.length;
2001             access_log->path.start = (u_char *) access_log
2002                                      + sizeof(nxt_router_access_log_t);
2003 
2004             nxt_memcpy(access_log->path.start, path.start, path.length);
2005         }
2006 
2007         tmcf->router_conf->access_log = access_log;
2008     }
2009 
2010     nxt_queue_add(&deleting_sockets, &router->sockets);
2011     nxt_queue_init(&router->sockets);
2012 
2013     return NXT_OK;
2014 
2015 app_fail:
2016 
2017     nxt_mp_destroy(app_mp);
2018 
2019 fail:
2020 
2021     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2022 
2023         nxt_queue_remove(&app->link);
2024         nxt_thread_mutex_destroy(&app->mutex);
2025         nxt_mp_destroy(app->mem_pool);
2026 
2027     } nxt_queue_loop;
2028 
2029     return NXT_ERROR;
2030 }
2031 
2032 
2033 #if (NXT_TLS)
2034 
2035 static nxt_int_t
2036 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2037     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2038     nxt_tls_init_t *tls_init, nxt_bool_t last)
2039 {
2040     nxt_router_tlssock_t  *tls;
2041 
2042     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2043     if (nxt_slow_path(tls == NULL)) {
2044         return NXT_ERROR;
2045     }
2046 
2047     tls->tls_init = tls_init;
2048     tls->socket_conf = skcf;
2049     tls->temp_conf = tmcf;
2050     tls->last = last;
2051     nxt_conf_get_string(value, &tls->name);
2052 
2053     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2054 
2055     return NXT_OK;
2056 }
2057 
2058 #endif
2059 
2060 
2061 static nxt_int_t
2062 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2063     nxt_conf_value_t *conf)
2064 {
2065     uint32_t          next, i;
2066     nxt_mp_t          *mp;
2067     nxt_str_t         *type, exten, str;
2068     nxt_int_t         ret;
2069     nxt_uint_t        exts;
2070     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2071 
2072     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2073 
2074     mp = rtcf->mem_pool;
2075 
2076     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2077     if (nxt_slow_path(ret != NXT_OK)) {
2078         return NXT_ERROR;
2079     }
2080 
2081     if (conf == NULL) {
2082         return NXT_OK;
2083     }
2084 
2085     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2086 
2087     if (mtypes_conf != NULL) {
2088         next = 0;
2089 
2090         for ( ;; ) {
2091             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2092 
2093             if (ext_conf == NULL) {
2094                 break;
2095             }
2096 
2097             type = nxt_str_dup(mp, NULL, &str);
2098             if (nxt_slow_path(type == NULL)) {
2099                 return NXT_ERROR;
2100             }
2101 
2102             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2103                 nxt_conf_get_string(ext_conf, &str);
2104 
2105                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2106                     return NXT_ERROR;
2107                 }
2108 
2109                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2110                                                       &exten, type);
2111                 if (nxt_slow_path(ret != NXT_OK)) {
2112                     return NXT_ERROR;
2113                 }
2114 
2115                 continue;
2116             }
2117 
2118             exts = nxt_conf_array_elements_count(ext_conf);
2119 
2120             for (i = 0; i < exts; i++) {
2121                 value = nxt_conf_get_array_element(ext_conf, i);
2122 
2123                 nxt_conf_get_string(value, &str);
2124 
2125                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2126                     return NXT_ERROR;
2127                 }
2128 
2129                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2130                                                       &exten, type);
2131                 if (nxt_slow_path(ret != NXT_OK)) {
2132                     return NXT_ERROR;
2133                 }
2134             }
2135         }
2136     }
2137 
2138     return NXT_OK;
2139 }
2140 
2141 
2142 static nxt_int_t
2143 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2144     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2145 {
2146     char                        c;
2147     size_t                      i;
2148     nxt_mp_t                    *mp;
2149     uint32_t                    hash;
2150     nxt_str_t                   header;
2151     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2152     nxt_http_client_ip_t        *client_ip;
2153     nxt_http_route_addr_rule_t  *source;
2154 
2155     static nxt_str_t  header_path = nxt_string("/header");
2156     static nxt_str_t  source_path = nxt_string("/source");
2157     static nxt_str_t  recursive_path = nxt_string("/recursive");
2158 
2159     if (conf == NULL) {
2160         skcf->client_ip = NULL;
2161 
2162         return NXT_OK;
2163     }
2164 
2165     mp = tmcf->router_conf->mem_pool;
2166 
2167     source_conf = nxt_conf_get_path(conf, &source_path);
2168     header_conf = nxt_conf_get_path(conf, &header_path);
2169     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2170 
2171     if (source_conf == NULL || header_conf == NULL) {
2172         return NXT_ERROR;
2173     }
2174 
2175     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2176     if (nxt_slow_path(client_ip == NULL)) {
2177         return NXT_ERROR;
2178     }
2179 
2180     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2181     if (nxt_slow_path(source == NULL)) {
2182         return NXT_ERROR;
2183     }
2184 
2185     client_ip->source = source;
2186 
2187     nxt_conf_get_string(header_conf, &header);
2188 
2189     if (recursive_conf != NULL) {
2190         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2191     }
2192 
2193     client_ip->header = nxt_str_dup(mp, NULL, &header);
2194     if (nxt_slow_path(client_ip->header == NULL)) {
2195         return NXT_ERROR;
2196     }
2197 
2198     hash = NXT_HTTP_FIELD_HASH_INIT;
2199 
2200     for (i = 0; i < client_ip->header->length; i++) {
2201         c = client_ip->header->start[i];
2202         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2203     }
2204 
2205     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2206 
2207     client_ip->header_hash = hash;
2208 
2209     skcf->client_ip = client_ip;
2210 
2211     return NXT_OK;
2212 }
2213 
2214 
2215 static nxt_app_t *
2216 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2217 {
2218     nxt_app_t  *app;
2219 
2220     nxt_queue_each(app, queue, nxt_app_t, link) {
2221 
2222         if (nxt_strstr_eq(name, &app->name)) {
2223             return app;
2224         }
2225 
2226     } nxt_queue_loop;
2227 
2228     return NULL;
2229 }
2230 
2231 
2232 static nxt_int_t
2233 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2234 {
2235     void       *mem;
2236     nxt_int_t  fd;
2237 
2238     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2239     if (nxt_slow_path(fd == -1)) {
2240         return NXT_ERROR;
2241     }
2242 
2243     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2244                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2245     if (nxt_slow_path(mem == MAP_FAILED)) {
2246         nxt_fd_close(fd);
2247 
2248         return NXT_ERROR;
2249     }
2250 
2251     nxt_app_queue_init(mem);
2252 
2253     port->queue_fd = fd;
2254     port->queue = mem;
2255 
2256     return NXT_OK;
2257 }
2258 
2259 
2260 static nxt_int_t
2261 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2262 {
2263     void       *mem;
2264     nxt_int_t  fd;
2265 
2266     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2267     if (nxt_slow_path(fd == -1)) {
2268         return NXT_ERROR;
2269     }
2270 
2271     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2272                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2273     if (nxt_slow_path(mem == MAP_FAILED)) {
2274         nxt_fd_close(fd);
2275 
2276         return NXT_ERROR;
2277     }
2278 
2279     nxt_port_queue_init(mem);
2280 
2281     port->queue_fd = fd;
2282     port->queue = mem;
2283 
2284     return NXT_OK;
2285 }
2286 
2287 
2288 static nxt_int_t
2289 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2290 {
2291     void  *mem;
2292 
2293     nxt_assert(fd != -1);
2294 
2295     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2296                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2297     if (nxt_slow_path(mem == MAP_FAILED)) {
2298 
2299         return NXT_ERROR;
2300     }
2301 
2302     port->queue = mem;
2303 
2304     return NXT_OK;
2305 }
2306 
2307 
/*
 * Level hash descriptor used for rtcf->apps_hash: keys are matched with
 * nxt_router_apps_hash_test() and the hash uses the nxt_mp_lvlhsh_alloc()
 * and nxt_mp_lvlhsh_free() callbacks.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2314 
2315 
2316 static nxt_int_t
2317 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2318 {
2319     nxt_app_t  *app;
2320 
2321     app = data;
2322 
2323     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2324 }
2325 
2326 
2327 static nxt_int_t
2328 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2329 {
2330     nxt_lvlhsh_query_t  lhq;
2331 
2332     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2333     lhq.replace = 0;
2334     lhq.key = app->name;
2335     lhq.value = app;
2336     lhq.proto = &nxt_router_apps_hash_proto;
2337     lhq.pool = rtcf->mem_pool;
2338 
2339     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2340 
2341     case NXT_OK:
2342         return NXT_OK;
2343 
2344     case NXT_DECLINED:
2345         nxt_thread_log_alert("router app hash adding failed: "
2346                              "\"%V\" is already in hash", &lhq.key);
2347         /* Fall through. */
2348     default:
2349         return NXT_ERROR;
2350     }
2351 }
2352 
2353 
2354 static nxt_app_t *
2355 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2356 {
2357     nxt_lvlhsh_query_t  lhq;
2358 
2359     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2360     lhq.key = *name;
2361     lhq.proto = &nxt_router_apps_hash_proto;
2362 
2363     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2364         return NULL;
2365     }
2366 
2367     return lhq.value;
2368 }
2369 
2370 
2371 static void
2372 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2373 {
2374     nxt_app_t          *app;
2375     nxt_lvlhsh_each_t  lhe;
2376 
2377     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2378 
2379     for ( ;; ) {
2380         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2381 
2382         if (app == NULL) {
2383             break;
2384         }
2385 
2386         nxt_router_app_use(task, app, i);
2387     }
2388 }
2389 
2390 
/*
 * Application binding stored in an action's u.conf by
 * nxt_router_application_init().
 */
typedef struct {
    /* Application found by name in the configuration's apps hash. */
    nxt_app_t  *app;
    /* Index of the selected entry in app->targets; 0 if no target given. */
    nxt_int_t  target;
} nxt_http_app_conf_t;
2395 
2396 
2397 nxt_int_t
2398 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2399     nxt_str_t *target, nxt_http_action_t *action)
2400 {
2401     nxt_app_t            *app;
2402     nxt_str_t            *targets;
2403     nxt_uint_t           i;
2404     nxt_http_app_conf_t  *conf;
2405 
2406     app = nxt_router_apps_hash_get(rtcf, name);
2407     if (app == NULL) {
2408         return NXT_DECLINED;
2409     }
2410 
2411     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2412     if (nxt_slow_path(conf == NULL)) {
2413         return NXT_ERROR;
2414     }
2415 
2416     action->handler = nxt_http_application_handler;
2417     action->u.conf = conf;
2418 
2419     conf->app = app;
2420 
2421     if (target != NULL && target->length != 0) {
2422         targets = app->targets;
2423 
2424         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2425 
2426         conf->target = i;
2427 
2428     } else {
2429         conf->target = 0;
2430     }
2431 
2432     return NXT_OK;
2433 }
2434 
2435 
2436 static nxt_socket_conf_t *
2437 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2438     nxt_str_t *name)
2439 {
2440     size_t               size;
2441     nxt_int_t            ret;
2442     nxt_bool_t           wildcard;
2443     nxt_sockaddr_t       *sa;
2444     nxt_socket_conf_t    *skcf;
2445     nxt_listen_socket_t  *ls;
2446 
2447     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2448     if (nxt_slow_path(sa == NULL)) {
2449         nxt_alert(task, "invalid listener \"%V\"", name);
2450         return NULL;
2451     }
2452 
2453     sa->type = SOCK_STREAM;
2454 
2455     nxt_debug(task, "router listener: \"%*s\"",
2456               (size_t) sa->length, nxt_sockaddr_start(sa));
2457 
2458     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2459     if (nxt_slow_path(skcf == NULL)) {
2460         return NULL;
2461     }
2462 
2463     size = nxt_sockaddr_size(sa);
2464 
2465     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2466 
2467     if (ret != NXT_OK) {
2468 
2469         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2470         if (nxt_slow_path(ls == NULL)) {
2471             return NULL;
2472         }
2473 
2474         skcf->listen = ls;
2475 
2476         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2477         nxt_memcpy(ls->sockaddr, sa, size);
2478 
2479         nxt_listen_socket_remote_size(ls);
2480 
2481         ls->socket = -1;
2482         ls->backlog = NXT_LISTEN_BACKLOG;
2483         ls->flags = NXT_NONBLOCK;
2484         ls->read_after_accept = 1;
2485     }
2486 
2487     switch (sa->u.sockaddr.sa_family) {
2488 #if (NXT_HAVE_UNIX_DOMAIN)
2489     case AF_UNIX:
2490         wildcard = 0;
2491         break;
2492 #endif
2493 #if (NXT_INET6)
2494     case AF_INET6:
2495         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2496         break;
2497 #endif
2498     case AF_INET:
2499     default:
2500         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2501         break;
2502     }
2503 
2504     if (!wildcard) {
2505         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2506         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2507             return NULL;
2508         }
2509 
2510         nxt_memcpy(skcf->sockaddr, sa, size);
2511     }
2512 
2513     return skcf;
2514 }
2515 
2516 
2517 static nxt_int_t
2518 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2519     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2520 {
2521     nxt_router_t       *router;
2522     nxt_queue_link_t   *qlk;
2523     nxt_socket_conf_t  *skcf;
2524 
2525     router = tmcf->router_conf->router;
2526 
2527     for (qlk = nxt_queue_first(&router->sockets);
2528          qlk != nxt_queue_tail(&router->sockets);
2529          qlk = nxt_queue_next(qlk))
2530     {
2531         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2532 
2533         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2534             nskcf->listen = skcf->listen;
2535 
2536             nxt_queue_remove(qlk);
2537             nxt_queue_insert_tail(&keeping_sockets, qlk);
2538 
2539             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2540 
2541             return NXT_OK;
2542         }
2543     }
2544 
2545     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2546 
2547     return NXT_DECLINED;
2548 }
2549 
2550 
2551 static void
2552 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2553     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2554 {
2555     size_t            size;
2556     uint32_t          stream;
2557     nxt_int_t         ret;
2558     nxt_buf_t         *b;
2559     nxt_port_t        *main_port, *router_port;
2560     nxt_runtime_t     *rt;
2561     nxt_socket_rpc_t  *rpc;
2562 
2563     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2564     if (rpc == NULL) {
2565         goto fail;
2566     }
2567 
2568     rpc->socket_conf = skcf;
2569     rpc->temp_conf = tmcf;
2570 
2571     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2572 
2573     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2574     if (b == NULL) {
2575         goto fail;
2576     }
2577 
2578     b->completion_handler = nxt_buf_dummy_completion;
2579 
2580     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2581 
2582     rt = task->thread->runtime;
2583     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2584     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2585 
2586     stream = nxt_port_rpc_register_handler(task, router_port,
2587                                            nxt_router_listen_socket_ready,
2588                                            nxt_router_listen_socket_error,
2589                                            main_port->pid, rpc);
2590     if (nxt_slow_path(stream == 0)) {
2591         goto fail;
2592     }
2593 
2594     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2595                                 stream, router_port->id, b);
2596 
2597     if (nxt_slow_path(ret != NXT_OK)) {
2598         nxt_port_rpc_cancel(task, router_port, stream);
2599         goto fail;
2600     }
2601 
2602     return;
2603 
2604 fail:
2605 
2606     nxt_router_conf_error(task, tmcf);
2607 }
2608 
2609 
2610 static void
2611 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2612     void *data)
2613 {
2614     nxt_int_t         ret;
2615     nxt_socket_t      s;
2616     nxt_socket_rpc_t  *rpc;
2617 
2618     rpc = data;
2619 
2620     s = msg->fd[0];
2621 
2622     ret = nxt_socket_nonblocking(task, s);
2623     if (nxt_slow_path(ret != NXT_OK)) {
2624         goto fail;
2625     }
2626 
2627     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2628 
2629     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2630     if (nxt_slow_path(ret != NXT_OK)) {
2631         goto fail;
2632     }
2633 
2634     rpc->socket_conf->listen->socket = s;
2635 
2636     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2637                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2638 
2639     return;
2640 
2641 fail:
2642 
2643     nxt_socket_close(task, s);
2644 
2645     nxt_router_conf_error(task, rpc->temp_conf);
2646 }
2647 
2648 
/*
 * RPC error handler for a listener socket request: fails the temporary
 * configuration referenced by the nxt_socket_rpc_t passed in "data".
 *
 * The "#if 0" section is disabled code that would format the error
 * received in the message for debug logging.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    /* Only the failure itself is acted upon. */
    nxt_router_conf_error(task, tmcf);
}
2708 
2709 
2710 #if (NXT_TLS)
2711 
2712 static void
2713 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2714     void *data)
2715 {
2716     nxt_mp_t                *mp;
2717     nxt_int_t               ret;
2718     nxt_tls_conf_t          *tlscf;
2719     nxt_router_tlssock_t    *tls;
2720     nxt_tls_bundle_conf_t   *bundle;
2721     nxt_router_temp_conf_t  *tmcf;
2722 
2723     nxt_debug(task, "tls rpc handler");
2724 
2725     tls = data;
2726     tmcf = tls->temp_conf;
2727 
2728     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2729         goto fail;
2730     }
2731 
2732     mp = tmcf->router_conf->mem_pool;
2733 
2734     if (tls->socket_conf->tls == NULL){
2735         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2736         if (nxt_slow_path(tlscf == NULL)) {
2737             goto fail;
2738         }
2739 
2740         tlscf->no_wait_shutdown = 1;
2741         tls->socket_conf->tls = tlscf;
2742 
2743     } else {
2744         tlscf = tls->socket_conf->tls;
2745     }
2746 
2747     tls->tls_init->conf = tlscf;
2748 
2749     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2750     if (nxt_slow_path(bundle == NULL)) {
2751         goto fail;
2752     }
2753 
2754     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2755         goto fail;
2756     }
2757 
2758     bundle->chain_file = msg->fd[0];
2759     bundle->next = tlscf->bundle;
2760     tlscf->bundle = bundle;
2761 
2762     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2763                                                   tls->last);
2764     if (nxt_slow_path(ret != NXT_OK)) {
2765         goto fail;
2766     }
2767 
2768     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2769                        nxt_router_conf_apply, task, tmcf, NULL);
2770     return;
2771 
2772 fail:
2773 
2774     nxt_router_conf_error(task, tmcf);
2775 }
2776 
2777 #endif
2778 
2779 
2780 static void
2781 nxt_router_app_rpc_create(nxt_task_t *task,
2782     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2783 {
2784     size_t         size;
2785     uint32_t       stream;
2786     nxt_fd_t       port_fd, queue_fd;
2787     nxt_int_t      ret;
2788     nxt_buf_t      *b;
2789     nxt_port_t     *router_port, *dport;
2790     nxt_runtime_t  *rt;
2791     nxt_app_rpc_t  *rpc;
2792 
2793     rt = task->thread->runtime;
2794 
2795     dport = app->proto_port;
2796 
2797     if (dport == NULL) {
2798         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2799 
2800         size = app->name.length + 1 + app->conf.length;
2801 
2802         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2803         if (nxt_slow_path(b == NULL)) {
2804             goto fail;
2805         }
2806 
2807         b->completion_handler = nxt_buf_dummy_completion;
2808 
2809         nxt_buf_cpystr(b, &app->name);
2810         *b->mem.free++ = '\0';
2811         nxt_buf_cpystr(b, &app->conf);
2812 
2813         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2814 
2815         port_fd = app->shared_port->pair[0];
2816         queue_fd = app->shared_port->queue_fd;
2817 
2818     } else {
2819         nxt_debug(task, "app '%V' prefork", &app->name);
2820 
2821         b = NULL;
2822         port_fd = -1;
2823         queue_fd = -1;
2824     }
2825 
2826     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2827 
2828     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2829                                            nxt_router_app_prefork_ready,
2830                                            nxt_router_app_prefork_error,
2831                                            sizeof(nxt_app_rpc_t));
2832     if (nxt_slow_path(rpc == NULL)) {
2833         goto fail;
2834     }
2835 
2836     rpc->app = app;
2837     rpc->temp_conf = tmcf;
2838     rpc->proto = (b != NULL);
2839 
2840     stream = nxt_port_rpc_ex_stream(rpc);
2841 
2842     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2843                                  port_fd, queue_fd, stream, router_port->id, b);
2844     if (nxt_slow_path(ret != NXT_OK)) {
2845         nxt_port_rpc_cancel(task, router_port, stream);
2846         goto fail;
2847     }
2848 
2849     if (b == NULL) {
2850         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2851 
2852         app->pending_processes++;
2853     }
2854 
2855     return;
2856 
2857 fail:
2858 
2859     nxt_router_conf_error(task, tmcf);
2860 }
2861 
2862 
2863 static void
2864 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2865     void *data)
2866 {
2867     nxt_app_t           *app;
2868     nxt_port_t          *port;
2869     nxt_app_rpc_t       *rpc;
2870     nxt_event_engine_t  *engine;
2871 
2872     rpc = data;
2873     app = rpc->app;
2874 
2875     port = msg->u.new_port;
2876 
2877     nxt_assert(port != NULL);
2878     nxt_assert(port->id == 0);
2879 
2880     if (rpc->proto) {
2881         nxt_assert(app->proto_port == NULL);
2882         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2883 
2884         nxt_port_inc_use(port);
2885 
2886         app->proto_port = port;
2887         port->app = app;
2888 
2889         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2890 
2891         return;
2892     }
2893 
2894     nxt_assert(port->type == NXT_PROCESS_APP);
2895 
2896     port->app = app;
2897     port->main_app_port = port;
2898 
2899     app->pending_processes--;
2900     app->processes++;
2901     app->idle_processes++;
2902 
2903     engine = task->thread->engine;
2904 
2905     nxt_queue_insert_tail(&app->ports, &port->app_link);
2906     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2907 
2908     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2909               &app->name, port->pid, port->id);
2910 
2911     nxt_port_hash_add(&app->port_hash, port);
2912     app->port_hash_count++;
2913 
2914     port->idle_start = 0;
2915 
2916     nxt_port_inc_use(port);
2917 
2918     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2919 
2920     nxt_work_queue_add(&engine->fast_work_queue,
2921                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2922 }
2923 
2924 
2925 static void
2926 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2927     void *data)
2928 {
2929     nxt_app_t               *app;
2930     nxt_app_rpc_t           *rpc;
2931     nxt_router_temp_conf_t  *tmcf;
2932 
2933     rpc = data;
2934     app = rpc->app;
2935     tmcf = rpc->temp_conf;
2936 
2937     if (rpc->proto) {
2938         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2939                 &app->name);
2940 
2941     } else {
2942         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2943                 &app->name);
2944 
2945         app->pending_processes--;
2946     }
2947 
2948     nxt_router_conf_error(task, tmcf);
2949 }
2950 
2951 
2952 static nxt_int_t
2953 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2954     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2955 {
2956     nxt_int_t                 ret;
2957     nxt_uint_t                n, threads;
2958     nxt_queue_link_t          *qlk;
2959     nxt_router_engine_conf_t  *recf;
2960 
2961     threads = tmcf->router_conf->threads;
2962 
2963     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2964                                      sizeof(nxt_router_engine_conf_t));
2965     if (nxt_slow_path(tmcf->engines == NULL)) {
2966         return NXT_ERROR;
2967     }
2968 
2969     n = 0;
2970 
2971     for (qlk = nxt_queue_first(&router->engines);
2972          qlk != nxt_queue_tail(&router->engines);
2973          qlk = nxt_queue_next(qlk))
2974     {
2975         recf = nxt_array_zero_add(tmcf->engines);
2976         if (nxt_slow_path(recf == NULL)) {
2977             return NXT_ERROR;
2978         }
2979 
2980         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2981 
2982         if (n < threads) {
2983             recf->action = NXT_ROUTER_ENGINE_KEEP;
2984             ret = nxt_router_engine_conf_update(tmcf, recf);
2985 
2986         } else {
2987             recf->action = NXT_ROUTER_ENGINE_DELETE;
2988             ret = nxt_router_engine_conf_delete(tmcf, recf);
2989         }
2990 
2991         if (nxt_slow_path(ret != NXT_OK)) {
2992             return ret;
2993         }
2994 
2995         n++;
2996     }
2997 
2998     tmcf->new_threads = n;
2999 
3000     while (n < threads) {
3001         recf = nxt_array_zero_add(tmcf->engines);
3002         if (nxt_slow_path(recf == NULL)) {
3003             return NXT_ERROR;
3004         }
3005 
3006         recf->action = NXT_ROUTER_ENGINE_ADD;
3007 
3008         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3009         if (nxt_slow_path(recf->engine == NULL)) {
3010             return NXT_ERROR;
3011         }
3012 
3013         ret = nxt_router_engine_conf_create(tmcf, recf);
3014         if (nxt_slow_path(ret != NXT_OK)) {
3015             return ret;
3016         }
3017 
3018         n++;
3019     }
3020 
3021     return NXT_OK;
3022 }
3023 
3024 
3025 static nxt_int_t
3026 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3027     nxt_router_engine_conf_t *recf)
3028 {
3029     nxt_int_t  ret;
3030 
3031     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3032                                           nxt_router_listen_socket_create);
3033     if (nxt_slow_path(ret != NXT_OK)) {
3034         return ret;
3035     }
3036 
3037     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3038                                           nxt_router_listen_socket_create);
3039     if (nxt_slow_path(ret != NXT_OK)) {
3040         return ret;
3041     }
3042 
3043     return ret;
3044 }
3045 
3046 
3047 static nxt_int_t
3048 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3049     nxt_router_engine_conf_t *recf)
3050 {
3051     nxt_int_t  ret;
3052 
3053     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3054                                           nxt_router_listen_socket_create);
3055     if (nxt_slow_path(ret != NXT_OK)) {
3056         return ret;
3057     }
3058 
3059     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3060                                           nxt_router_listen_socket_update);
3061     if (nxt_slow_path(ret != NXT_OK)) {
3062         return ret;
3063     }
3064 
3065     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3066     if (nxt_slow_path(ret != NXT_OK)) {
3067         return ret;
3068     }
3069 
3070     return ret;
3071 }
3072 
3073 
3074 static nxt_int_t
3075 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3076     nxt_router_engine_conf_t *recf)
3077 {
3078     nxt_int_t  ret;
3079 
3080     ret = nxt_router_engine_quit(tmcf, recf);
3081     if (nxt_slow_path(ret != NXT_OK)) {
3082         return ret;
3083     }
3084 
3085     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3086     if (nxt_slow_path(ret != NXT_OK)) {
3087         return ret;
3088     }
3089 
3090     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3091 }
3092 
3093 
3094 static nxt_int_t
3095 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3096     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3097     nxt_work_handler_t handler)
3098 {
3099     nxt_int_t                ret;
3100     nxt_joint_job_t          *job;
3101     nxt_queue_link_t         *qlk;
3102     nxt_socket_conf_t        *skcf;
3103     nxt_socket_conf_joint_t  *joint;
3104 
3105     for (qlk = nxt_queue_first(sockets);
3106          qlk != nxt_queue_tail(sockets);
3107          qlk = nxt_queue_next(qlk))
3108     {
3109         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3110         if (nxt_slow_path(job == NULL)) {
3111             return NXT_ERROR;
3112         }
3113 
3114         job->work.next = recf->jobs;
3115         recf->jobs = &job->work;
3116 
3117         job->task = tmcf->engine->task;
3118         job->work.handler = handler;
3119         job->work.task = &job->task;
3120         job->work.obj = job;
3121         job->tmcf = tmcf;
3122 
3123         tmcf->count++;
3124 
3125         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3126                              sizeof(nxt_socket_conf_joint_t));
3127         if (nxt_slow_path(joint == NULL)) {
3128             return NXT_ERROR;
3129         }
3130 
3131         job->work.data = joint;
3132 
3133         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3134         if (nxt_slow_path(ret != NXT_OK)) {
3135             return ret;
3136         }
3137 
3138         joint->count = 1;
3139 
3140         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3141         skcf->count++;
3142         joint->socket_conf = skcf;
3143 
3144         joint->engine = recf->engine;
3145     }
3146 
3147     return NXT_OK;
3148 }
3149 
3150 
3151 static nxt_int_t
3152 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3153     nxt_router_engine_conf_t *recf)
3154 {
3155     nxt_joint_job_t  *job;
3156 
3157     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3158     if (nxt_slow_path(job == NULL)) {
3159         return NXT_ERROR;
3160     }
3161 
3162     job->work.next = recf->jobs;
3163     recf->jobs = &job->work;
3164 
3165     job->task = tmcf->engine->task;
3166     job->work.handler = nxt_router_worker_thread_quit;
3167     job->work.task = &job->task;
3168     job->work.obj = NULL;
3169     job->work.data = NULL;
3170     job->tmcf = NULL;
3171 
3172     return NXT_OK;
3173 }
3174 
3175 
3176 static nxt_int_t
3177 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3178     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3179 {
3180     nxt_joint_job_t   *job;
3181     nxt_queue_link_t  *qlk;
3182 
3183     for (qlk = nxt_queue_first(sockets);
3184          qlk != nxt_queue_tail(sockets);
3185          qlk = nxt_queue_next(qlk))
3186     {
3187         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3188         if (nxt_slow_path(job == NULL)) {
3189             return NXT_ERROR;
3190         }
3191 
3192         job->work.next = recf->jobs;
3193         recf->jobs = &job->work;
3194 
3195         job->task = tmcf->engine->task;
3196         job->work.handler = nxt_router_listen_socket_delete;
3197         job->work.task = &job->task;
3198         job->work.obj = job;
3199         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3200         job->tmcf = tmcf;
3201 
3202         tmcf->count++;
3203     }
3204 
3205     return NXT_OK;
3206 }
3207 
3208 
3209 static nxt_int_t
3210 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3211     nxt_router_temp_conf_t *tmcf)
3212 {
3213     nxt_int_t                 ret;
3214     nxt_uint_t                i, threads;
3215     nxt_router_engine_conf_t  *recf;
3216 
3217     recf = tmcf->engines->elts;
3218     threads = tmcf->router_conf->threads;
3219 
3220     for (i = tmcf->new_threads; i < threads; i++) {
3221         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3222         if (nxt_slow_path(ret != NXT_OK)) {
3223             return ret;
3224         }
3225     }
3226 
3227     return NXT_OK;
3228 }
3229 
3230 
3231 static nxt_int_t
3232 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3233     nxt_event_engine_t *engine)
3234 {
3235     nxt_int_t            ret;
3236     nxt_thread_link_t    *link;
3237     nxt_thread_handle_t  handle;
3238 
3239     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3240 
3241     if (nxt_slow_path(link == NULL)) {
3242         return NXT_ERROR;
3243     }
3244 
3245     link->start = nxt_router_thread_start;
3246     link->engine = engine;
3247     link->work.handler = nxt_router_thread_exit_handler;
3248     link->work.task = task;
3249     link->work.data = link;
3250 
3251     nxt_queue_insert_tail(&rt->engines, &engine->link);
3252 
3253     ret = nxt_thread_create(&handle, link);
3254 
3255     if (nxt_slow_path(ret != NXT_OK)) {
3256         nxt_queue_remove(&engine->link);
3257     }
3258 
3259     return ret;
3260 }
3261 
3262 
3263 static void
3264 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3265     nxt_router_temp_conf_t *tmcf)
3266 {
3267     nxt_app_t  *app;
3268 
3269     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3270 
3271         nxt_router_app_unlink(task, app);
3272 
3273     } nxt_queue_loop;
3274 
3275     nxt_queue_add(&router->apps, &tmcf->previous);
3276     nxt_queue_add(&router->apps, &tmcf->apps);
3277 }
3278 
3279 
3280 static void
3281 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3282 {
3283     nxt_uint_t                n;
3284     nxt_event_engine_t        *engine;
3285     nxt_router_engine_conf_t  *recf;
3286 
3287     recf = tmcf->engines->elts;
3288 
3289     for (n = tmcf->engines->nelts; n != 0; n--) {
3290         engine = recf->engine;
3291 
3292         switch (recf->action) {
3293 
3294         case NXT_ROUTER_ENGINE_KEEP:
3295             break;
3296 
3297         case NXT_ROUTER_ENGINE_ADD:
3298             nxt_queue_insert_tail(&router->engines, &engine->link0);
3299             break;
3300 
3301         case NXT_ROUTER_ENGINE_DELETE:
3302             nxt_queue_remove(&engine->link0);
3303             break;
3304         }
3305 
3306         nxt_router_engine_post(engine, recf->jobs);
3307 
3308         recf++;
3309     }
3310 }
3311 
3312 
3313 static void
3314 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3315 {
3316     nxt_work_t  *work, *next;
3317 
3318     for (work = jobs; work != NULL; work = next) {
3319         next = work->next;
3320         work->next = NULL;
3321 
3322         nxt_event_engine_post(engine, work);
3323     }
3324 }
3325 
3326 
3327 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3328     .rpc_error       = nxt_port_rpc_handler,
3329     .mmap            = nxt_port_mmap_handler,
3330     .data            = nxt_port_rpc_handler,
3331     .oosm            = nxt_router_oosm_handler,
3332     .req_headers_ack = nxt_port_rpc_handler,
3333 };
3334 
3335 
3336 static void
3337 nxt_router_thread_start(void *data)
3338 {
3339     nxt_int_t           ret;
3340     nxt_port_t          *port;
3341     nxt_task_t          *task;
3342     nxt_work_t          *work;
3343     nxt_thread_t        *thread;
3344     nxt_thread_link_t   *link;
3345     nxt_event_engine_t  *engine;
3346 
3347     link = data;
3348     engine = link->engine;
3349     task = &engine->task;
3350 
3351     thread = nxt_thread();
3352 
3353     nxt_event_engine_thread_adopt(engine);
3354 
3355     /* STUB */
3356     thread->runtime = engine->task.thread->runtime;
3357 
3358     engine->task.thread = thread;
3359     engine->task.log = thread->log;
3360     thread->engine = engine;
3361     thread->task = &engine->task;
3362 #if 0
3363     thread->fiber = &engine->fibers->fiber;
3364 #endif
3365 
3366     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3367     if (nxt_slow_path(engine->mem_pool == NULL)) {
3368         return;
3369     }
3370 
3371     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3372                         NXT_PROCESS_ROUTER);
3373     if (nxt_slow_path(port == NULL)) {
3374         return;
3375     }
3376 
3377     ret = nxt_port_socket_init(task, port, 0);
3378     if (nxt_slow_path(ret != NXT_OK)) {
3379         nxt_port_use(task, port, -1);
3380         return;
3381     }
3382 
3383     ret = nxt_router_port_queue_init(task, port);
3384     if (nxt_slow_path(ret != NXT_OK)) {
3385         nxt_port_use(task, port, -1);
3386         return;
3387     }
3388 
3389     engine->port = port;
3390 
3391     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3392 
3393     work = nxt_zalloc(sizeof(nxt_work_t));
3394     if (nxt_slow_path(work == NULL)) {
3395         return;
3396     }
3397 
3398     work->handler = nxt_router_rt_add_port;
3399     work->task = link->work.task;
3400     work->obj = work;
3401     work->data = port;
3402 
3403     nxt_event_engine_post(link->work.task->thread->engine, work);
3404 
3405     nxt_event_engine_start(engine);
3406 }
3407 
3408 
3409 static void
3410 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3411 {
3412     nxt_int_t      res;
3413     nxt_port_t     *port;
3414     nxt_runtime_t  *rt;
3415 
3416     rt = task->thread->runtime;
3417     port = data;
3418 
3419     nxt_free(obj);
3420 
3421     res = nxt_port_hash_add(&rt->ports, port);
3422 
3423     if (nxt_fast_path(res == NXT_OK)) {
3424         nxt_port_use(task, port, 1);
3425     }
3426 }
3427 
3428 
3429 static void
3430 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3431 {
3432     nxt_joint_job_t          *job;
3433     nxt_socket_conf_t        *skcf;
3434     nxt_listen_event_t       *lev;
3435     nxt_listen_socket_t      *ls;
3436     nxt_thread_spinlock_t    *lock;
3437     nxt_socket_conf_joint_t  *joint;
3438 
3439     job = obj;
3440     joint = data;
3441 
3442     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3443 
3444     skcf = joint->socket_conf;
3445     ls = skcf->listen;
3446 
3447     lev = nxt_listen_event(task, ls);
3448     if (nxt_slow_path(lev == NULL)) {
3449         nxt_router_listen_socket_release(task, skcf);
3450         return;
3451     }
3452 
3453     lev->socket.data = joint;
3454 
3455     lock = &skcf->router_conf->router->lock;
3456 
3457     nxt_thread_spin_lock(lock);
3458     ls->count++;
3459     nxt_thread_spin_unlock(lock);
3460 
3461     job->work.next = NULL;
3462     job->work.handler = nxt_router_conf_wait;
3463 
3464     nxt_event_engine_post(job->tmcf->engine, &job->work);
3465 }
3466 
3467 
3468 nxt_inline nxt_listen_event_t *
3469 nxt_router_listen_event(nxt_queue_t *listen_connections,
3470     nxt_socket_conf_t *skcf)
3471 {
3472     nxt_socket_t        fd;
3473     nxt_queue_link_t    *qlk;
3474     nxt_listen_event_t  *lev;
3475 
3476     fd = skcf->listen->socket;
3477 
3478     for (qlk = nxt_queue_first(listen_connections);
3479          qlk != nxt_queue_tail(listen_connections);
3480          qlk = nxt_queue_next(qlk))
3481     {
3482         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3483 
3484         if (fd == lev->socket.fd) {
3485             return lev;
3486         }
3487     }
3488 
3489     return NULL;
3490 }
3491 
3492 
3493 static void
3494 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3495 {
3496     nxt_joint_job_t          *job;
3497     nxt_event_engine_t       *engine;
3498     nxt_listen_event_t       *lev;
3499     nxt_socket_conf_joint_t  *joint, *old;
3500 
3501     job = obj;
3502     joint = data;
3503 
3504     engine = task->thread->engine;
3505 
3506     nxt_queue_insert_tail(&engine->joints, &joint->link);
3507 
3508     lev = nxt_router_listen_event(&engine->listen_connections,
3509                                   joint->socket_conf);
3510 
3511     old = lev->socket.data;
3512     lev->socket.data = joint;
3513     lev->listen = joint->socket_conf->listen;
3514 
3515     job->work.next = NULL;
3516     job->work.handler = nxt_router_conf_wait;
3517 
3518     nxt_event_engine_post(job->tmcf->engine, &job->work);
3519 
3520     /*
3521      * The task is allocated from configuration temporary
3522      * memory pool so it can be freed after engine post operation.
3523      */
3524 
3525     nxt_router_conf_release(&engine->task, old);
3526 }
3527 
3528 
3529 static void
3530 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3531 {
3532     nxt_socket_conf_t        *skcf;
3533     nxt_listen_event_t       *lev;
3534     nxt_event_engine_t       *engine;
3535     nxt_socket_conf_joint_t  *joint;
3536 
3537     skcf = data;
3538 
3539     engine = task->thread->engine;
3540 
3541     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3542 
3543     nxt_fd_event_delete(engine, &lev->socket);
3544 
3545     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3546               lev->socket.fd);
3547 
3548     joint = lev->socket.data;
3549     joint->close_job = obj;
3550 
3551     lev->timer.handler = nxt_router_listen_socket_close;
3552     lev->timer.work_queue = &engine->fast_work_queue;
3553 
3554     nxt_timer_add(engine, &lev->timer, 0);
3555 }
3556 
3557 
3558 static void
3559 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3560 {
3561     nxt_event_engine_t  *engine;
3562 
3563     nxt_debug(task, "router worker thread quit");
3564 
3565     engine = task->thread->engine;
3566 
3567     engine->shutdown = 1;
3568 
3569     if (nxt_queue_is_empty(&engine->joints)) {
3570         nxt_thread_exit(task->thread);
3571     }
3572 }
3573 
3574 
3575 static void
3576 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3577 {
3578     nxt_timer_t              *timer;
3579     nxt_joint_job_t          *job;
3580     nxt_listen_event_t       *lev;
3581     nxt_socket_conf_joint_t  *joint;
3582 
3583     timer = obj;
3584     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3585 
3586     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3587               lev->socket.fd);
3588 
3589     nxt_queue_remove(&lev->link);
3590 
3591     joint = lev->socket.data;
3592     lev->socket.data = NULL;
3593 
3594     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3595     task = &task->thread->engine->task;
3596 
3597     nxt_router_listen_socket_release(task, joint->socket_conf);
3598 
3599     job = joint->close_job;
3600     job->work.next = NULL;
3601     job->work.handler = nxt_router_conf_wait;
3602 
3603     nxt_event_engine_post(job->tmcf->engine, &job->work);
3604 
3605     nxt_router_listen_event_release(task, lev, joint);
3606 }
3607 
3608 
3609 static void
3610 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3611 {
3612     nxt_listen_socket_t    *ls;
3613     nxt_thread_spinlock_t  *lock;
3614 
3615     ls = skcf->listen;
3616     lock = &skcf->router_conf->router->lock;
3617 
3618     nxt_thread_spin_lock(lock);
3619 
3620     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3621               task->thread->engine, ls->count);
3622 
3623     if (--ls->count != 0) {
3624         ls = NULL;
3625     }
3626 
3627     nxt_thread_spin_unlock(lock);
3628 
3629     if (ls != NULL) {
3630         nxt_socket_close(task, ls->socket);
3631         nxt_free(ls);
3632     }
3633 }
3634 
3635 
3636 void
3637 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3638     nxt_socket_conf_joint_t *joint)
3639 {
3640     nxt_event_engine_t  *engine;
3641 
3642     nxt_debug(task, "listen event count: %D", lev->count);
3643 
3644     engine = task->thread->engine;
3645 
3646     if (--lev->count == 0) {
3647         if (lev->next != NULL) {
3648             nxt_sockaddr_cache_free(engine, lev->next);
3649 
3650             nxt_conn_free(task, lev->next);
3651         }
3652 
3653         nxt_free(lev);
3654     }
3655 
3656     if (joint != NULL) {
3657         nxt_router_conf_release(task, joint);
3658     }
3659 
3660     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3661         nxt_thread_exit(task->thread);
3662     }
3663 }
3664 
3665 
3666 void
3667 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3668 {
3669     nxt_socket_conf_t      *skcf;
3670     nxt_router_conf_t      *rtcf;
3671     nxt_thread_spinlock_t  *lock;
3672 
3673     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3674 
3675     if (--joint->count != 0) {
3676         return;
3677     }
3678 
3679     nxt_queue_remove(&joint->link);
3680 
3681     /*
3682      * The joint content can not be safely used after the critical
3683      * section protected by the spinlock because its memory pool may
3684      * be already destroyed by another thread.
3685      */
3686     skcf = joint->socket_conf;
3687     rtcf = skcf->router_conf;
3688     lock = &rtcf->router->lock;
3689 
3690     nxt_thread_spin_lock(lock);
3691 
3692     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3693               rtcf, rtcf->count);
3694 
3695     if (--skcf->count != 0) {
3696         skcf = NULL;
3697         rtcf = NULL;
3698 
3699     } else {
3700         nxt_queue_remove(&skcf->link);
3701 
3702         if (--rtcf->count != 0) {
3703             rtcf = NULL;
3704         }
3705     }
3706 
3707     nxt_thread_spin_unlock(lock);
3708 
3709 #if (NXT_TLS)
3710     if (skcf != NULL && skcf->tls != NULL) {
3711         task->thread->runtime->tls->server_free(task, skcf->tls);
3712     }
3713 #endif
3714 
3715     /* TODO remove engine->port */
3716 
3717     if (rtcf != NULL) {
3718         nxt_debug(task, "old router conf is destroyed");
3719 
3720         nxt_router_apps_hash_use(task, rtcf, -1);
3721 
3722         nxt_router_access_log_release(task, lock, rtcf->access_log);
3723 
3724         nxt_mp_thread_adopt(rtcf->mem_pool);
3725 
3726         nxt_mp_destroy(rtcf->mem_pool);
3727     }
3728 }
3729 
3730 
3731 static void
3732 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3733     nxt_router_access_log_t *access_log)
3734 {
3735     size_t     size;
3736     u_char     *buf, *p;
3737     nxt_off_t  bytes;
3738 
3739     static nxt_time_string_t  date_cache = {
3740         (nxt_atomic_uint_t) -1,
3741         nxt_router_access_log_date,
3742         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3743         nxt_length("31/Dec/1986:19:40:00 +0300"),
3744         NXT_THREAD_TIME_LOCAL,
3745         NXT_THREAD_TIME_SEC,
3746     };
3747 
3748     size = r->remote->address_length
3749            + 6                  /* ' - - [' */
3750            + date_cache.size
3751            + 3                  /* '] "' */
3752            + r->method->length
3753            + 1                  /* space */
3754            + r->target.length
3755            + 1                  /* space */
3756            + r->version.length
3757            + 2                  /* '" ' */
3758            + 3                  /* status */
3759            + 1                  /* space */
3760            + NXT_OFF_T_LEN
3761            + 2                  /* ' "' */
3762            + (r->referer != NULL ? r->referer->value_length : 1)
3763            + 3                  /* '" "' */
3764            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3765            + 2                  /* '"\n' */
3766     ;
3767 
3768     buf = nxt_mp_nget(r->mem_pool, size);
3769     if (nxt_slow_path(buf == NULL)) {
3770         return;
3771     }
3772 
3773     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3774                    r->remote->address_length);
3775 
3776     p = nxt_cpymem(p, " - - [", 6);
3777 
3778     p = nxt_thread_time_string(task->thread, &date_cache, p);
3779 
3780     p = nxt_cpymem(p, "] \"", 3);
3781 
3782     if (r->method->length != 0) {
3783         p = nxt_cpymem(p, r->method->start, r->method->length);
3784 
3785         if (r->target.length != 0) {
3786             *p++ = ' ';
3787             p = nxt_cpymem(p, r->target.start, r->target.length);
3788 
3789             if (r->version.length != 0) {
3790                 *p++ = ' ';
3791                 p = nxt_cpymem(p, r->version.start, r->version.length);
3792             }
3793         }
3794 
3795     } else {
3796         *p++ = '-';
3797     }
3798 
3799     p = nxt_cpymem(p, "\" ", 2);
3800 
3801     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3802 
3803     *p++ = ' ';
3804 
3805     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3806 
3807     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3808 
3809     p = nxt_cpymem(p, " \"", 2);
3810 
3811     if (r->referer != NULL) {
3812         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3813 
3814     } else {
3815         *p++ = '-';
3816     }
3817 
3818     p = nxt_cpymem(p, "\" \"", 3);
3819 
3820     if (r->user_agent != NULL) {
3821         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3822 
3823     } else {
3824         *p++ = '-';
3825     }
3826 
3827     p = nxt_cpymem(p, "\"\n", 2);
3828 
3829     nxt_fd_write(access_log->fd, buf, p - buf);
3830 }
3831 
3832 
3833 static u_char *
3834 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3835     size_t size, const char *format)
3836 {
3837     u_char  sign;
3838     time_t  gmtoff;
3839 
3840     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3841                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3842 
3843     gmtoff = nxt_timezone(tm) / 60;
3844 
3845     if (gmtoff < 0) {
3846         gmtoff = -gmtoff;
3847         sign = '-';
3848 
3849     } else {
3850         sign = '+';
3851     }
3852 
3853     return nxt_sprintf(buf, buf + size, format,
3854                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3855                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3856                        sign, gmtoff / 60, gmtoff % 60);
3857 }
3858 
3859 
3860 static void
3861 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3862 {
3863     uint32_t                 stream;
3864     nxt_int_t                ret;
3865     nxt_buf_t                *b;
3866     nxt_port_t               *main_port, *router_port;
3867     nxt_runtime_t            *rt;
3868     nxt_router_access_log_t  *access_log;
3869 
3870     access_log = tmcf->router_conf->access_log;
3871 
3872     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3873     if (nxt_slow_path(b == NULL)) {
3874         goto fail;
3875     }
3876 
3877     b->completion_handler = nxt_buf_dummy_completion;
3878 
3879     nxt_buf_cpystr(b, &access_log->path);
3880     *b->mem.free++ = '\0';
3881 
3882     rt = task->thread->runtime;
3883     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3884     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3885 
3886     stream = nxt_port_rpc_register_handler(task, router_port,
3887                                            nxt_router_access_log_ready,
3888                                            nxt_router_access_log_error,
3889                                            -1, tmcf);
3890     if (nxt_slow_path(stream == 0)) {
3891         goto fail;
3892     }
3893 
3894     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3895                                 stream, router_port->id, b);
3896 
3897     if (nxt_slow_path(ret != NXT_OK)) {
3898         nxt_port_rpc_cancel(task, router_port, stream);
3899         goto fail;
3900     }
3901 
3902     return;
3903 
3904 fail:
3905 
3906     nxt_router_conf_error(task, tmcf);
3907 }
3908 
3909 
3910 static void
3911 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3912     void *data)
3913 {
3914     nxt_router_temp_conf_t   *tmcf;
3915     nxt_router_access_log_t  *access_log;
3916 
3917     tmcf = data;
3918 
3919     access_log = tmcf->router_conf->access_log;
3920 
3921     access_log->fd = msg->fd[0];
3922 
3923     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3924                        nxt_router_conf_apply, task, tmcf, NULL);
3925 }
3926 
3927 
3928 static void
3929 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3930     void *data)
3931 {
3932     nxt_router_temp_conf_t  *tmcf;
3933 
3934     tmcf = data;
3935 
3936     nxt_router_conf_error(task, tmcf);
3937 }
3938 
3939 
3940 static void
3941 nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
3942     nxt_router_access_log_t *access_log)
3943 {
3944     if (access_log == NULL) {
3945         return;
3946     }
3947 
3948     nxt_thread_spin_lock(lock);
3949 
3950     access_log->count++;
3951 
3952     nxt_thread_spin_unlock(lock);
3953 }
3954 
3955 
3956 static void
3957 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3958     nxt_router_access_log_t *access_log)
3959 {
3960     if (access_log == NULL) {
3961         return;
3962     }
3963 
3964     nxt_thread_spin_lock(lock);
3965 
3966     if (--access_log->count != 0) {
3967         access_log = NULL;
3968     }
3969 
3970     nxt_thread_spin_unlock(lock);
3971 
3972     if (access_log != NULL) {
3973 
3974         if (access_log->fd != -1) {
3975             nxt_fd_close(access_log->fd);
3976         }
3977 
3978         nxt_free(access_log);
3979     }
3980 }
3981 
3982 
3983 typedef struct {
3984     nxt_mp_t                 *mem_pool;
3985     nxt_router_access_log_t  *access_log;
3986 } nxt_router_access_log_reopen_t;
3987 
3988 
3989 static void
3990 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3991 {
3992     nxt_mp_t                        *mp;
3993     uint32_t                        stream;
3994     nxt_int_t                       ret;
3995     nxt_buf_t                       *b;
3996     nxt_port_t                      *main_port, *router_port;
3997     nxt_runtime_t                   *rt;
3998     nxt_router_access_log_t         *access_log;
3999     nxt_router_access_log_reopen_t  *reopen;
4000 
4001     access_log = nxt_router->access_log;
4002 
4003     if (access_log == NULL) {
4004         return;
4005     }
4006 
4007     mp = nxt_mp_create(1024, 128, 256, 32);
4008     if (nxt_slow_path(mp == NULL)) {
4009         return;
4010     }
4011 
4012     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
4013     if (nxt_slow_path(reopen == NULL)) {
4014         goto fail;
4015     }
4016 
4017     reopen->mem_pool = mp;
4018     reopen->access_log = access_log;
4019 
4020     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
4021     if (nxt_slow_path(b == NULL)) {
4022         goto fail;
4023     }
4024 
4025     b->completion_handler = nxt_router_access_log_reopen_completion;
4026 
4027     nxt_buf_cpystr(b, &access_log->path);
4028     *b->mem.free++ = '\0';
4029 
4030     rt = task->thread->runtime;
4031     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
4032     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
4033 
4034     stream = nxt_port_rpc_register_handler(task, router_port,
4035                                            nxt_router_access_log_reopen_ready,
4036                                            nxt_router_access_log_reopen_error,
4037                                            -1, reopen);
4038     if (nxt_slow_path(stream == 0)) {
4039         goto fail;
4040     }
4041 
4042     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
4043                                 stream, router_port->id, b);
4044 
4045     if (nxt_slow_path(ret != NXT_OK)) {
4046         nxt_port_rpc_cancel(task, router_port, stream);
4047         goto fail;
4048     }
4049 
4050     nxt_mp_retain(mp);
4051 
4052     return;
4053 
4054 fail:
4055 
4056     nxt_mp_destroy(mp);
4057 }
4058 
4059 
4060 static void
4061 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
4062 {
4063     nxt_mp_t   *mp;
4064     nxt_buf_t  *b;
4065 
4066     b = obj;
4067     mp = b->data;
4068 
4069     nxt_mp_release(mp);
4070 }
4071 
4072 
4073 static void
4074 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4075     void *data)
4076 {
4077     nxt_router_access_log_t         *access_log;
4078     nxt_router_access_log_reopen_t  *reopen;
4079 
4080     reopen = data;
4081 
4082     access_log = reopen->access_log;
4083 
4084     if (access_log == nxt_router->access_log) {
4085 
4086         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
4087             nxt_alert(task, "dup2(%FD, %FD) failed %E",
4088                       msg->fd[0], access_log->fd, nxt_errno);
4089         }
4090     }
4091 
4092     nxt_fd_close(msg->fd[0]);
4093     nxt_mp_release(reopen->mem_pool);
4094 }
4095 
4096 
4097 static void
4098 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4099     void *data)
4100 {
4101     nxt_router_access_log_reopen_t  *reopen;
4102 
4103     reopen = data;
4104 
4105     nxt_mp_release(reopen->mem_pool);
4106 }
4107 
4108 
4109 static void
4110 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4111 {
4112     nxt_port_t           *port;
4113     nxt_thread_link_t    *link;
4114     nxt_event_engine_t   *engine;
4115     nxt_thread_handle_t  handle;
4116 
4117     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4118     link = data;
4119 
4120     nxt_thread_wait(handle);
4121 
4122     engine = link->engine;
4123 
4124     nxt_queue_remove(&engine->link);
4125 
4126     port = engine->port;
4127 
4128     // TODO notify all apps
4129 
4130     port->engine = task->thread->engine;
4131     nxt_mp_thread_adopt(port->mem_pool);
4132     nxt_port_use(task, port, -1);
4133 
4134     nxt_mp_thread_adopt(engine->mem_pool);
4135     nxt_mp_destroy(engine->mem_pool);
4136 
4137     nxt_event_engine_free(engine);
4138 
4139     nxt_free(link);
4140 }
4141 
4142 
4143 static void
4144 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4145     void *data)
4146 {
4147     size_t                  b_size, count;
4148     nxt_int_t               ret;
4149     nxt_app_t               *app;
4150     nxt_buf_t               *b, *next;
4151     nxt_port_t              *app_port;
4152     nxt_unit_field_t        *f;
4153     nxt_http_field_t        *field;
4154     nxt_http_request_t      *r;
4155     nxt_unit_response_t     *resp;
4156     nxt_request_rpc_data_t  *req_rpc_data;
4157 
4158     req_rpc_data = data;
4159 
4160     r = req_rpc_data->request;
4161     if (nxt_slow_path(r == NULL)) {
4162         return;
4163     }
4164 
4165     if (r->error) {
4166         nxt_request_rpc_data_unlink(task, req_rpc_data);
4167         return;
4168     }
4169 
4170     app = req_rpc_data->app;
4171     nxt_assert(app != NULL);
4172 
4173     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4174         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4175 
4176         return;
4177     }
4178 
4179     b = (msg->size == 0) ? NULL : msg->buf;
4180 
4181     if (msg->port_msg.last != 0) {
4182         nxt_debug(task, "router data create last buf");
4183 
4184         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4185 
4186         req_rpc_data->rpc_cancel = 0;
4187 
4188         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4189             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4190         }
4191 
4192         nxt_request_rpc_data_unlink(task, req_rpc_data);
4193 
4194     } else {
4195         if (app->timeout != 0) {
4196             r->timer.handler = nxt_router_app_timeout;
4197             r->timer_data = req_rpc_data;
4198             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4199         }
4200     }
4201 
4202     if (b == NULL) {
4203         return;
4204     }
4205 
4206     if (msg->buf == b) {
4207         /* Disable instant buffer completion/re-using by port. */
4208         msg->buf = NULL;
4209     }
4210 
4211     if (r->header_sent) {
4212         nxt_buf_chain_add(&r->out, b);
4213         nxt_http_request_send_body(task, r, NULL);
4214 
4215     } else {
4216         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4217 
4218         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4219             nxt_alert(task, "response buffer too small: %z", b_size);
4220             goto fail;
4221         }
4222 
4223         resp = (void *) b->mem.pos;
4224         count = (b_size - sizeof(nxt_unit_response_t))
4225                     / sizeof(nxt_unit_field_t);
4226 
4227         if (nxt_slow_path(count < resp->fields_count)) {
4228             nxt_alert(task, "response buffer too small for fields count: %D",
4229                       resp->fields_count);
4230             goto fail;
4231         }
4232 
4233         field = NULL;
4234 
4235         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4236             if (f->skip) {
4237                 continue;
4238             }
4239 
4240             field = nxt_list_add(r->resp.fields);
4241 
4242             if (nxt_slow_path(field == NULL)) {
4243                 goto fail;
4244             }
4245 
4246             field->hash = f->hash;
4247             field->skip = 0;
4248             field->hopbyhop = 0;
4249 
4250             field->name_length = f->name_length;
4251             field->value_length = f->value_length;
4252             field->name = nxt_unit_sptr_get(&f->name);
4253             field->value = nxt_unit_sptr_get(&f->value);
4254 
4255             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4256             if (nxt_slow_path(ret != NXT_OK)) {
4257                 goto fail;
4258             }
4259 
4260             nxt_debug(task, "header%s: %*s: %*s",
4261                       (field->skip ? " skipped" : ""),
4262                       (size_t) field->name_length, field->name,
4263                       (size_t) field->value_length, field->value);
4264 
4265             if (field->skip) {
4266                 r->resp.fields->last->nelts--;
4267             }
4268         }
4269 
4270         r->status = resp->status;
4271 
4272         if (resp->piggyback_content_length != 0) {
4273             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4274             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4275 
4276         } else {
4277             b->mem.pos = b->mem.free;
4278         }
4279 
4280         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4281             next = b->next;
4282             b->next = NULL;
4283 
4284             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4285                                b->completion_handler, task, b, b->parent);
4286 
4287             b = next;
4288         }
4289 
4290         if (b != NULL) {
4291             nxt_buf_chain_add(&r->out, b);
4292         }
4293 
4294         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4295 
4296         if (r->websocket_handshake
4297             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4298         {
4299             app_port = req_rpc_data->app_port;
4300             if (nxt_slow_path(app_port == NULL)) {
4301                 goto fail;
4302             }
4303 
4304             nxt_thread_mutex_lock(&app->mutex);
4305 
4306             app_port->main_app_port->active_websockets++;
4307 
4308             nxt_thread_mutex_unlock(&app->mutex);
4309 
4310             nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4311             req_rpc_data->apr_action = NXT_APR_CLOSE;
4312 
4313             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4314 
4315             r->state = &nxt_http_websocket;
4316 
4317         } else {
4318             r->state = &nxt_http_request_send_state;
4319         }
4320     }
4321 
4322     return;
4323 
4324 fail:
4325 
4326     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4327 
4328     nxt_request_rpc_data_unlink(task, req_rpc_data);
4329 }
4330 
4331 
4332 static void
4333 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4334     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4335 {
4336     int                 res;
4337     nxt_app_t           *app;
4338     nxt_buf_t           *b;
4339     nxt_bool_t          start_process, unlinked;
4340     nxt_port_t          *app_port, *main_app_port, *idle_port;
4341     nxt_queue_link_t    *idle_lnk;
4342     nxt_http_request_t  *r;
4343 
4344     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4345               req_rpc_data->stream,
4346               msg->port_msg.pid, msg->port_msg.reply_port);
4347 
4348     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4349                              msg->port_msg.pid);
4350 
4351     app = req_rpc_data->app;
4352     r = req_rpc_data->request;
4353 
4354     start_process = 0;
4355     unlinked = 0;
4356 
4357     nxt_thread_mutex_lock(&app->mutex);
4358 
4359     if (r->app_link.next != NULL) {
4360         nxt_queue_remove(&r->app_link);
4361         r->app_link.next = NULL;
4362 
4363         unlinked = 1;
4364     }
4365 
4366     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4367                                   msg->port_msg.reply_port);
4368     if (nxt_slow_path(app_port == NULL)) {
4369         nxt_thread_mutex_unlock(&app->mutex);
4370 
4371         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4372 
4373         if (unlinked) {
4374             nxt_mp_release(r->mem_pool);
4375         }
4376 
4377         return;
4378     }
4379 
4380     main_app_port = app_port->main_app_port;
4381 
4382     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4383         app->idle_processes--;
4384 
4385         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4386                   &app->name, main_app_port->pid, main_app_port->id,
4387                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4388 
4389         /* Check port was in 'spare_ports' using idle_start field. */
4390         if (main_app_port->idle_start == 0
4391             && app->idle_processes >= app->spare_processes)
4392         {
4393             /*
4394              * If there is a vacant space in spare ports,
4395              * move the last idle to spare_ports.
4396              */
4397             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4398 
4399             idle_lnk = nxt_queue_last(&app->idle_ports);
4400             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4401             nxt_queue_remove(idle_lnk);
4402 
4403             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4404 
4405             idle_port->idle_start = 0;
4406 
4407             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4408                       "to spare_ports",
4409                       &app->name, idle_port->pid, idle_port->id);
4410         }
4411 
4412         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4413             app->pending_processes++;
4414             start_process = 1;
4415         }
4416     }
4417 
4418     main_app_port->active_requests++;
4419 
4420     nxt_port_inc_use(app_port);
4421 
4422     nxt_thread_mutex_unlock(&app->mutex);
4423 
4424     if (unlinked) {
4425         nxt_mp_release(r->mem_pool);
4426     }
4427 
4428     if (start_process) {
4429         nxt_router_start_app_process(task, app);
4430     }
4431 
4432     nxt_port_use(task, req_rpc_data->app_port, -1);
4433 
4434     req_rpc_data->app_port = app_port;
4435 
4436     b = req_rpc_data->msg_info.buf;
4437 
4438     if (b != NULL) {
4439         /* First buffer is already sent.  Start from second. */
4440         b = b->next;
4441 
4442         req_rpc_data->msg_info.buf->next = NULL;
4443     }
4444 
4445     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4446         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4447                   req_rpc_data->msg_info.body_fd);
4448 
4449         if (req_rpc_data->msg_info.body_fd != -1) {
4450             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4451         }
4452 
4453         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4454                                     req_rpc_data->msg_info.body_fd,
4455                                     req_rpc_data->stream,
4456                                     task->thread->engine->port->id, b);
4457 
4458         if (nxt_slow_path(res != NXT_OK)) {
4459             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4460         }
4461     }
4462 
4463     if (app->timeout != 0) {
4464         r->timer.handler = nxt_router_app_timeout;
4465         r->timer_data = req_rpc_data;
4466         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4467     }
4468 }
4469 
4470 
4471 static const nxt_http_request_state_t  nxt_http_request_send_state
4472     nxt_aligned(64) =
4473 {
4474     .error_handler = nxt_http_request_error_handler,
4475 };
4476 
4477 
4478 static void
4479 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4480 {
4481     nxt_buf_t           *out;
4482     nxt_http_request_t  *r;
4483 
4484     r = obj;
4485 
4486     out = r->out;
4487 
4488     if (out != NULL) {
4489         r->out = NULL;
4490         nxt_http_request_send(task, r, out);
4491     }
4492 }
4493 
4494 
4495 static void
4496 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4497     void *data)
4498 {
4499     nxt_request_rpc_data_t  *req_rpc_data;
4500 
4501     req_rpc_data = data;
4502 
4503     req_rpc_data->rpc_cancel = 0;
4504 
4505     /* TODO cancel message and return if cancelled. */
4506     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4507 
4508     if (req_rpc_data->request != NULL) {
4509         nxt_http_request_error(task, req_rpc_data->request,
4510                                NXT_HTTP_SERVICE_UNAVAILABLE);
4511     }
4512 
4513     nxt_request_rpc_data_unlink(task, req_rpc_data);
4514 }
4515 
4516 
4517 static void
4518 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4519     void *data)
4520 {
4521     uint32_t             n;
4522     nxt_app_t            *app;
4523     nxt_bool_t           start_process, restarted;
4524     nxt_port_t           *port;
4525     nxt_app_joint_t      *app_joint;
4526     nxt_app_joint_rpc_t  *app_joint_rpc;
4527 
4528     nxt_assert(data != NULL);
4529 
4530     app_joint_rpc = data;
4531     app_joint = app_joint_rpc->app_joint;
4532     port = msg->u.new_port;
4533 
4534     nxt_assert(app_joint != NULL);
4535     nxt_assert(port != NULL);
4536     nxt_assert(port->id == 0);
4537 
4538     app = app_joint->app;
4539 
4540     nxt_router_app_joint_use(task, app_joint, -1);
4541 
4542     if (nxt_slow_path(app == NULL)) {
4543         nxt_debug(task, "new port ready for released app, send QUIT");
4544 
4545         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4546 
4547         return;
4548     }
4549 
4550     nxt_thread_mutex_lock(&app->mutex);
4551 
4552     restarted = (app->generation != app_joint_rpc->generation);
4553 
4554     if (app_joint_rpc->proto) {
4555         nxt_assert(app->proto_port == NULL);
4556         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4557 
4558         n = app->proto_port_requests;
4559         app->proto_port_requests = 0;
4560 
4561         if (nxt_slow_path(restarted)) {
4562             nxt_thread_mutex_unlock(&app->mutex);
4563 
4564             nxt_debug(task, "proto port ready for restarted app, send QUIT");
4565 
4566             nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4567                                   NULL);
4568 
4569         } else {
4570             port->app = app;
4571             app->proto_port = port;
4572 
4573             nxt_thread_mutex_unlock(&app->mutex);
4574 
4575             nxt_port_use(task, port, 1);
4576         }
4577 
4578         port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4579 
4580         while (n > 0) {
4581             nxt_router_app_use(task, app, 1);
4582 
4583             nxt_router_start_app_process_handler(task, port, app);
4584 
4585             n--;
4586         }
4587 
4588         return;
4589     }
4590 
4591     nxt_assert(port->type == NXT_PROCESS_APP);
4592     nxt_assert(app->pending_processes != 0);
4593 
4594     app->pending_processes--;
4595 
4596     if (nxt_slow_path(restarted)) {
4597         nxt_debug(task, "new port ready for restarted app, send QUIT");
4598 
4599         start_process = !task->thread->engine->shutdown
4600                         && nxt_router_app_can_start(app)
4601                         && nxt_router_app_need_start(app);
4602 
4603         if (start_process) {
4604             app->pending_processes++;
4605         }
4606 
4607         nxt_thread_mutex_unlock(&app->mutex);
4608 
4609         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4610 
4611         if (start_process) {
4612             nxt_router_start_app_process(task, app);
4613         }
4614 
4615         return;
4616     }
4617 
4618     port->app = app;
4619     port->main_app_port = port;
4620 
4621     app->processes++;
4622     nxt_port_hash_add(&app->port_hash, port);
4623     app->port_hash_count++;
4624 
4625     nxt_thread_mutex_unlock(&app->mutex);
4626 
4627     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4628               &app->name, port->pid, app->processes, app->pending_processes);
4629 
4630     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4631 
4632     nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4633 }
4634 
4635 
4636 static void
4637 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4638     void *data)
4639 {
4640     nxt_app_t            *app;
4641     nxt_app_joint_t      *app_joint;
4642     nxt_queue_link_t     *link;
4643     nxt_http_request_t   *r;
4644     nxt_app_joint_rpc_t  *app_joint_rpc;
4645 
4646     nxt_assert(data != NULL);
4647 
4648     app_joint_rpc = data;
4649     app_joint = app_joint_rpc->app_joint;
4650 
4651     nxt_assert(app_joint != NULL);
4652 
4653     app = app_joint->app;
4654 
4655     nxt_router_app_joint_use(task, app_joint, -1);
4656 
4657     if (nxt_slow_path(app == NULL)) {
4658         nxt_debug(task, "start error for released app");
4659 
4660         return;
4661     }
4662 
4663     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4664 
4665     link = NULL;
4666 
4667     nxt_thread_mutex_lock(&app->mutex);
4668 
4669     nxt_assert(app->pending_processes != 0);
4670 
4671     app->pending_processes--;
4672 
4673     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4674         link = nxt_queue_first(&app->ack_waiting_req);
4675 
4676         nxt_queue_remove(link);
4677         link->next = NULL;
4678     }
4679 
4680     nxt_thread_mutex_unlock(&app->mutex);
4681 
4682     while (link != NULL) {
4683         r = nxt_container_of(link, nxt_http_request_t, app_link);
4684 
4685         nxt_event_engine_post(r->engine, &r->err_work);
4686 
4687         link = NULL;
4688 
4689         nxt_thread_mutex_lock(&app->mutex);
4690 
4691         if (app->processes == 0 && app->pending_processes == 0
4692             && !nxt_queue_is_empty(&app->ack_waiting_req))
4693         {
4694             link = nxt_queue_first(&app->ack_waiting_req);
4695 
4696             nxt_queue_remove(link);
4697             link->next = NULL;
4698         }
4699 
4700         nxt_thread_mutex_unlock(&app->mutex);
4701     }
4702 }
4703 
4704 
4705 nxt_inline nxt_port_t *
4706 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4707 {
4708     nxt_port_t  *port;
4709 
4710     port = NULL;
4711 
4712     nxt_thread_mutex_lock(&app->mutex);
4713 
4714     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4715 
4716         /* Caller is responsible to decrease port use count. */
4717         nxt_queue_chk_remove(&port->app_link);
4718 
4719         if (nxt_queue_chk_remove(&port->idle_link)) {
4720             app->idle_processes--;
4721 
4722             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4723                       &app->name, port->pid, port->id,
4724                       (port->idle_start ? "idle_ports" : "spare_ports"));
4725         }
4726 
4727         nxt_port_hash_remove(&app->port_hash, port);
4728         app->port_hash_count--;
4729 
4730         port->app = NULL;
4731         app->processes--;
4732 
4733         break;
4734 
4735     } nxt_queue_loop;
4736 
4737     nxt_thread_mutex_unlock(&app->mutex);
4738 
4739     return port;
4740 }
4741 
4742 
4743 static void
4744 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4745 {
4746     int  c;
4747 
4748     c = nxt_atomic_fetch_add(&app->use_count, i);
4749 
4750     if (i < 0 && c == -i) {
4751 
4752         if (task->thread->engine != app->engine) {
4753             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4754 
4755         } else {
4756             nxt_router_free_app(task, app->joint, NULL);
4757         }
4758     }
4759 }
4760 
4761 
4762 static void
4763 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4764 {
4765     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4766 
4767     nxt_queue_remove(&app->link);
4768 
4769     nxt_router_app_use(task, app, -1);
4770 }
4771 
4772 
4773 static void
4774 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4775     nxt_apr_action_t action)
4776 {
4777     int         inc_use;
4778     uint32_t    got_response, dec_requests;
4779     nxt_bool_t  adjust_idle_timer;
4780     nxt_port_t  *main_app_port;
4781 
4782     nxt_assert(port != NULL);
4783 
4784     inc_use = 0;
4785     got_response = 0;
4786     dec_requests = 0;
4787 
4788     switch (action) {
4789     case NXT_APR_NEW_PORT:
4790         break;
4791     case NXT_APR_REQUEST_FAILED:
4792         dec_requests = 1;
4793         inc_use = -1;
4794         break;
4795     case NXT_APR_GOT_RESPONSE:
4796         got_response = 1;
4797         inc_use = -1;
4798         break;
4799     case NXT_APR_UPGRADE:
4800         got_response = 1;
4801         break;
4802     case NXT_APR_CLOSE:
4803         inc_use = -1;
4804         break;
4805     }
4806 
4807     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4808               port->pid, port->id,
4809               (int) inc_use, (int) got_response);
4810 
4811     if (port->id == NXT_SHARED_PORT_ID) {
4812         nxt_thread_mutex_lock(&app->mutex);
4813 
4814         app->active_requests -= got_response + dec_requests;
4815 
4816         nxt_thread_mutex_unlock(&app->mutex);
4817 
4818         goto adjust_use;
4819     }
4820 
4821     main_app_port = port->main_app_port;
4822 
4823     nxt_thread_mutex_lock(&app->mutex);
4824 
4825     main_app_port->active_requests -= got_response + dec_requests;
4826     app->active_requests -= got_response + dec_requests;
4827 
4828     if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4829         nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4830 
4831         nxt_port_inc_use(main_app_port);
4832     }
4833 
4834     adjust_idle_timer = 0;
4835 
4836     if (main_app_port->pair[1] != -1
4837         && main_app_port->active_requests == 0
4838         && main_app_port->active_websockets == 0
4839         && main_app_port->idle_link.next == NULL)
4840     {
4841         if (app->idle_processes == app->spare_processes
4842             && app->adjust_idle_work.data == NULL)
4843         {
4844             adjust_idle_timer = 1;
4845             app->adjust_idle_work.data = app;
4846             app->adjust_idle_work.next = NULL;
4847         }
4848 
4849         if (app->idle_processes < app->spare_processes) {
4850             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4851 
4852             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4853                       &app->name, main_app_port->pid, main_app_port->id);
4854         } else {
4855             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4856 
4857             main_app_port->idle_start = task->thread->engine->timers.now;
4858 
4859             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4860                       &app->name, main_app_port->pid, main_app_port->id);
4861         }
4862 
4863         app->idle_processes++;
4864     }
4865 
4866     nxt_thread_mutex_unlock(&app->mutex);
4867 
4868     if (adjust_idle_timer) {
4869         nxt_router_app_use(task, app, 1);
4870         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4871     }
4872 
4873     /* ? */
4874     if (main_app_port->pair[1] == -1) {
4875         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4876                   &app->name, app, main_app_port, main_app_port->pid);
4877 
4878         goto adjust_use;
4879     }
4880 
4881     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4882               &app->name, app);
4883 
4884 adjust_use:
4885 
4886     nxt_port_use(task, port, inc_use);
4887 }
4888 
4889 
4890 void
4891 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4892 {
4893     nxt_app_t         *app;
4894     nxt_bool_t        unchain, start_process;
4895     nxt_port_t        *idle_port;
4896     nxt_queue_link_t  *idle_lnk;
4897 
4898     app = port->app;
4899 
4900     nxt_assert(app != NULL);
4901 
4902     nxt_thread_mutex_lock(&app->mutex);
4903 
4904     if (port == app->proto_port) {
4905         app->proto_port = NULL;
4906         port->app = NULL;
4907 
4908         nxt_thread_mutex_unlock(&app->mutex);
4909 
4910         nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4911                   port->pid);
4912 
4913         nxt_port_use(task, port, -1);
4914 
4915         return;
4916     }
4917 
4918     nxt_port_hash_remove(&app->port_hash, port);
4919     app->port_hash_count--;
4920 
4921     if (port->id != 0) {
4922         nxt_thread_mutex_unlock(&app->mutex);
4923 
4924         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4925                   port->pid, port->id);
4926 
4927         return;
4928     }
4929 
4930     unchain = nxt_queue_chk_remove(&port->app_link);
4931 
4932     if (nxt_queue_chk_remove(&port->idle_link)) {
4933         app->idle_processes--;
4934 
4935         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4936                   &app->name, port->pid, port->id,
4937                   (port->idle_start ? "idle_ports" : "spare_ports"));
4938 
4939         if (port->idle_start == 0
4940             && app->idle_processes >= app->spare_processes)
4941         {
4942             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4943 
4944             idle_lnk = nxt_queue_last(&app->idle_ports);
4945             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4946             nxt_queue_remove(idle_lnk);
4947 
4948             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4949 
4950             idle_port->idle_start = 0;
4951 
4952             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4953                       "to spare_ports",
4954                       &app->name, idle_port->pid, idle_port->id);
4955         }
4956     }
4957 
4958     app->processes--;
4959 
4960     start_process = !task->thread->engine->shutdown
4961                     && nxt_router_app_can_start(app)
4962                     && nxt_router_app_need_start(app);
4963 
4964     if (start_process) {
4965         app->pending_processes++;
4966     }
4967 
4968     nxt_thread_mutex_unlock(&app->mutex);
4969 
4970     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4971 
4972     if (unchain) {
4973         nxt_port_use(task, port, -1);
4974     }
4975 
4976     if (start_process) {
4977         nxt_router_start_app_process(task, app);
4978     }
4979 }
4980 
4981 
4982 static void
4983 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4984 {
4985     nxt_app_t           *app;
4986     nxt_bool_t          queued;
4987     nxt_port_t          *port;
4988     nxt_msec_t          timeout, threshold;
4989     nxt_queue_link_t    *lnk;
4990     nxt_event_engine_t  *engine;
4991 
4992     app = obj;
4993     queued = (data == app);
4994 
4995     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4996               &app->name, queued);
4997 
4998     engine = task->thread->engine;
4999 
5000     nxt_assert(app->engine == engine);
5001 
5002     threshold = engine->timers.now + app->joint->idle_timer.bias;
5003     timeout = 0;
5004 
5005     nxt_thread_mutex_lock(&app->mutex);
5006 
5007     if (queued) {
5008         app->adjust_idle_work.data = NULL;
5009     }
5010 
5011     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
5012               &app->name,
5013               (int) app->idle_processes, (int) app->spare_processes);
5014 
5015     while (app->idle_processes > app->spare_processes) {
5016 
5017         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
5018 
5019         lnk = nxt_queue_first(&app->idle_ports);
5020         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
5021 
5022         timeout = port->idle_start + app->idle_timeout;
5023 
5024         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
5025                   &app->name, port->pid,
5026                   port->idle_start, timeout, threshold);
5027 
5028         if (timeout > threshold) {
5029             break;
5030         }
5031 
5032         nxt_queue_remove(lnk);
5033         lnk->next = NULL;
5034 
5035         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
5036                   &app->name, port->pid, port->id);
5037 
5038         nxt_queue_chk_remove(&port->app_link);
5039 
5040         nxt_port_hash_remove(&app->port_hash, port);
5041         app->port_hash_count--;
5042 
5043         app->idle_processes--;
5044         app->processes--;
5045         port->app = NULL;
5046 
5047         nxt_thread_mutex_unlock(&app->mutex);
5048 
5049         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
5050                   &app->name, port->pid);
5051 
5052         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5053 
5054         nxt_port_use(task, port, -1);
5055 
5056         nxt_thread_mutex_lock(&app->mutex);
5057     }
5058 
5059     nxt_thread_mutex_unlock(&app->mutex);
5060 
5061     if (timeout > threshold) {
5062         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
5063 
5064     } else {
5065         nxt_timer_disable(engine, &app->joint->idle_timer);
5066     }
5067 
5068     if (queued) {
5069         nxt_router_app_use(task, app, -1);
5070     }
5071 }
5072 
5073 
5074 static void
5075 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5076 {
5077     nxt_timer_t      *timer;
5078     nxt_app_joint_t  *app_joint;
5079 
5080     timer = obj;
5081     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5082 
5083     if (nxt_fast_path(app_joint->app != NULL)) {
5084         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5085     }
5086 }
5087 
5088 
5089 static void
5090 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5091 {
5092     nxt_timer_t      *timer;
5093     nxt_app_joint_t  *app_joint;
5094 
5095     timer = obj;
5096     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5097 
5098     nxt_router_app_joint_use(task, app_joint, -1);
5099 }
5100 
5101 
5102 static void
5103 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5104 {
5105     nxt_app_t        *app;
5106     nxt_port_t       *port, *proto_port;
5107     nxt_app_joint_t  *app_joint;
5108 
5109     app_joint = obj;
5110     app = app_joint->app;
5111 
5112     for ( ;; ) {
5113         port = nxt_router_app_get_port_for_quit(task, app);
5114         if (port == NULL) {
5115             break;
5116         }
5117 
5118         nxt_port_use(task, port, -1);
5119     }
5120 
5121     nxt_thread_mutex_lock(&app->mutex);
5122 
5123     for ( ;; ) {
5124         port = nxt_port_hash_retrieve(&app->port_hash);
5125         if (port == NULL) {
5126             break;
5127         }
5128 
5129         app->port_hash_count--;
5130 
5131         port->app = NULL;
5132 
5133         nxt_port_close(task, port);
5134 
5135         nxt_port_use(task, port, -1);
5136     }
5137 
5138     proto_port = app->proto_port;
5139 
5140     if (proto_port != NULL) {
5141         nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5142                   proto_port->pid);
5143 
5144         app->proto_port = NULL;
5145         proto_port->app = NULL;
5146     }
5147 
5148     nxt_thread_mutex_unlock(&app->mutex);
5149 
5150     if (proto_port != NULL) {
5151         nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5152                               -1, 0, 0, NULL);
5153 
5154         nxt_port_close(task, proto_port);
5155 
5156         nxt_port_use(task, proto_port, -1);
5157     }
5158 
5159     nxt_assert(app->proto_port == NULL);
5160     nxt_assert(app->processes == 0);
5161     nxt_assert(app->active_requests == 0);
5162     nxt_assert(app->port_hash_count == 0);
5163     nxt_assert(app->idle_processes == 0);
5164     nxt_assert(nxt_queue_is_empty(&app->ports));
5165     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5166     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5167 
5168     nxt_port_mmaps_destroy(&app->outgoing, 1);
5169 
5170     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5171 
5172     if (app->shared_port != NULL) {
5173         app->shared_port->app = NULL;
5174         nxt_port_close(task, app->shared_port);
5175         nxt_port_use(task, app->shared_port, -1);
5176 
5177         app->shared_port = NULL;
5178     }
5179 
5180     nxt_thread_mutex_destroy(&app->mutex);
5181     nxt_mp_destroy(app->mem_pool);
5182 
5183     app_joint->app = NULL;
5184 
5185     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5186         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5187         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5188 
5189     } else {
5190         nxt_router_app_joint_use(task, app_joint, -1);
5191     }
5192 }
5193 
5194 
5195 static void
5196 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5197     nxt_request_rpc_data_t *req_rpc_data)
5198 {
5199     nxt_bool_t          start_process;
5200     nxt_port_t          *port;
5201     nxt_http_request_t  *r;
5202 
5203     start_process = 0;
5204 
5205     nxt_thread_mutex_lock(&app->mutex);
5206 
5207     port = app->shared_port;
5208     nxt_port_inc_use(port);
5209 
5210     app->active_requests++;
5211 
5212     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5213         app->pending_processes++;
5214         start_process = 1;
5215     }
5216 
5217     r = req_rpc_data->request;
5218 
5219     /*
5220      * Put request into application-wide list to be able to cancel request
5221      * if something goes wrong with application processes.
5222      */
5223     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5224 
5225     nxt_thread_mutex_unlock(&app->mutex);
5226 
5227     /*
5228      * Retain request memory pool while request is linked in ack_waiting_req
5229      * to guarantee request structure memory is accessble.
5230      */
5231     nxt_mp_retain(r->mem_pool);
5232 
5233     req_rpc_data->app_port = port;
5234     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5235 
5236     if (start_process) {
5237         nxt_router_start_app_process(task, app);
5238     }
5239 }
5240 
5241 
5242 void
5243 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5244     nxt_http_action_t *action)
5245 {
5246     nxt_event_engine_t      *engine;
5247     nxt_http_app_conf_t     *conf;
5248     nxt_request_rpc_data_t  *req_rpc_data;
5249 
5250     conf = action->u.conf;
5251     engine = task->thread->engine;
5252 
5253     r->app_target = conf->target;
5254 
5255     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5256                                           nxt_router_response_ready_handler,
5257                                           nxt_router_response_error_handler,
5258                                           sizeof(nxt_request_rpc_data_t));
5259     if (nxt_slow_path(req_rpc_data == NULL)) {
5260         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5261         return;
5262     }
5263 
5264     /*
5265      * At this point we have request req_rpc_data allocated and registered
5266      * in port handlers.  Need to fixup request memory pool.  Counterpart
5267      * release will be called via following call chain:
5268      *    nxt_request_rpc_data_unlink() ->
5269      *        nxt_router_http_request_release_post() ->
5270      *            nxt_router_http_request_release()
5271      */
5272     nxt_mp_retain(r->mem_pool);
5273 
5274     r->timer.task = &engine->task;
5275     r->timer.work_queue = &engine->fast_work_queue;
5276     r->timer.log = engine->task.log;
5277     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5278 
5279     r->engine = engine;
5280     r->err_work.handler = nxt_router_http_request_error;
5281     r->err_work.task = task;
5282     r->err_work.obj = r;
5283 
5284     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5285     req_rpc_data->app = conf->app;
5286     req_rpc_data->msg_info.body_fd = -1;
5287     req_rpc_data->rpc_cancel = 1;
5288 
5289     nxt_router_app_use(task, conf->app, 1);
5290 
5291     req_rpc_data->request = r;
5292     r->req_rpc_data = req_rpc_data;
5293 
5294     if (r->last != NULL) {
5295         r->last->completion_handler = nxt_router_http_request_done;
5296     }
5297 
5298     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5299     nxt_router_app_prepare_request(task, req_rpc_data);
5300 }
5301 
5302 
5303 static void
5304 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5305 {
5306     nxt_http_request_t  *r;
5307 
5308     r = obj;
5309 
5310     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5311 
5312     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5313 
5314     if (r->req_rpc_data != NULL) {
5315         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5316     }
5317 
5318     nxt_mp_release(r->mem_pool);
5319 }
5320 
5321 
5322 static void
5323 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5324 {
5325     nxt_http_request_t  *r;
5326 
5327     r = data;
5328 
5329     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5330 
5331     if (r->req_rpc_data != NULL) {
5332         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5333     }
5334 
5335     nxt_http_request_close_handler(task, r, r->proto.any);
5336 }
5337 
5338 
5339 static void
5340 nxt_router_app_prepare_request(nxt_task_t *task,
5341     nxt_request_rpc_data_t *req_rpc_data)
5342 {
5343     nxt_app_t         *app;
5344     nxt_buf_t         *buf, *body;
5345     nxt_int_t         res;
5346     nxt_port_t        *port, *reply_port;
5347 
5348     int                   notify;
5349     struct {
5350         nxt_port_msg_t       pm;
5351         nxt_port_mmap_msg_t  mm;
5352     } msg;
5353 
5354 
5355     app = req_rpc_data->app;
5356 
5357     nxt_assert(app != NULL);
5358 
5359     port = req_rpc_data->app_port;
5360 
5361     nxt_assert(port != NULL);
5362     nxt_assert(port->queue != NULL);
5363 
5364     reply_port = task->thread->engine->port;
5365 
5366     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5367                                  nxt_app_msg_prefix[app->type]);
5368     if (nxt_slow_path(buf == NULL)) {
5369         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5370                   req_rpc_data->stream, &app->name);
5371 
5372         nxt_http_request_error(task, req_rpc_data->request,
5373                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5374 
5375         return;
5376     }
5377 
5378     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5379                     nxt_buf_used_size(buf),
5380                     port->socket.fd);
5381 
5382     req_rpc_data->msg_info.buf = buf;
5383 
5384     body = req_rpc_data->request->body;
5385 
5386     if (body != NULL && nxt_buf_is_file(body)) {
5387         req_rpc_data->msg_info.body_fd = body->file->fd;
5388 
5389         body->file->fd = -1;
5390 
5391     } else {
5392         req_rpc_data->msg_info.body_fd = -1;
5393     }
5394 
5395     msg.pm.stream = req_rpc_data->stream;
5396     msg.pm.pid = reply_port->pid;
5397     msg.pm.reply_port = reply_port->id;
5398     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5399     msg.pm.last = 0;
5400     msg.pm.mmap = 1;
5401     msg.pm.nf = 0;
5402     msg.pm.mf = 0;
5403     msg.pm.tracking = 0;
5404 
5405     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5406     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5407 
5408     msg.mm.mmap_id = hdr->id;
5409     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5410     msg.mm.size = nxt_buf_used_size(buf);
5411 
5412     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5413                              req_rpc_data->stream, &notify,
5414                              &req_rpc_data->msg_info.tracking_cookie);
5415     if (nxt_fast_path(res == NXT_OK)) {
5416         if (notify != 0) {
5417             (void) nxt_port_socket_write(task, port,
5418                                          NXT_PORT_MSG_READ_QUEUE,
5419                                          -1, req_rpc_data->stream,
5420                                          reply_port->id, NULL);
5421 
5422         } else {
5423             nxt_debug(task, "queue is not empty");
5424         }
5425 
5426         buf->is_port_mmap_sent = 1;
5427         buf->mem.pos = buf->mem.free;
5428 
5429     } else {
5430         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5431                   req_rpc_data->stream, &app->name);
5432 
5433         nxt_http_request_error(task, req_rpc_data->request,
5434                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5435     }
5436 }
5437 
5438 
5439 struct nxt_fields_iter_s {
5440     nxt_list_part_t   *part;
5441     nxt_http_field_t  *field;
5442 };
5443 
5444 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5445 
5446 
5447 static nxt_http_field_t *
5448 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5449 {
5450     if (part == NULL) {
5451         return NULL;
5452     }
5453 
5454     while (part->nelts == 0) {
5455         part = part->next;
5456         if (part == NULL) {
5457             return NULL;
5458         }
5459     }
5460 
5461     i->part = part;
5462     i->field = nxt_list_data(i->part);
5463 
5464     return i->field;
5465 }
5466 
5467 
5468 static nxt_http_field_t *
5469 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5470 {
5471     return nxt_fields_part_first(nxt_list_part(fields), i);
5472 }
5473 
5474 
5475 static nxt_http_field_t *
5476 nxt_fields_next(nxt_fields_iter_t *i)
5477 {
5478     nxt_http_field_t  *end = nxt_list_data(i->part);
5479 
5480     end += i->part->nelts;
5481     i->field++;
5482 
5483     if (i->field < end) {
5484         return i->field;
5485     }
5486 
5487     return nxt_fields_part_first(i->part->next, i);
5488 }
5489 
5490 
5491 static nxt_buf_t *
5492 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5493     nxt_app_t *app, const nxt_str_t *prefix)
5494 {
5495     void                *target_pos, *query_pos;
5496     u_char              *pos, *end, *p, c;
5497     size_t              fields_count, req_size, size, free_size;
5498     size_t              copy_size;
5499     nxt_off_t           content_length;
5500     nxt_buf_t           *b, *buf, *out, **tail;
5501     nxt_http_field_t    *field, *dup;
5502     nxt_unit_field_t    *dst_field;
5503     nxt_fields_iter_t   iter, dup_iter;
5504     nxt_unit_request_t  *req;
5505 
5506     req_size = sizeof(nxt_unit_request_t)
5507                + r->method->length + 1
5508                + r->version.length + 1
5509                + r->remote->length + 1
5510                + r->local->length + 1
5511                + r->server_name.length + 1
5512                + r->target.length + 1
5513                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5514 
5515     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5516     fields_count = 0;
5517 
5518     nxt_list_each(field, r->fields) {
5519         fields_count++;
5520 
5521         req_size += field->name_length + prefix->length + 1
5522                     + field->value_length + 1;
5523     } nxt_list_loop;
5524 
5525     req_size += fields_count * sizeof(nxt_unit_field_t);
5526 
5527     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5528         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5529                   (int) req_size);
5530 
5531         return NULL;
5532     }
5533 
5534     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5535               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5536     if (nxt_slow_path(out == NULL)) {
5537         return NULL;
5538     }
5539 
5540     req = (nxt_unit_request_t *) out->mem.free;
5541     out->mem.free += req_size;
5542 
5543     req->app_target = r->app_target;
5544 
5545     req->content_length = content_length;
5546 
5547     p = (u_char *) (req->fields + fields_count);
5548 
5549     nxt_debug(task, "fields_count=%d", (int) fields_count);
5550 
5551     req->method_length = r->method->length;
5552     nxt_unit_sptr_set(&req->method, p);
5553     p = nxt_cpymem(p, r->method->start, r->method->length);
5554     *p++ = '\0';
5555 
5556     req->version_length = r->version.length;
5557     nxt_unit_sptr_set(&req->version, p);
5558     p = nxt_cpymem(p, r->version.start, r->version.length);
5559     *p++ = '\0';
5560 
5561     req->remote_length = r->remote->address_length;
5562     nxt_unit_sptr_set(&req->remote, p);
5563     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5564                    r->remote->address_length);
5565     *p++ = '\0';
5566 
5567     req->local_length = r->local->address_length;
5568     nxt_unit_sptr_set(&req->local, p);
5569     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5570     *p++ = '\0';
5571 
5572     req->tls = (r->tls != NULL);
5573     req->websocket_handshake = r->websocket_handshake;
5574 
5575     req->server_name_length = r->server_name.length;
5576     nxt_unit_sptr_set(&req->server_name, p);
5577     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5578     *p++ = '\0';
5579 
5580     target_pos = p;
5581     req->target_length = (uint32_t) r->target.length;
5582     nxt_unit_sptr_set(&req->target, p);
5583     p = nxt_cpymem(p, r->target.start, r->target.length);
5584     *p++ = '\0';
5585 
5586     req->path_length = (uint32_t) r->path->length;
5587     if (r->path->start == r->target.start) {
5588         nxt_unit_sptr_set(&req->path, target_pos);
5589 
5590     } else {
5591         nxt_unit_sptr_set(&req->path, p);
5592         p = nxt_cpymem(p, r->path->start, r->path->length);
5593         *p++ = '\0';
5594     }
5595 
5596     req->query_length = (uint32_t) r->args->length;
5597     if (r->args->start != NULL) {
5598         query_pos = nxt_pointer_to(target_pos,
5599                                    r->args->start - r->target.start);
5600 
5601         nxt_unit_sptr_set(&req->query, query_pos);
5602 
5603     } else {
5604         req->query.offset = 0;
5605     }
5606 
5607     req->content_length_field = NXT_UNIT_NONE_FIELD;
5608     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5609     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5610     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5611 
5612     dst_field = req->fields;
5613 
5614     for (field = nxt_fields_first(r->fields, &iter);
5615          field != NULL;
5616          field = nxt_fields_next(&iter))
5617     {
5618         if (field->skip) {
5619             continue;
5620         }
5621 
5622         dst_field->hash = field->hash;
5623         dst_field->skip = 0;
5624         dst_field->name_length = field->name_length + prefix->length;
5625         dst_field->value_length = field->value_length;
5626 
5627         if (field == r->content_length) {
5628             req->content_length_field = dst_field - req->fields;
5629 
5630         } else if (field == r->content_type) {
5631             req->content_type_field = dst_field - req->fields;
5632 
5633         } else if (field == r->cookie) {
5634             req->cookie_field = dst_field - req->fields;
5635 
5636         } else if (field == r->authorization) {
5637             req->authorization_field = dst_field - req->fields;
5638         }
5639 
5640         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5641                   (int) field->hash, (int) field->skip,
5642                   (int) field->name_length, field->name,
5643                   (int) field->value_length, field->value);
5644 
5645         if (prefix->length != 0) {
5646             nxt_unit_sptr_set(&dst_field->name, p);
5647             p = nxt_cpymem(p, prefix->start, prefix->length);
5648 
5649             end = field->name + field->name_length;
5650             for (pos = field->name; pos < end; pos++) {
5651                 c = *pos;
5652 
5653                 if (c >= 'a' && c <= 'z') {
5654                     *p++ = (c & ~0x20);
5655                     continue;
5656                 }
5657 
5658                 if (c == '-') {
5659                     *p++ = '_';
5660                     continue;
5661                 }
5662 
5663                 *p++ = c;
5664             }
5665 
5666         } else {
5667             nxt_unit_sptr_set(&dst_field->name, p);
5668             p = nxt_cpymem(p, field->name, field->name_length);
5669         }
5670 
5671         *p++ = '\0';
5672 
5673         nxt_unit_sptr_set(&dst_field->value, p);
5674         p = nxt_cpymem(p, field->value, field->value_length);
5675 
5676         if (prefix->length != 0) {
5677             dup_iter = iter;
5678 
5679             for (dup = nxt_fields_next(&dup_iter);
5680                  dup != NULL;
5681                  dup = nxt_fields_next(&dup_iter))
5682             {
5683                 if (dup->name_length != field->name_length
5684                     || dup->skip
5685                     || dup->hash != field->hash
5686                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5687                 {
5688                     continue;
5689                 }
5690 
5691                 p = nxt_cpymem(p, ", ", 2);
5692                 p = nxt_cpymem(p, dup->value, dup->value_length);
5693 
5694                 dst_field->value_length += 2 + dup->value_length;
5695 
5696                 dup->skip = 1;
5697             }
5698         }
5699 
5700         *p++ = '\0';
5701 
5702         dst_field++;
5703     }
5704 
5705     req->fields_count = (uint32_t) (dst_field - req->fields);
5706 
5707     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5708 
5709     buf = out;
5710     tail = &buf->next;
5711 
5712     for (b = r->body; b != NULL; b = b->next) {
5713         size = nxt_buf_mem_used_size(&b->mem);
5714         pos = b->mem.pos;
5715 
5716         while (size > 0) {
5717             if (buf == NULL) {
5718                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5719 
5720                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5721                 if (nxt_slow_path(buf == NULL)) {
5722                     while (out != NULL) {
5723                         buf = out->next;
5724                         out->next = NULL;
5725                         out->completion_handler(task, out, out->parent);
5726                         out = buf;
5727                     }
5728                     return NULL;
5729                 }
5730 
5731                 *tail = buf;
5732                 tail = &buf->next;
5733 
5734             } else {
5735                 free_size = nxt_buf_mem_free_size(&buf->mem);
5736                 if (free_size < size
5737                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5738                        == NXT_OK)
5739                 {
5740                     free_size = nxt_buf_mem_free_size(&buf->mem);
5741                 }
5742             }
5743 
5744             if (free_size > 0) {
5745                 copy_size = nxt_min(free_size, size);
5746 
5747                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5748 
5749                 size -= copy_size;
5750                 pos += copy_size;
5751 
5752                 if (size == 0) {
5753                     break;
5754                 }
5755             }
5756 
5757             buf = NULL;
5758         }
5759     }
5760 
5761     return out;
5762 }
5763 
5764 
5765 static void
5766 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5767 {
5768     nxt_timer_t              *timer;
5769     nxt_http_request_t       *r;
5770     nxt_request_rpc_data_t   *req_rpc_data;
5771 
5772     timer = obj;
5773 
5774     nxt_debug(task, "router app timeout");
5775 
5776     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5777     req_rpc_data = r->timer_data;
5778 
5779     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5780 
5781     nxt_request_rpc_data_unlink(task, req_rpc_data);
5782 }
5783 
5784 
5785 static void
5786 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5787 {
5788     r->timer.handler = nxt_router_http_request_release;
5789     nxt_timer_add(task->thread->engine, &r->timer, 0);
5790 }
5791 
5792 
5793 static void
5794 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5795 {
5796     nxt_http_request_t  *r;
5797 
5798     nxt_debug(task, "http request pool release");
5799 
5800     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5801 
5802     nxt_mp_release(r->mem_pool);
5803 }
5804 
5805 
5806 static void
5807 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5808 {
5809     size_t                   mi;
5810     uint32_t                 i;
5811     nxt_bool_t               ack;
5812     nxt_process_t            *process;
5813     nxt_free_map_t           *m;
5814     nxt_port_mmap_handler_t  *mmap_handler;
5815 
5816     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5817 
5818     process = nxt_runtime_process_find(task->thread->runtime,
5819                                        msg->port_msg.pid);
5820     if (nxt_slow_path(process == NULL)) {
5821         return;
5822     }
5823 
5824     ack = 0;
5825 
5826     /*
5827      * To mitigate possible racing condition (when OOSM message received
5828      * after some of the memory was already freed), need to try to find
5829      * first free segment in shared memory and send ACK if found.
5830      */
5831 
5832     nxt_thread_mutex_lock(&process->incoming.mutex);
5833 
5834     for (i = 0; i < process->incoming.size; i++) {
5835         mmap_handler = process->incoming.elts[i].mmap_handler;
5836 
5837         if (nxt_slow_path(mmap_handler == NULL)) {
5838             continue;
5839         }
5840 
5841         m = mmap_handler->hdr->free_map;
5842 
5843         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5844             if (m[mi] != 0) {
5845                 ack = 1;
5846 
5847                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5848                           i, mi, m[mi]);
5849 
5850                 break;
5851             }
5852         }
5853     }
5854 
5855     nxt_thread_mutex_unlock(&process->incoming.mutex);
5856 
5857     if (ack) {
5858         nxt_process_broadcast_shm_ack(task, process);
5859     }
5860 }
5861 
5862 
5863 static void
5864 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5865 {
5866     nxt_fd_t                 fd;
5867     nxt_port_t               *port;
5868     nxt_runtime_t            *rt;
5869     nxt_port_mmaps_t         *mmaps;
5870     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5871     nxt_port_mmap_handler_t  *mmap_handler;
5872 
5873     rt = task->thread->runtime;
5874 
5875     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5876                                  msg->port_msg.reply_port);
5877     if (nxt_slow_path(port == NULL)) {
5878         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5879                   msg->port_msg.pid, msg->port_msg.reply_port);
5880 
5881         return;
5882     }
5883 
5884     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5885                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5886     {
5887         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5888                   (int) nxt_buf_used_size(msg->buf));
5889 
5890         return;
5891     }
5892 
5893     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5894 
5895     nxt_assert(port->type == NXT_PROCESS_APP);
5896 
5897     if (nxt_slow_path(port->app == NULL)) {
5898         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5899                   port->pid, port->id);
5900 
5901         // FIXME
5902         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5903                               -1, msg->port_msg.stream, 0, NULL);
5904 
5905         return;
5906     }
5907 
5908     mmaps = &port->app->outgoing;
5909     nxt_thread_mutex_lock(&mmaps->mutex);
5910 
5911     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5912         nxt_thread_mutex_unlock(&mmaps->mutex);
5913 
5914         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5915                   (int) get_mmap_msg->id);
5916 
5917         // FIXME
5918         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5919                               -1, msg->port_msg.stream, 0, NULL);
5920         return;
5921     }
5922 
5923     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5924 
5925     fd = mmap_handler->fd;
5926 
5927     nxt_thread_mutex_unlock(&mmaps->mutex);
5928 
5929     nxt_debug(task, "get mmap %PI:%d found",
5930               msg->port_msg.pid, (int) get_mmap_msg->id);
5931 
5932     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5933 }
5934 
5935 
5936 static void
5937 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5938 {
5939     nxt_port_t               *port, *reply_port;
5940     nxt_runtime_t            *rt;
5941     nxt_port_msg_get_port_t  *get_port_msg;
5942 
5943     rt = task->thread->runtime;
5944 
5945     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5946                                        msg->port_msg.reply_port);
5947     if (nxt_slow_path(reply_port == NULL)) {
5948         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5949                   msg->port_msg.pid, msg->port_msg.reply_port);
5950 
5951         return;
5952     }
5953 
5954     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5955                       < (int) sizeof(nxt_port_msg_get_port_t)))
5956     {
5957         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5958                   (int) nxt_buf_used_size(msg->buf));
5959 
5960         return;
5961     }
5962 
5963     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5964 
5965     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5966     if (nxt_slow_path(port == NULL)) {
5967         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5968                   get_port_msg->pid, get_port_msg->id);
5969 
5970         return;
5971     }
5972 
5973     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5974               get_port_msg->id);
5975 
5976     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5977 }
5978