xref: /unit/src/nxt_router.c (revision 2017:c1617684637c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static void nxt_router_app_port_error(nxt_task_t *task,
227     nxt_port_recv_msg_t *msg, void *data);
228 
229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
231 
232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
233     nxt_port_t *port, nxt_apr_action_t action);
234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
235     nxt_request_rpc_data_t *req_rpc_data);
236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
237     void *data);
238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
239     void *data);
240 
241 static void nxt_router_app_prepare_request(nxt_task_t *task,
242     nxt_request_rpc_data_t *req_rpc_data);
243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
244     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
245 
246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
254 
255 static const nxt_http_request_state_t  nxt_http_request_send_state;
256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
257 
258 static void nxt_router_app_joint_use(nxt_task_t *task,
259     nxt_app_joint_t *app_joint, int i);
260 
261 static void nxt_router_http_request_release_post(nxt_task_t *task,
262     nxt_http_request_t *r);
263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
264     void *data);
265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_port_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_mmap_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 
271 extern const nxt_http_request_state_t  nxt_http_websocket;
272 
273 static nxt_router_t  *nxt_router;
274 
275 static const nxt_str_t http_prefix = nxt_string("HTTP_");
276 static const nxt_str_t empty_prefix = nxt_string("");
277 
278 static const nxt_str_t  *nxt_app_msg_prefix[] = {
279     &empty_prefix,
280     &empty_prefix,
281     &http_prefix,
282     &http_prefix,
283     &http_prefix,
284     &empty_prefix,
285 };
286 
287 
288 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
289     .quit         = nxt_signal_quit_handler,
290     .new_port     = nxt_router_new_port_handler,
291     .get_port     = nxt_router_get_port_handler,
292     .change_file  = nxt_port_change_log_file_handler,
293     .mmap         = nxt_port_mmap_handler,
294     .get_mmap     = nxt_router_get_mmap_handler,
295     .data         = nxt_router_conf_data_handler,
296     .app_restart  = nxt_router_app_restart_handler,
297     .remove_pid   = nxt_router_remove_pid_handler,
298     .access_log   = nxt_router_access_log_reopen_handler,
299     .rpc_ready    = nxt_port_rpc_handler,
300     .rpc_error    = nxt_port_rpc_handler,
301     .oosm         = nxt_router_oosm_handler,
302 };
303 
304 
305 const nxt_process_init_t  nxt_router_process = {
306     .name           = "router",
307     .type           = NXT_PROCESS_ROUTER,
308     .prefork        = nxt_router_prefork,
309     .restart        = 1,
310     .setup          = nxt_process_core_setup,
311     .start          = nxt_router_start,
312     .port_handlers  = &nxt_router_process_port_handlers,
313     .signals        = nxt_process_signals,
314 };
315 
316 
317 /* Queues of nxt_socket_conf_t */
318 nxt_queue_t  creating_sockets;
319 nxt_queue_t  pending_sockets;
320 nxt_queue_t  updating_sockets;
321 nxt_queue_t  keeping_sockets;
322 nxt_queue_t  deleting_sockets;
323 
324 
325 static nxt_int_t
326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328     nxt_runtime_stop_app_processes(task, task->thread->runtime);
329 
330     return NXT_OK;
331 }
332 
333 
334 static nxt_int_t
335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337     nxt_int_t      ret;
338     nxt_port_t     *controller_port;
339     nxt_router_t   *router;
340     nxt_runtime_t  *rt;
341 
342     rt = task->thread->runtime;
343 
344     nxt_log(task, NXT_LOG_INFO, "router started");
345 
346 #if (NXT_TLS)
347     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348     if (nxt_slow_path(rt->tls == NULL)) {
349         return NXT_ERROR;
350     }
351 
352     ret = rt->tls->library_init(task);
353     if (nxt_slow_path(ret != NXT_OK)) {
354         return ret;
355     }
356 #endif
357 
358     ret = nxt_http_init(task);
359     if (nxt_slow_path(ret != NXT_OK)) {
360         return ret;
361     }
362 
363     router = nxt_zalloc(sizeof(nxt_router_t));
364     if (nxt_slow_path(router == NULL)) {
365         return NXT_ERROR;
366     }
367 
368     nxt_queue_init(&router->engines);
369     nxt_queue_init(&router->sockets);
370     nxt_queue_init(&router->apps);
371 
372     nxt_router = router;
373 
374     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375     if (controller_port != NULL) {
376         nxt_router_greet_controller(task, controller_port);
377     }
378 
379     return NXT_OK;
380 }
381 
382 
383 static void
384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387                           -1, 0, 0, NULL);
388 }
389 
390 
391 static void
392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
393     void *data)
394 {
395     size_t               size;
396     uint32_t             stream;
397     nxt_fd_t             port_fd, queue_fd;
398     nxt_int_t            ret;
399     nxt_app_t            *app;
400     nxt_buf_t            *b;
401     nxt_port_t           *dport;
402     nxt_runtime_t        *rt;
403     nxt_app_joint_rpc_t  *app_joint_rpc;
404 
405     app = data;
406 
407     nxt_thread_mutex_lock(&app->mutex);
408 
409     dport = app->proto_port;
410 
411     nxt_thread_mutex_unlock(&app->mutex);
412 
413     if (dport != NULL) {
414         nxt_debug(task, "app '%V' %p start process", &app->name, app);
415 
416         b = NULL;
417         port_fd = -1;
418         queue_fd = -1;
419 
420     } else {
421         if (app->proto_port_requests > 0) {
422             nxt_debug(task, "app '%V' %p wait for prototype process",
423                       &app->name, app);
424 
425             app->proto_port_requests++;
426 
427             goto skip;
428         }
429 
430         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
431 
432         rt = task->thread->runtime;
433         dport = rt->port_by_type[NXT_PROCESS_MAIN];
434 
435         size = app->name.length + 1 + app->conf.length;
436 
437         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
438         if (nxt_slow_path(b == NULL)) {
439             goto failed;
440         }
441 
442         nxt_buf_cpystr(b, &app->name);
443         *b->mem.free++ = '\0';
444         nxt_buf_cpystr(b, &app->conf);
445 
446         port_fd = app->shared_port->pair[0];
447         queue_fd = app->shared_port->queue_fd;
448     }
449 
450     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
451                                                      nxt_router_app_port_ready,
452                                                      nxt_router_app_port_error,
453                                                    sizeof(nxt_app_joint_rpc_t));
454     if (nxt_slow_path(app_joint_rpc == NULL)) {
455         goto failed;
456     }
457 
458     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
459 
460     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
461                                  port_fd, queue_fd, stream, port->id, b);
462     if (nxt_slow_path(ret != NXT_OK)) {
463         nxt_port_rpc_cancel(task, port, stream);
464 
465         goto failed;
466     }
467 
468     app_joint_rpc->app_joint = app->joint;
469     app_joint_rpc->generation = app->generation;
470     app_joint_rpc->proto = (b != NULL);
471 
472     if (b != NULL) {
473         app->proto_port_requests++;
474 
475         b = NULL;
476     }
477 
478     nxt_router_app_joint_use(task, app->joint, 1);
479 
480 failed:
481 
482     if (b != NULL) {
483         nxt_mp_free(b->data, b);
484     }
485 
486 skip:
487 
488     nxt_router_app_use(task, app, -1);
489 }
490 
491 
492 static void
493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
494 {
495     app_joint->use_count += i;
496 
497     if (app_joint->use_count == 0) {
498         nxt_assert(app_joint->app == NULL);
499 
500         nxt_free(app_joint);
501     }
502 }
503 
504 
505 static nxt_int_t
506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
507 {
508     nxt_int_t      res;
509     nxt_port_t     *router_port;
510     nxt_runtime_t  *rt;
511 
512     nxt_debug(task, "app '%V' start process", &app->name);
513 
514     rt = task->thread->runtime;
515     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
516 
517     nxt_router_app_use(task, app, 1);
518 
519     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
520                         app);
521 
522     if (res == NXT_OK) {
523         return res;
524     }
525 
526     nxt_thread_mutex_lock(&app->mutex);
527 
528     app->pending_processes--;
529 
530     nxt_thread_mutex_unlock(&app->mutex);
531 
532     nxt_router_app_use(task, app, -1);
533 
534     return NXT_ERROR;
535 }
536 
537 
538 nxt_inline nxt_bool_t
539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
540 {
541     nxt_buf_t       *b, *next;
542     nxt_bool_t      cancelled;
543     nxt_port_t      *app_port;
544     nxt_msg_info_t  *msg_info;
545 
546     msg_info = &req_rpc_data->msg_info;
547 
548     if (msg_info->buf == NULL) {
549         return 0;
550     }
551 
552     app_port = req_rpc_data->app_port;
553 
554     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
555         cancelled = nxt_app_queue_cancel(app_port->queue,
556                                          msg_info->tracking_cookie,
557                                          req_rpc_data->stream);
558 
559         if (cancelled) {
560             nxt_debug(task, "stream #%uD: cancelled by router",
561                       req_rpc_data->stream);
562         }
563 
564     } else {
565         cancelled = 0;
566     }
567 
568     for (b = msg_info->buf; b != NULL; b = next) {
569         next = b->next;
570         b->next = NULL;
571 
572         if (b->is_port_mmap_sent) {
573             b->is_port_mmap_sent = cancelled == 0;
574         }
575 
576         b->completion_handler(task, b, b->parent);
577     }
578 
579     msg_info->buf = NULL;
580 
581     return cancelled;
582 }
583 
584 
585 nxt_inline nxt_bool_t
586 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
587 {
588     if (lnk->next != NULL) {
589         nxt_queue_remove(lnk);
590 
591         lnk->next = NULL;
592 
593         return 1;
594     }
595 
596     return 0;
597 }
598 
599 
600 nxt_inline void
601 nxt_request_rpc_data_unlink(nxt_task_t *task,
602     nxt_request_rpc_data_t *req_rpc_data)
603 {
604     nxt_app_t           *app;
605     nxt_bool_t          unlinked;
606     nxt_http_request_t  *r;
607 
608     nxt_router_msg_cancel(task, req_rpc_data);
609 
610     app = req_rpc_data->app;
611 
612     if (req_rpc_data->app_port != NULL) {
613         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
614                                     req_rpc_data->apr_action);
615 
616         req_rpc_data->app_port = NULL;
617     }
618 
619     r = req_rpc_data->request;
620 
621     if (r != NULL) {
622         r->timer_data = NULL;
623 
624         nxt_router_http_request_release_post(task, r);
625 
626         r->req_rpc_data = NULL;
627         req_rpc_data->request = NULL;
628 
629         if (app != NULL) {
630             unlinked = 0;
631 
632             nxt_thread_mutex_lock(&app->mutex);
633 
634             if (r->app_link.next != NULL) {
635                 nxt_queue_remove(&r->app_link);
636                 r->app_link.next = NULL;
637 
638                 unlinked = 1;
639             }
640 
641             nxt_thread_mutex_unlock(&app->mutex);
642 
643             if (unlinked) {
644                 nxt_mp_release(r->mem_pool);
645             }
646         }
647     }
648 
649     if (app != NULL) {
650         nxt_router_app_use(task, app, -1);
651 
652         req_rpc_data->app = NULL;
653     }
654 
655     if (req_rpc_data->msg_info.body_fd != -1) {
656         nxt_fd_close(req_rpc_data->msg_info.body_fd);
657 
658         req_rpc_data->msg_info.body_fd = -1;
659     }
660 
661     if (req_rpc_data->rpc_cancel) {
662         req_rpc_data->rpc_cancel = 0;
663 
664         nxt_port_rpc_cancel(task, task->thread->engine->port,
665                             req_rpc_data->stream);
666     }
667 }
668 
669 
670 static void
671 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
672 {
673     nxt_int_t      res;
674     nxt_app_t      *app;
675     nxt_port_t     *port, *main_app_port;
676     nxt_runtime_t  *rt;
677 
678     nxt_port_new_port_handler(task, msg);
679 
680     port = msg->u.new_port;
681 
682     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
683         nxt_router_greet_controller(task, msg->u.new_port);
684     }
685 
686     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
687         nxt_port_rpc_handler(task, msg);
688 
689         return;
690     }
691 
692     if (port == NULL || port->type != NXT_PROCESS_APP) {
693 
694         if (msg->port_msg.stream == 0) {
695             return;
696         }
697 
698         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
699 
700     } else {
701         if (msg->fd[1] != -1) {
702             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
703             if (nxt_slow_path(res != NXT_OK)) {
704                 return;
705             }
706 
707             nxt_fd_close(msg->fd[1]);
708             msg->fd[1] = -1;
709         }
710     }
711 
712     if (msg->port_msg.stream != 0) {
713         nxt_port_rpc_handler(task, msg);
714         return;
715     }
716 
717     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
718 
719     /*
720      * Port with "id == 0" is application 'main' port and it always
721      * should come with non-zero stream.
722      */
723     nxt_assert(port->id != 0);
724 
725     /* Find 'main' app port and get app reference. */
726     rt = task->thread->runtime;
727 
728     /*
729      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
730      * sent to main port (with id == 0) and processed in main thread.
731      */
732     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
733     nxt_assert(main_app_port != NULL);
734 
735     app = main_app_port->app;
736 
737     if (nxt_fast_path(app != NULL)) {
738         nxt_thread_mutex_lock(&app->mutex);
739 
740         /* TODO here should be find-and-add code because there can be
741            port waiters in port_hash */
742         nxt_port_hash_add(&app->port_hash, port);
743         app->port_hash_count++;
744 
745         nxt_thread_mutex_unlock(&app->mutex);
746 
747         port->app = app;
748     }
749 
750     port->main_app_port = main_app_port;
751 
752     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
753 }
754 
755 
756 static void
757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
758 {
759     void                    *p;
760     size_t                  size;
761     nxt_int_t               ret;
762     nxt_port_t              *port;
763     nxt_router_temp_conf_t  *tmcf;
764 
765     port = nxt_runtime_port_find(task->thread->runtime,
766                                  msg->port_msg.pid,
767                                  msg->port_msg.reply_port);
768     if (nxt_slow_path(port == NULL)) {
769         nxt_alert(task, "conf_data_handler: reply port not found");
770         return;
771     }
772 
773     p = MAP_FAILED;
774 
775     /*
776      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
777      * initialized in 'cleanup' section.
778      */
779     size = 0;
780 
781     tmcf = nxt_router_temp_conf(task);
782     if (nxt_slow_path(tmcf == NULL)) {
783         goto fail;
784     }
785 
786     if (nxt_slow_path(msg->fd[0] == -1)) {
787         nxt_alert(task, "conf_data_handler: invalid shm fd");
788         goto fail;
789     }
790 
791     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
792         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
793                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
794         goto fail;
795     }
796 
797     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
798 
799     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
800 
801     nxt_fd_close(msg->fd[0]);
802     msg->fd[0] = -1;
803 
804     if (nxt_slow_path(p == MAP_FAILED)) {
805         goto fail;
806     }
807 
808     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
809 
810     tmcf->router_conf->router = nxt_router;
811     tmcf->stream = msg->port_msg.stream;
812     tmcf->port = port;
813 
814     nxt_port_use(task, tmcf->port, 1);
815 
816     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
817 
818     if (nxt_fast_path(ret == NXT_OK)) {
819         nxt_router_conf_apply(task, tmcf, NULL);
820 
821     } else {
822         nxt_router_conf_error(task, tmcf);
823     }
824 
825     goto cleanup;
826 
827 fail:
828 
829     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
830                           msg->port_msg.stream, 0, NULL);
831 
832     if (tmcf != NULL) {
833         nxt_mp_release(tmcf->mem_pool);
834     }
835 
836 cleanup:
837 
838     if (p != MAP_FAILED) {
839         nxt_mem_munmap(p, size);
840     }
841 
842     if (msg->fd[0] != -1) {
843         nxt_fd_close(msg->fd[0]);
844         msg->fd[0] = -1;
845     }
846 }
847 
848 
849 static void
850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
851 {
852     nxt_app_t            *app;
853     nxt_int_t            ret;
854     nxt_str_t            app_name;
855     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
856     nxt_port_t           *proto_port;
857     nxt_port_msg_type_t  reply;
858 
859     reply_port = nxt_runtime_port_find(task->thread->runtime,
860                                        msg->port_msg.pid,
861                                        msg->port_msg.reply_port);
862     if (nxt_slow_path(reply_port == NULL)) {
863         nxt_alert(task, "app_restart_handler: reply port not found");
864         return;
865     }
866 
867     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
868     app_name.start = msg->buf->mem.pos;
869 
870     nxt_debug(task, "app_restart_handler: %V", &app_name);
871 
872     app = nxt_router_app_find(&nxt_router->apps, &app_name);
873 
874     if (nxt_fast_path(app != NULL)) {
875         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
876                                    NXT_PROCESS_APP);
877         if (nxt_slow_path(shared_port == NULL)) {
878             goto fail;
879         }
880 
881         ret = nxt_port_socket_init(task, shared_port, 0);
882         if (nxt_slow_path(ret != NXT_OK)) {
883             nxt_port_use(task, shared_port, -1);
884             goto fail;
885         }
886 
887         ret = nxt_router_app_queue_init(task, shared_port);
888         if (nxt_slow_path(ret != NXT_OK)) {
889             nxt_port_write_close(shared_port);
890             nxt_port_read_close(shared_port);
891             nxt_port_use(task, shared_port, -1);
892             goto fail;
893         }
894 
895         nxt_port_write_enable(task, shared_port);
896 
897         nxt_thread_mutex_lock(&app->mutex);
898 
899         proto_port = app->proto_port;
900 
901         if (proto_port != NULL) {
902             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
903                       proto_port->pid);
904 
905             app->proto_port = NULL;
906             proto_port->app = NULL;
907         }
908 
909         app->generation++;
910 
911         shared_port->app = app;
912 
913         old_shared_port = app->shared_port;
914         old_shared_port->app = NULL;
915 
916         app->shared_port = shared_port;
917 
918         nxt_thread_mutex_unlock(&app->mutex);
919 
920         nxt_port_close(task, old_shared_port);
921         nxt_port_use(task, old_shared_port, -1);
922 
923         if (proto_port != NULL) {
924             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
925                                          -1, 0, 0, NULL);
926 
927             nxt_port_close(task, proto_port);
928 
929             nxt_port_use(task, proto_port, -1);
930         }
931 
932         reply = NXT_PORT_MSG_RPC_READY_LAST;
933 
934     } else {
935 
936 fail:
937 
938         reply = NXT_PORT_MSG_RPC_ERROR;
939     }
940 
941     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
942                           0, NULL);
943 }
944 
945 
946 static void
947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
948     void *data)
949 {
950     union {
951         nxt_pid_t  removed_pid;
952         void       *data;
953     } u;
954 
955     u.data = data;
956 
957     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
958 }
959 
960 
961 static void
962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
963 {
964     nxt_event_engine_t  *engine;
965 
966     nxt_port_remove_pid_handler(task, msg);
967 
968     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
969     {
970         if (nxt_fast_path(engine->port != NULL)) {
971             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
972                           msg->u.data);
973         }
974     }
975     nxt_queue_loop;
976 
977     if (msg->port_msg.stream == 0) {
978         return;
979     }
980 
981     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
982 
983     nxt_port_rpc_handler(task, msg);
984 }
985 
986 
987 static nxt_router_temp_conf_t *
988 nxt_router_temp_conf(nxt_task_t *task)
989 {
990     nxt_mp_t                *mp, *tmp;
991     nxt_router_conf_t       *rtcf;
992     nxt_router_temp_conf_t  *tmcf;
993 
994     mp = nxt_mp_create(1024, 128, 256, 32);
995     if (nxt_slow_path(mp == NULL)) {
996         return NULL;
997     }
998 
999     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1000     if (nxt_slow_path(rtcf == NULL)) {
1001         goto fail;
1002     }
1003 
1004     rtcf->mem_pool = mp;
1005 
1006     tmp = nxt_mp_create(1024, 128, 256, 32);
1007     if (nxt_slow_path(tmp == NULL)) {
1008         goto fail;
1009     }
1010 
1011     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1012     if (nxt_slow_path(tmcf == NULL)) {
1013         goto temp_fail;
1014     }
1015 
1016     tmcf->mem_pool = tmp;
1017     tmcf->router_conf = rtcf;
1018     tmcf->count = 1;
1019     tmcf->engine = task->thread->engine;
1020 
1021     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1022                                      sizeof(nxt_router_engine_conf_t));
1023     if (nxt_slow_path(tmcf->engines == NULL)) {
1024         goto temp_fail;
1025     }
1026 
1027     nxt_queue_init(&creating_sockets);
1028     nxt_queue_init(&pending_sockets);
1029     nxt_queue_init(&updating_sockets);
1030     nxt_queue_init(&keeping_sockets);
1031     nxt_queue_init(&deleting_sockets);
1032 
1033 #if (NXT_TLS)
1034     nxt_queue_init(&tmcf->tls);
1035 #endif
1036 
1037     nxt_queue_init(&tmcf->apps);
1038     nxt_queue_init(&tmcf->previous);
1039 
1040     return tmcf;
1041 
1042 temp_fail:
1043 
1044     nxt_mp_destroy(tmp);
1045 
1046 fail:
1047 
1048     nxt_mp_destroy(mp);
1049 
1050     return NULL;
1051 }
1052 
1053 
1054 nxt_inline nxt_bool_t
1055 nxt_router_app_can_start(nxt_app_t *app)
1056 {
1057     return app->processes + app->pending_processes < app->max_processes
1058             && app->pending_processes < app->max_pending_processes;
1059 }
1060 
1061 
1062 nxt_inline nxt_bool_t
1063 nxt_router_app_need_start(nxt_app_t *app)
1064 {
1065     return (app->active_requests
1066               > app->port_hash_count + app->pending_processes)
1067            || (app->spare_processes
1068                 > app->idle_processes + app->pending_processes);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1074 {
1075     nxt_int_t                    ret;
1076     nxt_app_t                    *app;
1077     nxt_router_t                 *router;
1078     nxt_runtime_t                *rt;
1079     nxt_queue_link_t             *qlk;
1080     nxt_socket_conf_t            *skcf;
1081     nxt_router_conf_t            *rtcf;
1082     nxt_router_temp_conf_t       *tmcf;
1083     const nxt_event_interface_t  *interface;
1084 #if (NXT_TLS)
1085     nxt_router_tlssock_t         *tls;
1086 #endif
1087 
1088     tmcf = obj;
1089 
1090     qlk = nxt_queue_first(&pending_sockets);
1091 
1092     if (qlk != nxt_queue_tail(&pending_sockets)) {
1093         nxt_queue_remove(qlk);
1094         nxt_queue_insert_tail(&creating_sockets, qlk);
1095 
1096         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1097 
1098         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1099 
1100         return;
1101     }
1102 
1103 #if (NXT_TLS)
1104     qlk = nxt_queue_last(&tmcf->tls);
1105 
1106     if (qlk != nxt_queue_head(&tmcf->tls)) {
1107         nxt_queue_remove(qlk);
1108 
1109         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1110 
1111         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1112                            nxt_router_tls_rpc_handler, tls);
1113         return;
1114     }
1115 #endif
1116 
1117     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1118 
1119         if (nxt_router_app_need_start(app)) {
1120             nxt_router_app_rpc_create(task, tmcf, app);
1121             return;
1122         }
1123 
1124     } nxt_queue_loop;
1125 
1126     rtcf = tmcf->router_conf;
1127 
1128     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1129         nxt_router_access_log_open(task, tmcf);
1130         return;
1131     }
1132 
1133     rt = task->thread->runtime;
1134 
1135     interface = nxt_service_get(rt->services, "engine", NULL);
1136 
1137     router = rtcf->router;
1138 
1139     ret = nxt_router_engines_create(task, router, tmcf, interface);
1140     if (nxt_slow_path(ret != NXT_OK)) {
1141         goto fail;
1142     }
1143 
1144     ret = nxt_router_threads_create(task, rt, tmcf);
1145     if (nxt_slow_path(ret != NXT_OK)) {
1146         goto fail;
1147     }
1148 
1149     nxt_router_apps_sort(task, router, tmcf);
1150 
1151     nxt_router_apps_hash_use(task, rtcf, 1);
1152 
1153     nxt_router_engines_post(router, tmcf);
1154 
1155     nxt_queue_add(&router->sockets, &updating_sockets);
1156     nxt_queue_add(&router->sockets, &creating_sockets);
1157 
1158     if (router->access_log != rtcf->access_log) {
1159         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1160 
1161         nxt_router_access_log_release(task, &router->lock, router->access_log);
1162 
1163         router->access_log = rtcf->access_log;
1164     }
1165 
1166     nxt_router_conf_ready(task, tmcf);
1167 
1168     return;
1169 
1170 fail:
1171 
1172     nxt_router_conf_error(task, tmcf);
1173 
1174     return;
1175 }
1176 
1177 
1178 static void
1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1180 {
1181     nxt_joint_job_t  *job;
1182 
1183     job = obj;
1184 
1185     nxt_router_conf_ready(task, job->tmcf);
1186 }
1187 
1188 
1189 static void
1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191 {
1192     uint32_t               count;
1193     nxt_router_conf_t      *rtcf;
1194     nxt_thread_spinlock_t  *lock;
1195 
1196     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1197 
1198     if (--tmcf->count > 0) {
1199         return;
1200     }
1201 
1202     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1203 
1204     rtcf = tmcf->router_conf;
1205 
1206     lock = &rtcf->router->lock;
1207 
1208     nxt_thread_spin_lock(lock);
1209 
1210     count = rtcf->count;
1211 
1212     nxt_thread_spin_unlock(lock);
1213 
1214     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1215 
1216     if (count == 0) {
1217         nxt_router_apps_hash_use(task, rtcf, -1);
1218 
1219         nxt_router_access_log_release(task, lock, rtcf->access_log);
1220 
1221         nxt_mp_destroy(rtcf->mem_pool);
1222     }
1223 
1224     nxt_mp_release(tmcf->mem_pool);
1225 }
1226 
1227 
1228 static void
1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1230 {
1231     nxt_app_t          *app;
1232     nxt_queue_t        new_socket_confs;
1233     nxt_socket_t       s;
1234     nxt_router_t       *router;
1235     nxt_queue_link_t   *qlk;
1236     nxt_socket_conf_t  *skcf;
1237     nxt_router_conf_t  *rtcf;
1238 
1239     nxt_alert(task, "failed to apply new conf");
1240 
1241     for (qlk = nxt_queue_first(&creating_sockets);
1242          qlk != nxt_queue_tail(&creating_sockets);
1243          qlk = nxt_queue_next(qlk))
1244     {
1245         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246         s = skcf->listen->socket;
1247 
1248         if (s != -1) {
1249             nxt_socket_close(task, s);
1250         }
1251 
1252         nxt_free(skcf->listen);
1253     }
1254 
1255     nxt_queue_init(&new_socket_confs);
1256     nxt_queue_add(&new_socket_confs, &updating_sockets);
1257     nxt_queue_add(&new_socket_confs, &pending_sockets);
1258     nxt_queue_add(&new_socket_confs, &creating_sockets);
1259 
1260     rtcf = tmcf->router_conf;
1261 
1262     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1263 
1264         nxt_router_app_unlink(task, app);
1265 
1266     } nxt_queue_loop;
1267 
1268     router = rtcf->router;
1269 
1270     nxt_queue_add(&router->sockets, &keeping_sockets);
1271     nxt_queue_add(&router->sockets, &deleting_sockets);
1272 
1273     nxt_queue_add(&router->apps, &tmcf->previous);
1274 
1275     // TODO: new engines and threads
1276 
1277     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1278 
1279     nxt_mp_destroy(rtcf->mem_pool);
1280 
1281     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1282 
1283     nxt_mp_release(tmcf->mem_pool);
1284 }
1285 
1286 
1287 static void
1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1289     nxt_port_msg_type_t type)
1290 {
1291     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1292 
1293     nxt_port_use(task, tmcf->port, -1);
1294 
1295     tmcf->port = NULL;
1296 }
1297 
1298 
1299 static nxt_conf_map_t  nxt_router_conf[] = {
1300     {
1301         nxt_string("listeners_threads"),
1302         NXT_CONF_MAP_INT32,
1303         offsetof(nxt_router_conf_t, threads),
1304     },
1305 };
1306 
1307 
1308 static nxt_conf_map_t  nxt_router_app_conf[] = {
1309     {
1310         nxt_string("type"),
1311         NXT_CONF_MAP_STR,
1312         offsetof(nxt_router_app_conf_t, type),
1313     },
1314 
1315     {
1316         nxt_string("limits"),
1317         NXT_CONF_MAP_PTR,
1318         offsetof(nxt_router_app_conf_t, limits_value),
1319     },
1320 
1321     {
1322         nxt_string("processes"),
1323         NXT_CONF_MAP_INT32,
1324         offsetof(nxt_router_app_conf_t, processes),
1325     },
1326 
1327     {
1328         nxt_string("processes"),
1329         NXT_CONF_MAP_PTR,
1330         offsetof(nxt_router_app_conf_t, processes_value),
1331     },
1332 
1333     {
1334         nxt_string("targets"),
1335         NXT_CONF_MAP_PTR,
1336         offsetof(nxt_router_app_conf_t, targets_value),
1337     },
1338 };
1339 
1340 
1341 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1342     {
1343         nxt_string("timeout"),
1344         NXT_CONF_MAP_MSEC,
1345         offsetof(nxt_router_app_conf_t, timeout),
1346     },
1347 };
1348 
1349 
1350 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1351     {
1352         nxt_string("spare"),
1353         NXT_CONF_MAP_INT32,
1354         offsetof(nxt_router_app_conf_t, spare_processes),
1355     },
1356 
1357     {
1358         nxt_string("max"),
1359         NXT_CONF_MAP_INT32,
1360         offsetof(nxt_router_app_conf_t, max_processes),
1361     },
1362 
1363     {
1364         nxt_string("idle_timeout"),
1365         NXT_CONF_MAP_MSEC,
1366         offsetof(nxt_router_app_conf_t, idle_timeout),
1367     },
1368 };
1369 
1370 
1371 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1372     {
1373         nxt_string("pass"),
1374         NXT_CONF_MAP_STR_COPY,
1375         offsetof(nxt_router_listener_conf_t, pass),
1376     },
1377 
1378     {
1379         nxt_string("application"),
1380         NXT_CONF_MAP_STR_COPY,
1381         offsetof(nxt_router_listener_conf_t, application),
1382     },
1383 };
1384 
1385 
1386 static nxt_conf_map_t  nxt_router_http_conf[] = {
1387     {
1388         nxt_string("header_buffer_size"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, header_buffer_size),
1391     },
1392 
1393     {
1394         nxt_string("large_header_buffer_size"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1397     },
1398 
1399     {
1400         nxt_string("large_header_buffers"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, large_header_buffers),
1403     },
1404 
1405     {
1406         nxt_string("body_buffer_size"),
1407         NXT_CONF_MAP_SIZE,
1408         offsetof(nxt_socket_conf_t, body_buffer_size),
1409     },
1410 
1411     {
1412         nxt_string("max_body_size"),
1413         NXT_CONF_MAP_SIZE,
1414         offsetof(nxt_socket_conf_t, max_body_size),
1415     },
1416 
1417     {
1418         nxt_string("idle_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, idle_timeout),
1421     },
1422 
1423     {
1424         nxt_string("header_read_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, header_read_timeout),
1427     },
1428 
1429     {
1430         nxt_string("body_read_timeout"),
1431         NXT_CONF_MAP_MSEC,
1432         offsetof(nxt_socket_conf_t, body_read_timeout),
1433     },
1434 
1435     {
1436         nxt_string("send_timeout"),
1437         NXT_CONF_MAP_MSEC,
1438         offsetof(nxt_socket_conf_t, send_timeout),
1439     },
1440 
1441     {
1442         nxt_string("body_temp_path"),
1443         NXT_CONF_MAP_STR,
1444         offsetof(nxt_socket_conf_t, body_temp_path),
1445     },
1446 
1447     {
1448         nxt_string("discard_unsafe_fields"),
1449         NXT_CONF_MAP_INT8,
1450         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1451     },
1452 };
1453 
1454 
1455 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1456     {
1457         nxt_string("max_frame_size"),
1458         NXT_CONF_MAP_SIZE,
1459         offsetof(nxt_websocket_conf_t, max_frame_size),
1460     },
1461 
1462     {
1463         nxt_string("read_timeout"),
1464         NXT_CONF_MAP_MSEC,
1465         offsetof(nxt_websocket_conf_t, read_timeout),
1466     },
1467 
1468     {
1469         nxt_string("keepalive_interval"),
1470         NXT_CONF_MAP_MSEC,
1471         offsetof(nxt_websocket_conf_t, keepalive_interval),
1472     },
1473 
1474 };
1475 
1476 
1477 static nxt_int_t
1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1479     u_char *start, u_char *end)
1480 {
1481     u_char                      *p;
1482     size_t                      size;
1483     nxt_mp_t                    *mp, *app_mp;
1484     uint32_t                    next, next_target;
1485     nxt_int_t                   ret;
1486     nxt_str_t                   name, path, target;
1487     nxt_app_t                   *app, *prev;
1488     nxt_str_t                   *t, *s, *targets;
1489     nxt_uint_t                  n, i;
1490     nxt_port_t                  *port;
1491     nxt_router_t                *router;
1492     nxt_app_joint_t             *app_joint;
1493 #if (NXT_TLS)
1494     nxt_tls_init_t              *tls_init;
1495     nxt_conf_value_t            *certificate;
1496 #endif
1497     nxt_conf_value_t            *conf, *http, *value, *websocket;
1498     nxt_conf_value_t            *applications, *application;
1499     nxt_conf_value_t            *listeners, *listener;
1500     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1501     nxt_socket_conf_t           *skcf;
1502     nxt_http_routes_t           *routes;
1503     nxt_event_engine_t          *engine;
1504     nxt_app_lang_module_t       *lang;
1505     nxt_router_app_conf_t       apcf;
1506     nxt_router_access_log_t     *access_log;
1507     nxt_router_listener_conf_t  lscf;
1508 
1509     static nxt_str_t  http_path = nxt_string("/settings/http");
1510     static nxt_str_t  applications_path = nxt_string("/applications");
1511     static nxt_str_t  listeners_path = nxt_string("/listeners");
1512     static nxt_str_t  routes_path = nxt_string("/routes");
1513     static nxt_str_t  access_log_path = nxt_string("/access_log");
1514 #if (NXT_TLS)
1515     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1516     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1517     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1518     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1519     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1520 #endif
1521     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1522     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1523     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1524 
1525     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1526     if (conf == NULL) {
1527         nxt_alert(task, "configuration parsing error");
1528         return NXT_ERROR;
1529     }
1530 
1531     mp = tmcf->router_conf->mem_pool;
1532 
1533     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1534                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1535     if (ret != NXT_OK) {
1536         nxt_alert(task, "root map error");
1537         return NXT_ERROR;
1538     }
1539 
1540     if (tmcf->router_conf->threads == 0) {
1541         tmcf->router_conf->threads = nxt_ncpu;
1542     }
1543 
1544     static_conf = nxt_conf_get_path(conf, &static_path);
1545 
1546     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1547     if (nxt_slow_path(ret != NXT_OK)) {
1548         return NXT_ERROR;
1549     }
1550 
1551     router = tmcf->router_conf->router;
1552 
1553     applications = nxt_conf_get_path(conf, &applications_path);
1554 
1555     if (applications != NULL) {
1556         next = 0;
1557 
1558         for ( ;; ) {
1559             application = nxt_conf_next_object_member(applications,
1560                                                       &name, &next);
1561             if (application == NULL) {
1562                 break;
1563             }
1564 
1565             nxt_debug(task, "application \"%V\"", &name);
1566 
1567             size = nxt_conf_json_length(application, NULL);
1568 
1569             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1570             if (nxt_slow_path(app_mp == NULL)) {
1571                 goto fail;
1572             }
1573 
1574             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1575             if (app == NULL) {
1576                 goto app_fail;
1577             }
1578 
1579             nxt_memzero(app, sizeof(nxt_app_t));
1580 
1581             app->mem_pool = app_mp;
1582 
1583             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1584             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1585                                                   + name.length);
1586 
1587             p = nxt_conf_json_print(app->conf.start, application, NULL);
1588             app->conf.length = p - app->conf.start;
1589 
1590             nxt_assert(app->conf.length <= size);
1591 
1592             nxt_debug(task, "application conf \"%V\"", &app->conf);
1593 
1594             prev = nxt_router_app_find(&router->apps, &name);
1595 
1596             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1597                 nxt_mp_destroy(app_mp);
1598 
1599                 nxt_queue_remove(&prev->link);
1600                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1601 
1602                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1603                 if (nxt_slow_path(ret != NXT_OK)) {
1604                     goto fail;
1605                 }
1606 
1607                 continue;
1608             }
1609 
1610             apcf.processes = 1;
1611             apcf.max_processes = 1;
1612             apcf.spare_processes = 0;
1613             apcf.timeout = 0;
1614             apcf.idle_timeout = 15000;
1615             apcf.limits_value = NULL;
1616             apcf.processes_value = NULL;
1617             apcf.targets_value = NULL;
1618 
1619             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1620             if (nxt_slow_path(app_joint == NULL)) {
1621                 goto app_fail;
1622             }
1623 
1624             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1625 
1626             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1627                                       nxt_nitems(nxt_router_app_conf), &apcf);
1628             if (ret != NXT_OK) {
1629                 nxt_alert(task, "application map error");
1630                 goto app_fail;
1631             }
1632 
1633             if (apcf.limits_value != NULL) {
1634 
1635                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1636                     nxt_alert(task, "application limits is not object");
1637                     goto app_fail;
1638                 }
1639 
1640                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1641                                         nxt_router_app_limits_conf,
1642                                         nxt_nitems(nxt_router_app_limits_conf),
1643                                         &apcf);
1644                 if (ret != NXT_OK) {
1645                     nxt_alert(task, "application limits map error");
1646                     goto app_fail;
1647                 }
1648             }
1649 
1650             if (apcf.processes_value != NULL
1651                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1652             {
1653                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1654                                      nxt_router_app_processes_conf,
1655                                      nxt_nitems(nxt_router_app_processes_conf),
1656                                      &apcf);
1657                 if (ret != NXT_OK) {
1658                     nxt_alert(task, "application processes map error");
1659                     goto app_fail;
1660                 }
1661 
1662             } else {
1663                 apcf.max_processes = apcf.processes;
1664                 apcf.spare_processes = apcf.processes;
1665             }
1666 
1667             if (apcf.targets_value != NULL) {
1668                 n = nxt_conf_object_members_count(apcf.targets_value);
1669 
1670                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1671                 if (nxt_slow_path(targets == NULL)) {
1672                     goto app_fail;
1673                 }
1674 
1675                 next_target = 0;
1676 
1677                 for (i = 0; i < n; i++) {
1678                     (void) nxt_conf_next_object_member(apcf.targets_value,
1679                                                        &target, &next_target);
1680 
1681                     s = nxt_str_dup(app_mp, &targets[i], &target);
1682                     if (nxt_slow_path(s == NULL)) {
1683                         goto app_fail;
1684                     }
1685                 }
1686 
1687             } else {
1688                 targets = NULL;
1689             }
1690 
1691             nxt_debug(task, "application type: %V", &apcf.type);
1692             nxt_debug(task, "application processes: %D", apcf.processes);
1693             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1694 
1695             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1696 
1697             if (lang == NULL) {
1698                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1699                 goto app_fail;
1700             }
1701 
1702             nxt_debug(task, "application language module: \"%s\"", lang->file);
1703 
1704             ret = nxt_thread_mutex_create(&app->mutex);
1705             if (ret != NXT_OK) {
1706                 goto app_fail;
1707             }
1708 
1709             nxt_queue_init(&app->ports);
1710             nxt_queue_init(&app->spare_ports);
1711             nxt_queue_init(&app->idle_ports);
1712             nxt_queue_init(&app->ack_waiting_req);
1713 
1714             app->name.length = name.length;
1715             nxt_memcpy(app->name.start, name.start, name.length);
1716 
1717             app->type = lang->type;
1718             app->max_processes = apcf.max_processes;
1719             app->spare_processes = apcf.spare_processes;
1720             app->max_pending_processes = apcf.spare_processes
1721                                          ? apcf.spare_processes : 1;
1722             app->timeout = apcf.timeout;
1723             app->idle_timeout = apcf.idle_timeout;
1724 
1725             app->targets = targets;
1726 
1727             engine = task->thread->engine;
1728 
1729             app->engine = engine;
1730 
1731             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1732             app->adjust_idle_work.task = &engine->task;
1733             app->adjust_idle_work.obj = app;
1734 
1735             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1736 
1737             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1738             if (nxt_slow_path(ret != NXT_OK)) {
1739                 goto app_fail;
1740             }
1741 
1742             nxt_router_app_use(task, app, 1);
1743 
1744             app->joint = app_joint;
1745 
1746             app_joint->use_count = 1;
1747             app_joint->app = app;
1748 
1749             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1750             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1751             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1752             app_joint->idle_timer.task = &engine->task;
1753             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1754 
1755             app_joint->free_app_work.handler = nxt_router_free_app;
1756             app_joint->free_app_work.task = &engine->task;
1757             app_joint->free_app_work.obj = app_joint;
1758 
1759             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1760                                 NXT_PROCESS_APP);
1761             if (nxt_slow_path(port == NULL)) {
1762                 return NXT_ERROR;
1763             }
1764 
1765             ret = nxt_port_socket_init(task, port, 0);
1766             if (nxt_slow_path(ret != NXT_OK)) {
1767                 nxt_port_use(task, port, -1);
1768                 return NXT_ERROR;
1769             }
1770 
1771             ret = nxt_router_app_queue_init(task, port);
1772             if (nxt_slow_path(ret != NXT_OK)) {
1773                 nxt_port_write_close(port);
1774                 nxt_port_read_close(port);
1775                 nxt_port_use(task, port, -1);
1776                 return NXT_ERROR;
1777             }
1778 
1779             nxt_port_write_enable(task, port);
1780             port->app = app;
1781 
1782             app->shared_port = port;
1783 
1784             nxt_thread_mutex_create(&app->outgoing.mutex);
1785         }
1786     }
1787 
1788     routes_conf = nxt_conf_get_path(conf, &routes_path);
1789     if (nxt_fast_path(routes_conf != NULL)) {
1790         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1791         if (nxt_slow_path(routes == NULL)) {
1792             return NXT_ERROR;
1793         }
1794         tmcf->router_conf->routes = routes;
1795     }
1796 
1797     ret = nxt_upstreams_create(task, tmcf, conf);
1798     if (nxt_slow_path(ret != NXT_OK)) {
1799         return ret;
1800     }
1801 
1802     http = nxt_conf_get_path(conf, &http_path);
1803 #if 0
1804     if (http == NULL) {
1805         nxt_alert(task, "no \"http\" block");
1806         return NXT_ERROR;
1807     }
1808 #endif
1809 
1810     websocket = nxt_conf_get_path(conf, &websocket_path);
1811 
1812     listeners = nxt_conf_get_path(conf, &listeners_path);
1813 
1814     if (listeners != NULL) {
1815         next = 0;
1816 
1817         for ( ;; ) {
1818             listener = nxt_conf_next_object_member(listeners, &name, &next);
1819             if (listener == NULL) {
1820                 break;
1821             }
1822 
1823             skcf = nxt_router_socket_conf(task, tmcf, &name);
1824             if (skcf == NULL) {
1825                 goto fail;
1826             }
1827 
1828             nxt_memzero(&lscf, sizeof(lscf));
1829 
1830             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1831                                       nxt_nitems(nxt_router_listener_conf),
1832                                       &lscf);
1833             if (ret != NXT_OK) {
1834                 nxt_alert(task, "listener map error");
1835                 goto fail;
1836             }
1837 
1838             nxt_debug(task, "application: %V", &lscf.application);
1839 
1840             // STUB, default values if http block is not defined.
1841             skcf->header_buffer_size = 2048;
1842             skcf->large_header_buffer_size = 8192;
1843             skcf->large_header_buffers = 4;
1844             skcf->discard_unsafe_fields = 1;
1845             skcf->body_buffer_size = 16 * 1024;
1846             skcf->max_body_size = 8 * 1024 * 1024;
1847             skcf->proxy_header_buffer_size = 64 * 1024;
1848             skcf->proxy_buffer_size = 4096;
1849             skcf->proxy_buffers = 256;
1850             skcf->idle_timeout = 180 * 1000;
1851             skcf->header_read_timeout = 30 * 1000;
1852             skcf->body_read_timeout = 30 * 1000;
1853             skcf->send_timeout = 30 * 1000;
1854             skcf->proxy_timeout = 60 * 1000;
1855             skcf->proxy_send_timeout = 30 * 1000;
1856             skcf->proxy_read_timeout = 30 * 1000;
1857 
1858             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1859             skcf->websocket_conf.read_timeout = 60 * 1000;
1860             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1861 
1862             nxt_str_null(&skcf->body_temp_path);
1863 
1864             if (http != NULL) {
1865                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1866                                           nxt_nitems(nxt_router_http_conf),
1867                                           skcf);
1868                 if (ret != NXT_OK) {
1869                     nxt_alert(task, "http map error");
1870                     goto fail;
1871                 }
1872             }
1873 
1874             if (websocket != NULL) {
1875                 ret = nxt_conf_map_object(mp, websocket,
1876                                           nxt_router_websocket_conf,
1877                                           nxt_nitems(nxt_router_websocket_conf),
1878                                           &skcf->websocket_conf);
1879                 if (ret != NXT_OK) {
1880                     nxt_alert(task, "websocket map error");
1881                     goto fail;
1882                 }
1883             }
1884 
1885             t = &skcf->body_temp_path;
1886 
1887             if (t->length == 0) {
1888                 t->start = (u_char *) task->thread->runtime->tmp;
1889                 t->length = nxt_strlen(t->start);
1890             }
1891 
1892             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1893             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1894                                                     client_ip_conf);
1895             if (nxt_slow_path(ret != NXT_OK)) {
1896                 return NXT_ERROR;
1897             }
1898 
1899 #if (NXT_TLS)
1900             certificate = nxt_conf_get_path(listener, &certificate_path);
1901 
1902             if (certificate != NULL) {
1903                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1904                 if (nxt_slow_path(tls_init == NULL)) {
1905                     return NXT_ERROR;
1906                 }
1907 
1908                 tls_init->cache_size = 0;
1909                 tls_init->timeout = 300;
1910 
1911                 value = nxt_conf_get_path(listener, &conf_cache_path);
1912                 if (value != NULL) {
1913                     tls_init->cache_size = nxt_conf_get_number(value);
1914                 }
1915 
1916                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1917                 if (value != NULL) {
1918                     tls_init->timeout = nxt_conf_get_number(value);
1919                 }
1920 
1921                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1922                                                         &conf_commands_path);
1923 
1924                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1925                                                            &conf_tickets);
1926 
1927                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1928                     n = nxt_conf_array_elements_count(certificate);
1929 
1930                     for (i = 0; i < n; i++) {
1931                         value = nxt_conf_get_array_element(certificate, i);
1932 
1933                         nxt_assert(value != NULL);
1934 
1935                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1936                                                          tls_init, i == 0);
1937                         if (nxt_slow_path(ret != NXT_OK)) {
1938                             goto fail;
1939                         }
1940                     }
1941 
1942                 } else {
1943                     /* NXT_CONF_STRING */
1944                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1945                                                      tls_init, 1);
1946                     if (nxt_slow_path(ret != NXT_OK)) {
1947                         goto fail;
1948                     }
1949                 }
1950             }
1951 #endif
1952 
1953             skcf->listen->handler = nxt_http_conn_init;
1954             skcf->router_conf = tmcf->router_conf;
1955             skcf->router_conf->count++;
1956 
1957             if (lscf.pass.length != 0) {
1958                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1959 
1960             /* COMPATIBILITY: listener application. */
1961             } else if (lscf.application.length > 0) {
1962                 skcf->action = nxt_http_pass_application(task,
1963                                                          tmcf->router_conf,
1964                                                          &lscf.application);
1965             }
1966 
1967             if (nxt_slow_path(skcf->action == NULL)) {
1968                 goto fail;
1969             }
1970         }
1971     }
1972 
1973     ret = nxt_http_routes_resolve(task, tmcf);
1974     if (nxt_slow_path(ret != NXT_OK)) {
1975         goto fail;
1976     }
1977 
1978     value = nxt_conf_get_path(conf, &access_log_path);
1979 
1980     if (value != NULL) {
1981         nxt_conf_get_string(value, &path);
1982 
1983         access_log = router->access_log;
1984 
1985         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1986             nxt_router_access_log_use(&router->lock, access_log);
1987 
1988         } else {
1989             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1990                                     + path.length);
1991             if (access_log == NULL) {
1992                 nxt_alert(task, "failed to allocate access log structure");
1993                 goto fail;
1994             }
1995 
1996             access_log->fd = -1;
1997             access_log->handler = &nxt_router_access_log_writer;
1998             access_log->count = 1;
1999 
2000             access_log->path.length = path.length;
2001             access_log->path.start = (u_char *) access_log
2002                                      + sizeof(nxt_router_access_log_t);
2003 
2004             nxt_memcpy(access_log->path.start, path.start, path.length);
2005         }
2006 
2007         tmcf->router_conf->access_log = access_log;
2008     }
2009 
2010     nxt_queue_add(&deleting_sockets, &router->sockets);
2011     nxt_queue_init(&router->sockets);
2012 
2013     return NXT_OK;
2014 
2015 app_fail:
2016 
2017     nxt_mp_destroy(app_mp);
2018 
2019 fail:
2020 
2021     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2022 
2023         nxt_queue_remove(&app->link);
2024         nxt_thread_mutex_destroy(&app->mutex);
2025         nxt_mp_destroy(app->mem_pool);
2026 
2027     } nxt_queue_loop;
2028 
2029     return NXT_ERROR;
2030 }
2031 
2032 
2033 #if (NXT_TLS)
2034 
2035 static nxt_int_t
2036 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2037     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2038     nxt_tls_init_t *tls_init, nxt_bool_t last)
2039 {
2040     nxt_router_tlssock_t  *tls;
2041 
2042     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2043     if (nxt_slow_path(tls == NULL)) {
2044         return NXT_ERROR;
2045     }
2046 
2047     tls->tls_init = tls_init;
2048     tls->socket_conf = skcf;
2049     tls->temp_conf = tmcf;
2050     tls->last = last;
2051     nxt_conf_get_string(value, &tls->name);
2052 
2053     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2054 
2055     return NXT_OK;
2056 }
2057 
2058 #endif
2059 
2060 
2061 static nxt_int_t
2062 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2063     nxt_conf_value_t *conf)
2064 {
2065     uint32_t          next, i;
2066     nxt_mp_t          *mp;
2067     nxt_str_t         *type, exten, str;
2068     nxt_int_t         ret;
2069     nxt_uint_t        exts;
2070     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2071 
2072     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2073 
2074     mp = rtcf->mem_pool;
2075 
2076     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2077     if (nxt_slow_path(ret != NXT_OK)) {
2078         return NXT_ERROR;
2079     }
2080 
2081     if (conf == NULL) {
2082         return NXT_OK;
2083     }
2084 
2085     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2086 
2087     if (mtypes_conf != NULL) {
2088         next = 0;
2089 
2090         for ( ;; ) {
2091             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2092 
2093             if (ext_conf == NULL) {
2094                 break;
2095             }
2096 
2097             type = nxt_str_dup(mp, NULL, &str);
2098             if (nxt_slow_path(type == NULL)) {
2099                 return NXT_ERROR;
2100             }
2101 
2102             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2103                 nxt_conf_get_string(ext_conf, &str);
2104 
2105                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2106                     return NXT_ERROR;
2107                 }
2108 
2109                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2110                                                       &exten, type);
2111                 if (nxt_slow_path(ret != NXT_OK)) {
2112                     return NXT_ERROR;
2113                 }
2114 
2115                 continue;
2116             }
2117 
2118             exts = nxt_conf_array_elements_count(ext_conf);
2119 
2120             for (i = 0; i < exts; i++) {
2121                 value = nxt_conf_get_array_element(ext_conf, i);
2122 
2123                 nxt_conf_get_string(value, &str);
2124 
2125                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2126                     return NXT_ERROR;
2127                 }
2128 
2129                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2130                                                       &exten, type);
2131                 if (nxt_slow_path(ret != NXT_OK)) {
2132                     return NXT_ERROR;
2133                 }
2134             }
2135         }
2136     }
2137 
2138     return NXT_OK;
2139 }
2140 
2141 
2142 static nxt_int_t
2143 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2144     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2145 {
2146     char                        c;
2147     size_t                      i;
2148     nxt_mp_t                    *mp;
2149     uint32_t                    hash;
2150     nxt_str_t                   header;
2151     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2152     nxt_http_client_ip_t        *client_ip;
2153     nxt_http_route_addr_rule_t  *source;
2154 
2155     static nxt_str_t  header_path = nxt_string("/header");
2156     static nxt_str_t  source_path = nxt_string("/source");
2157     static nxt_str_t  recursive_path = nxt_string("/recursive");
2158 
2159     if (conf == NULL) {
2160         skcf->client_ip = NULL;
2161 
2162         return NXT_OK;
2163     }
2164 
2165     mp = tmcf->router_conf->mem_pool;
2166 
2167     source_conf = nxt_conf_get_path(conf, &source_path);
2168     header_conf = nxt_conf_get_path(conf, &header_path);
2169     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2170 
2171     if (source_conf == NULL || header_conf == NULL) {
2172         return NXT_ERROR;
2173     }
2174 
2175     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2176     if (nxt_slow_path(client_ip == NULL)) {
2177         return NXT_ERROR;
2178     }
2179 
2180     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2181     if (nxt_slow_path(source == NULL)) {
2182         return NXT_ERROR;
2183     }
2184 
2185     client_ip->source = source;
2186 
2187     nxt_conf_get_string(header_conf, &header);
2188 
2189     if (recursive_conf != NULL) {
2190         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2191     }
2192 
2193     client_ip->header = nxt_str_dup(mp, NULL, &header);
2194     if (nxt_slow_path(client_ip->header == NULL)) {
2195         return NXT_ERROR;
2196     }
2197 
2198     hash = NXT_HTTP_FIELD_HASH_INIT;
2199 
2200     for (i = 0; i < client_ip->header->length; i++) {
2201         c = client_ip->header->start[i];
2202         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2203     }
2204 
2205     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2206 
2207     client_ip->header_hash = hash;
2208 
2209     skcf->client_ip = client_ip;
2210 
2211     return NXT_OK;
2212 }
2213 
2214 
2215 static nxt_app_t *
2216 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2217 {
2218     nxt_app_t  *app;
2219 
2220     nxt_queue_each(app, queue, nxt_app_t, link) {
2221 
2222         if (nxt_strstr_eq(name, &app->name)) {
2223             return app;
2224         }
2225 
2226     } nxt_queue_loop;
2227 
2228     return NULL;
2229 }
2230 
2231 
2232 static nxt_int_t
2233 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2234 {
2235     void       *mem;
2236     nxt_int_t  fd;
2237 
2238     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2239     if (nxt_slow_path(fd == -1)) {
2240         return NXT_ERROR;
2241     }
2242 
2243     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2244                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2245     if (nxt_slow_path(mem == MAP_FAILED)) {
2246         nxt_fd_close(fd);
2247 
2248         return NXT_ERROR;
2249     }
2250 
2251     nxt_app_queue_init(mem);
2252 
2253     port->queue_fd = fd;
2254     port->queue = mem;
2255 
2256     return NXT_OK;
2257 }
2258 
2259 
2260 static nxt_int_t
2261 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2262 {
2263     void       *mem;
2264     nxt_int_t  fd;
2265 
2266     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2267     if (nxt_slow_path(fd == -1)) {
2268         return NXT_ERROR;
2269     }
2270 
2271     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2272                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2273     if (nxt_slow_path(mem == MAP_FAILED)) {
2274         nxt_fd_close(fd);
2275 
2276         return NXT_ERROR;
2277     }
2278 
2279     nxt_port_queue_init(mem);
2280 
2281     port->queue_fd = fd;
2282     port->queue = mem;
2283 
2284     return NXT_OK;
2285 }
2286 
2287 
2288 static nxt_int_t
2289 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2290 {
2291     void  *mem;
2292 
2293     nxt_assert(fd != -1);
2294 
2295     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2296                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2297     if (nxt_slow_path(mem == MAP_FAILED)) {
2298 
2299         return NXT_ERROR;
2300     }
2301 
2302     port->queue = mem;
2303 
2304     return NXT_OK;
2305 }
2306 
2307 
2308 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2309     NXT_LVLHSH_DEFAULT,
2310     nxt_router_apps_hash_test,
2311     nxt_mp_lvlhsh_alloc,
2312     nxt_mp_lvlhsh_free,
2313 };
2314 
2315 
2316 static nxt_int_t
2317 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2318 {
2319     nxt_app_t  *app;
2320 
2321     app = data;
2322 
2323     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2324 }
2325 
2326 
2327 static nxt_int_t
2328 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2329 {
2330     nxt_lvlhsh_query_t  lhq;
2331 
2332     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2333     lhq.replace = 0;
2334     lhq.key = app->name;
2335     lhq.value = app;
2336     lhq.proto = &nxt_router_apps_hash_proto;
2337     lhq.pool = rtcf->mem_pool;
2338 
2339     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2340 
2341     case NXT_OK:
2342         return NXT_OK;
2343 
2344     case NXT_DECLINED:
2345         nxt_thread_log_alert("router app hash adding failed: "
2346                              "\"%V\" is already in hash", &lhq.key);
2347         /* Fall through. */
2348     default:
2349         return NXT_ERROR;
2350     }
2351 }
2352 
2353 
2354 static nxt_app_t *
2355 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2356 {
2357     nxt_lvlhsh_query_t  lhq;
2358 
2359     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2360     lhq.key = *name;
2361     lhq.proto = &nxt_router_apps_hash_proto;
2362 
2363     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2364         return NULL;
2365     }
2366 
2367     return lhq.value;
2368 }
2369 
2370 
2371 static void
2372 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2373 {
2374     nxt_app_t          *app;
2375     nxt_lvlhsh_each_t  lhe;
2376 
2377     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2378 
2379     for ( ;; ) {
2380         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2381 
2382         if (app == NULL) {
2383             break;
2384         }
2385 
2386         nxt_router_app_use(task, app, i);
2387     }
2388 }
2389 
2390 
2391 typedef struct {
2392     nxt_app_t  *app;
2393     nxt_int_t  target;
2394 } nxt_http_app_conf_t;
2395 
2396 
2397 nxt_int_t
2398 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2399     nxt_str_t *target, nxt_http_action_t *action)
2400 {
2401     nxt_app_t            *app;
2402     nxt_str_t            *targets;
2403     nxt_uint_t           i;
2404     nxt_http_app_conf_t  *conf;
2405 
2406     app = nxt_router_apps_hash_get(rtcf, name);
2407     if (app == NULL) {
2408         return NXT_DECLINED;
2409     }
2410 
2411     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2412     if (nxt_slow_path(conf == NULL)) {
2413         return NXT_ERROR;
2414     }
2415 
2416     action->handler = nxt_http_application_handler;
2417     action->u.conf = conf;
2418 
2419     conf->app = app;
2420 
2421     if (target != NULL && target->length != 0) {
2422         targets = app->targets;
2423 
2424         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2425 
2426         conf->target = i;
2427 
2428     } else {
2429         conf->target = 0;
2430     }
2431 
2432     return NXT_OK;
2433 }
2434 
2435 
2436 static nxt_socket_conf_t *
2437 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2438     nxt_str_t *name)
2439 {
2440     size_t               size;
2441     nxt_int_t            ret;
2442     nxt_bool_t           wildcard;
2443     nxt_sockaddr_t       *sa;
2444     nxt_socket_conf_t    *skcf;
2445     nxt_listen_socket_t  *ls;
2446 
2447     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2448     if (nxt_slow_path(sa == NULL)) {
2449         nxt_alert(task, "invalid listener \"%V\"", name);
2450         return NULL;
2451     }
2452 
2453     sa->type = SOCK_STREAM;
2454 
2455     nxt_debug(task, "router listener: \"%*s\"",
2456               (size_t) sa->length, nxt_sockaddr_start(sa));
2457 
2458     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2459     if (nxt_slow_path(skcf == NULL)) {
2460         return NULL;
2461     }
2462 
2463     size = nxt_sockaddr_size(sa);
2464 
2465     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2466 
2467     if (ret != NXT_OK) {
2468 
2469         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2470         if (nxt_slow_path(ls == NULL)) {
2471             return NULL;
2472         }
2473 
2474         skcf->listen = ls;
2475 
2476         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2477         nxt_memcpy(ls->sockaddr, sa, size);
2478 
2479         nxt_listen_socket_remote_size(ls);
2480 
2481         ls->socket = -1;
2482         ls->backlog = NXT_LISTEN_BACKLOG;
2483         ls->flags = NXT_NONBLOCK;
2484         ls->read_after_accept = 1;
2485     }
2486 
2487     switch (sa->u.sockaddr.sa_family) {
2488 #if (NXT_HAVE_UNIX_DOMAIN)
2489     case AF_UNIX:
2490         wildcard = 0;
2491         break;
2492 #endif
2493 #if (NXT_INET6)
2494     case AF_INET6:
2495         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2496         break;
2497 #endif
2498     case AF_INET:
2499     default:
2500         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2501         break;
2502     }
2503 
2504     if (!wildcard) {
2505         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2506         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2507             return NULL;
2508         }
2509 
2510         nxt_memcpy(skcf->sockaddr, sa, size);
2511     }
2512 
2513     return skcf;
2514 }
2515 
2516 
2517 static nxt_int_t
2518 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2519     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2520 {
2521     nxt_router_t       *router;
2522     nxt_queue_link_t   *qlk;
2523     nxt_socket_conf_t  *skcf;
2524 
2525     router = tmcf->router_conf->router;
2526 
2527     for (qlk = nxt_queue_first(&router->sockets);
2528          qlk != nxt_queue_tail(&router->sockets);
2529          qlk = nxt_queue_next(qlk))
2530     {
2531         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2532 
2533         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2534             nskcf->listen = skcf->listen;
2535 
2536             nxt_queue_remove(qlk);
2537             nxt_queue_insert_tail(&keeping_sockets, qlk);
2538 
2539             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2540 
2541             return NXT_OK;
2542         }
2543     }
2544 
2545     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2546 
2547     return NXT_DECLINED;
2548 }
2549 
2550 
2551 static void
2552 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2553     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2554 {
2555     size_t            size;
2556     uint32_t          stream;
2557     nxt_int_t         ret;
2558     nxt_buf_t         *b;
2559     nxt_port_t        *main_port, *router_port;
2560     nxt_runtime_t     *rt;
2561     nxt_socket_rpc_t  *rpc;
2562 
2563     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2564     if (rpc == NULL) {
2565         goto fail;
2566     }
2567 
2568     rpc->socket_conf = skcf;
2569     rpc->temp_conf = tmcf;
2570 
2571     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2572 
2573     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2574     if (b == NULL) {
2575         goto fail;
2576     }
2577 
2578     b->completion_handler = nxt_buf_dummy_completion;
2579 
2580     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2581 
2582     rt = task->thread->runtime;
2583     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2584     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2585 
2586     stream = nxt_port_rpc_register_handler(task, router_port,
2587                                            nxt_router_listen_socket_ready,
2588                                            nxt_router_listen_socket_error,
2589                                            main_port->pid, rpc);
2590     if (nxt_slow_path(stream == 0)) {
2591         goto fail;
2592     }
2593 
2594     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2595                                 stream, router_port->id, b);
2596 
2597     if (nxt_slow_path(ret != NXT_OK)) {
2598         nxt_port_rpc_cancel(task, router_port, stream);
2599         goto fail;
2600     }
2601 
2602     return;
2603 
2604 fail:
2605 
2606     nxt_router_conf_error(task, tmcf);
2607 }
2608 
2609 
2610 static void
2611 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2612     void *data)
2613 {
2614     nxt_int_t         ret;
2615     nxt_socket_t      s;
2616     nxt_socket_rpc_t  *rpc;
2617 
2618     rpc = data;
2619 
2620     s = msg->fd[0];
2621 
2622     ret = nxt_socket_nonblocking(task, s);
2623     if (nxt_slow_path(ret != NXT_OK)) {
2624         goto fail;
2625     }
2626 
2627     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2628 
2629     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2630     if (nxt_slow_path(ret != NXT_OK)) {
2631         goto fail;
2632     }
2633 
2634     rpc->socket_conf->listen->socket = s;
2635 
2636     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2637                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2638 
2639     return;
2640 
2641 fail:
2642 
2643     nxt_socket_close(task, s);
2644 
2645     nxt_router_conf_error(task, rpc->temp_conf);
2646 }
2647 
2648 
2649 static void
2650 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2651     void *data)
2652 {
2653     nxt_socket_rpc_t        *rpc;
2654     nxt_router_temp_conf_t  *tmcf;
2655 
2656     rpc = data;
2657     tmcf = rpc->temp_conf;
2658 
2659 #if 0
2660     u_char                  *p;
2661     size_t                  size;
2662     uint8_t                 error;
2663     nxt_buf_t               *in, *out;
2664     nxt_sockaddr_t          *sa;
2665 
2666     static nxt_str_t  socket_errors[] = {
2667         nxt_string("ListenerSystem"),
2668         nxt_string("ListenerNoIPv6"),
2669         nxt_string("ListenerPort"),
2670         nxt_string("ListenerInUse"),
2671         nxt_string("ListenerNoAddress"),
2672         nxt_string("ListenerNoAccess"),
2673         nxt_string("ListenerPath"),
2674     };
2675 
2676     sa = rpc->socket_conf->listen->sockaddr;
2677 
2678     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2679 
2680     if (nxt_slow_path(in == NULL)) {
2681         return;
2682     }
2683 
2684     p = in->mem.pos;
2685 
2686     error = *p++;
2687 
2688     size = nxt_length("listen socket error: ")
2689            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2690            + sa->length + socket_errors[error].length + (in->mem.free - p);
2691 
2692     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2693     if (nxt_slow_path(out == NULL)) {
2694         return;
2695     }
2696 
2697     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2698                         "listen socket error: "
2699                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2700                         (size_t) sa->length, nxt_sockaddr_start(sa),
2701                         &socket_errors[error], in->mem.free - p, p);
2702 
2703     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2704 #endif
2705 
2706     nxt_router_conf_error(task, tmcf);
2707 }
2708 
2709 
2710 #if (NXT_TLS)
2711 
2712 static void
2713 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2714     void *data)
2715 {
2716     nxt_mp_t                *mp;
2717     nxt_int_t               ret;
2718     nxt_tls_conf_t          *tlscf;
2719     nxt_router_tlssock_t    *tls;
2720     nxt_tls_bundle_conf_t   *bundle;
2721     nxt_router_temp_conf_t  *tmcf;
2722 
2723     nxt_debug(task, "tls rpc handler");
2724 
2725     tls = data;
2726     tmcf = tls->temp_conf;
2727 
2728     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2729         goto fail;
2730     }
2731 
2732     mp = tmcf->router_conf->mem_pool;
2733 
2734     if (tls->socket_conf->tls == NULL){
2735         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2736         if (nxt_slow_path(tlscf == NULL)) {
2737             goto fail;
2738         }
2739 
2740         tlscf->no_wait_shutdown = 1;
2741         tls->socket_conf->tls = tlscf;
2742 
2743     } else {
2744         tlscf = tls->socket_conf->tls;
2745     }
2746 
2747     tls->tls_init->conf = tlscf;
2748 
2749     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2750     if (nxt_slow_path(bundle == NULL)) {
2751         goto fail;
2752     }
2753 
2754     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2755         goto fail;
2756     }
2757 
2758     bundle->chain_file = msg->fd[0];
2759     bundle->next = tlscf->bundle;
2760     tlscf->bundle = bundle;
2761 
2762     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2763                                                   tls->last);
2764     if (nxt_slow_path(ret != NXT_OK)) {
2765         goto fail;
2766     }
2767 
2768     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2769                        nxt_router_conf_apply, task, tmcf, NULL);
2770     return;
2771 
2772 fail:
2773 
2774     nxt_router_conf_error(task, tmcf);
2775 }
2776 
2777 #endif
2778 
2779 
2780 static void
2781 nxt_router_app_rpc_create(nxt_task_t *task,
2782     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2783 {
2784     size_t         size;
2785     uint32_t       stream;
2786     nxt_fd_t       port_fd, queue_fd;
2787     nxt_int_t      ret;
2788     nxt_buf_t      *b;
2789     nxt_port_t     *router_port, *dport;
2790     nxt_runtime_t  *rt;
2791     nxt_app_rpc_t  *rpc;
2792 
2793     rt = task->thread->runtime;
2794 
2795     dport = app->proto_port;
2796 
2797     if (dport == NULL) {
2798         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2799 
2800         size = app->name.length + 1 + app->conf.length;
2801 
2802         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2803         if (nxt_slow_path(b == NULL)) {
2804             goto fail;
2805         }
2806 
2807         b->completion_handler = nxt_buf_dummy_completion;
2808 
2809         nxt_buf_cpystr(b, &app->name);
2810         *b->mem.free++ = '\0';
2811         nxt_buf_cpystr(b, &app->conf);
2812 
2813         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2814 
2815         port_fd = app->shared_port->pair[0];
2816         queue_fd = app->shared_port->queue_fd;
2817 
2818     } else {
2819         nxt_debug(task, "app '%V' prefork", &app->name);
2820 
2821         b = NULL;
2822         port_fd = -1;
2823         queue_fd = -1;
2824     }
2825 
2826     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2827 
2828     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2829                                            nxt_router_app_prefork_ready,
2830                                            nxt_router_app_prefork_error,
2831                                            sizeof(nxt_app_rpc_t));
2832     if (nxt_slow_path(rpc == NULL)) {
2833         goto fail;
2834     }
2835 
2836     rpc->app = app;
2837     rpc->temp_conf = tmcf;
2838     rpc->proto = (b != NULL);
2839 
2840     stream = nxt_port_rpc_ex_stream(rpc);
2841 
2842     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2843                                  port_fd, queue_fd, stream, router_port->id, b);
2844     if (nxt_slow_path(ret != NXT_OK)) {
2845         nxt_port_rpc_cancel(task, router_port, stream);
2846         goto fail;
2847     }
2848 
2849     if (b == NULL) {
2850         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2851 
2852         app->pending_processes++;
2853     }
2854 
2855     return;
2856 
2857 fail:
2858 
2859     nxt_router_conf_error(task, tmcf);
2860 }
2861 
2862 
2863 static void
2864 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2865     void *data)
2866 {
2867     nxt_app_t           *app;
2868     nxt_port_t          *port;
2869     nxt_app_rpc_t       *rpc;
2870     nxt_event_engine_t  *engine;
2871 
2872     rpc = data;
2873     app = rpc->app;
2874 
2875     port = msg->u.new_port;
2876 
2877     nxt_assert(port != NULL);
2878     nxt_assert(port->id == 0);
2879 
2880     if (rpc->proto) {
2881         nxt_assert(app->proto_port == NULL);
2882         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2883 
2884         nxt_port_inc_use(port);
2885 
2886         app->proto_port = port;
2887         port->app = app;
2888 
2889         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2890 
2891         return;
2892     }
2893 
2894     nxt_assert(port->type == NXT_PROCESS_APP);
2895 
2896     port->app = app;
2897     port->main_app_port = port;
2898 
2899     app->pending_processes--;
2900     app->processes++;
2901     app->idle_processes++;
2902 
2903     engine = task->thread->engine;
2904 
2905     nxt_queue_insert_tail(&app->ports, &port->app_link);
2906     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2907 
2908     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2909               &app->name, port->pid, port->id);
2910 
2911     nxt_port_hash_add(&app->port_hash, port);
2912     app->port_hash_count++;
2913 
2914     port->idle_start = 0;
2915 
2916     nxt_port_inc_use(port);
2917 
2918     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2919 
2920     nxt_work_queue_add(&engine->fast_work_queue,
2921                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2922 }
2923 
2924 
2925 static void
2926 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2927     void *data)
2928 {
2929     nxt_app_t               *app;
2930     nxt_app_rpc_t           *rpc;
2931     nxt_router_temp_conf_t  *tmcf;
2932 
2933     rpc = data;
2934     app = rpc->app;
2935     tmcf = rpc->temp_conf;
2936 
2937     if (rpc->proto) {
2938         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2939                 &app->name);
2940 
2941     } else {
2942         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2943                 &app->name);
2944 
2945         app->pending_processes--;
2946     }
2947 
2948     nxt_router_conf_error(task, tmcf);
2949 }
2950 
2951 
2952 static nxt_int_t
2953 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2954     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2955 {
2956     nxt_int_t                 ret;
2957     nxt_uint_t                n, threads;
2958     nxt_queue_link_t          *qlk;
2959     nxt_router_engine_conf_t  *recf;
2960 
2961     threads = tmcf->router_conf->threads;
2962 
2963     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2964                                      sizeof(nxt_router_engine_conf_t));
2965     if (nxt_slow_path(tmcf->engines == NULL)) {
2966         return NXT_ERROR;
2967     }
2968 
2969     n = 0;
2970 
2971     for (qlk = nxt_queue_first(&router->engines);
2972          qlk != nxt_queue_tail(&router->engines);
2973          qlk = nxt_queue_next(qlk))
2974     {
2975         recf = nxt_array_zero_add(tmcf->engines);
2976         if (nxt_slow_path(recf == NULL)) {
2977             return NXT_ERROR;
2978         }
2979 
2980         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2981 
2982         if (n < threads) {
2983             recf->action = NXT_ROUTER_ENGINE_KEEP;
2984             ret = nxt_router_engine_conf_update(tmcf, recf);
2985 
2986         } else {
2987             recf->action = NXT_ROUTER_ENGINE_DELETE;
2988             ret = nxt_router_engine_conf_delete(tmcf, recf);
2989         }
2990 
2991         if (nxt_slow_path(ret != NXT_OK)) {
2992             return ret;
2993         }
2994 
2995         n++;
2996     }
2997 
2998     tmcf->new_threads = n;
2999 
3000     while (n < threads) {
3001         recf = nxt_array_zero_add(tmcf->engines);
3002         if (nxt_slow_path(recf == NULL)) {
3003             return NXT_ERROR;
3004         }
3005 
3006         recf->action = NXT_ROUTER_ENGINE_ADD;
3007 
3008         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3009         if (nxt_slow_path(recf->engine == NULL)) {
3010             return NXT_ERROR;
3011         }
3012 
3013         ret = nxt_router_engine_conf_create(tmcf, recf);
3014         if (nxt_slow_path(ret != NXT_OK)) {
3015             return ret;
3016         }
3017 
3018         n++;
3019     }
3020 
3021     return NXT_OK;
3022 }
3023 
3024 
3025 static nxt_int_t
3026 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3027     nxt_router_engine_conf_t *recf)
3028 {
3029     nxt_int_t  ret;
3030 
3031     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3032                                           nxt_router_listen_socket_create);
3033     if (nxt_slow_path(ret != NXT_OK)) {
3034         return ret;
3035     }
3036 
3037     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3038                                           nxt_router_listen_socket_create);
3039     if (nxt_slow_path(ret != NXT_OK)) {
3040         return ret;
3041     }
3042 
3043     return ret;
3044 }
3045 
3046 
3047 static nxt_int_t
3048 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3049     nxt_router_engine_conf_t *recf)
3050 {
3051     nxt_int_t  ret;
3052 
3053     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3054                                           nxt_router_listen_socket_create);
3055     if (nxt_slow_path(ret != NXT_OK)) {
3056         return ret;
3057     }
3058 
3059     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3060                                           nxt_router_listen_socket_update);
3061     if (nxt_slow_path(ret != NXT_OK)) {
3062         return ret;
3063     }
3064 
3065     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3066     if (nxt_slow_path(ret != NXT_OK)) {
3067         return ret;
3068     }
3069 
3070     return ret;
3071 }
3072 
3073 
3074 static nxt_int_t
3075 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3076     nxt_router_engine_conf_t *recf)
3077 {
3078     nxt_int_t  ret;
3079 
3080     ret = nxt_router_engine_quit(tmcf, recf);
3081     if (nxt_slow_path(ret != NXT_OK)) {
3082         return ret;
3083     }
3084 
3085     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3086     if (nxt_slow_path(ret != NXT_OK)) {
3087         return ret;
3088     }
3089 
3090     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3091 }
3092 
3093 
3094 static nxt_int_t
3095 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3096     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3097     nxt_work_handler_t handler)
3098 {
3099     nxt_int_t                ret;
3100     nxt_joint_job_t          *job;
3101     nxt_queue_link_t         *qlk;
3102     nxt_socket_conf_t        *skcf;
3103     nxt_socket_conf_joint_t  *joint;
3104 
3105     for (qlk = nxt_queue_first(sockets);
3106          qlk != nxt_queue_tail(sockets);
3107          qlk = nxt_queue_next(qlk))
3108     {
3109         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3110         if (nxt_slow_path(job == NULL)) {
3111             return NXT_ERROR;
3112         }
3113 
3114         job->work.next = recf->jobs;
3115         recf->jobs = &job->work;
3116 
3117         job->task = tmcf->engine->task;
3118         job->work.handler = handler;
3119         job->work.task = &job->task;
3120         job->work.obj = job;
3121         job->tmcf = tmcf;
3122 
3123         tmcf->count++;
3124 
3125         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3126                              sizeof(nxt_socket_conf_joint_t));
3127         if (nxt_slow_path(joint == NULL)) {
3128             return NXT_ERROR;
3129         }
3130 
3131         job->work.data = joint;
3132 
3133         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3134         if (nxt_slow_path(ret != NXT_OK)) {
3135             return ret;
3136         }
3137 
3138         joint->count = 1;
3139 
3140         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3141         skcf->count++;
3142         joint->socket_conf = skcf;
3143 
3144         joint->engine = recf->engine;
3145     }
3146 
3147     return NXT_OK;
3148 }
3149 
3150 
3151 static nxt_int_t
3152 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3153     nxt_router_engine_conf_t *recf)
3154 {
3155     nxt_joint_job_t  *job;
3156 
3157     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3158     if (nxt_slow_path(job == NULL)) {
3159         return NXT_ERROR;
3160     }
3161 
3162     job->work.next = recf->jobs;
3163     recf->jobs = &job->work;
3164 
3165     job->task = tmcf->engine->task;
3166     job->work.handler = nxt_router_worker_thread_quit;
3167     job->work.task = &job->