xref: /unit/src/nxt_router.c (revision 2014:f8a0992944df)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
/* Port id given to the per-application shared port created by the router. */
#define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
#if (NXT_TLS)

/*
 * State of one TLS-related request tied to a listening socket configuration.
 * Only compiled when TLS support is enabled.
 */
typedef struct {
    nxt_str_t name;
    nxt_socket_conf_t *socket_conf;
    nxt_router_temp_conf_t *temp_conf;
    nxt_tls_init_t *tls_init;
    nxt_bool_t last;

    /* Queue linkage; for nxt_socket_conf_t.tls */
    nxt_queue_link_t link;
} nxt_router_tlssock_t;

#endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_release(nxt_task_t *task,
214     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
216     void *data);
217 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
218     nxt_port_recv_msg_t *msg, void *data);
219 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 
222 static void nxt_router_app_port_ready(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 static void nxt_router_app_port_error(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 
227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
229 
230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
231     nxt_port_t *port, nxt_apr_action_t action);
232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
233     nxt_request_rpc_data_t *req_rpc_data);
234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
235     void *data);
236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
237     void *data);
238 
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240     nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243 
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246     void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252 
253 static const nxt_http_request_state_t  nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255 
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257     nxt_app_joint_t *app_joint, int i);
258 
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260     nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262     void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265     nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 
269 extern const nxt_http_request_state_t  nxt_http_websocket;
270 
271 static nxt_router_t  *nxt_router;
272 
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275 
276 static const nxt_str_t  *nxt_app_msg_prefix[] = {
277     &empty_prefix,
278     &empty_prefix,
279     &http_prefix,
280     &http_prefix,
281     &http_prefix,
282     &empty_prefix,
283 };
284 
285 
286 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
287     .quit         = nxt_signal_quit_handler,
288     .new_port     = nxt_router_new_port_handler,
289     .get_port     = nxt_router_get_port_handler,
290     .change_file  = nxt_port_change_log_file_handler,
291     .mmap         = nxt_port_mmap_handler,
292     .get_mmap     = nxt_router_get_mmap_handler,
293     .data         = nxt_router_conf_data_handler,
294     .app_restart  = nxt_router_app_restart_handler,
295     .remove_pid   = nxt_router_remove_pid_handler,
296     .access_log   = nxt_router_access_log_reopen_handler,
297     .rpc_ready    = nxt_port_rpc_handler,
298     .rpc_error    = nxt_port_rpc_handler,
299     .oosm         = nxt_router_oosm_handler,
300 };
301 
302 
303 const nxt_process_init_t  nxt_router_process = {
304     .name           = "router",
305     .type           = NXT_PROCESS_ROUTER,
306     .prefork        = nxt_router_prefork,
307     .restart        = 1,
308     .setup          = nxt_process_core_setup,
309     .start          = nxt_router_start,
310     .port_handlers  = &nxt_router_process_port_handlers,
311     .signals        = nxt_process_signals,
312 };
313 
314 
315 /* Queues of nxt_socket_conf_t */
316 nxt_queue_t  creating_sockets;
317 nxt_queue_t  pending_sockets;
318 nxt_queue_t  updating_sockets;
319 nxt_queue_t  keeping_sockets;
320 nxt_queue_t  deleting_sockets;
321 
322 
323 static nxt_int_t
324 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
325 {
326     nxt_runtime_stop_app_processes(task, task->thread->runtime);
327 
328     return NXT_OK;
329 }
330 
331 
332 static nxt_int_t
333 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
334 {
335     nxt_int_t      ret;
336     nxt_port_t     *controller_port;
337     nxt_router_t   *router;
338     nxt_runtime_t  *rt;
339 
340     rt = task->thread->runtime;
341 
342     nxt_log(task, NXT_LOG_INFO, "router started");
343 
344 #if (NXT_TLS)
345     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
346     if (nxt_slow_path(rt->tls == NULL)) {
347         return NXT_ERROR;
348     }
349 
350     ret = rt->tls->library_init(task);
351     if (nxt_slow_path(ret != NXT_OK)) {
352         return ret;
353     }
354 #endif
355 
356     ret = nxt_http_init(task);
357     if (nxt_slow_path(ret != NXT_OK)) {
358         return ret;
359     }
360 
361     router = nxt_zalloc(sizeof(nxt_router_t));
362     if (nxt_slow_path(router == NULL)) {
363         return NXT_ERROR;
364     }
365 
366     nxt_queue_init(&router->engines);
367     nxt_queue_init(&router->sockets);
368     nxt_queue_init(&router->apps);
369 
370     nxt_router = router;
371 
372     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
373     if (controller_port != NULL) {
374         nxt_router_greet_controller(task, controller_port);
375     }
376 
377     return NXT_OK;
378 }
379 
380 
381 static void
382 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
383 {
384     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
385                           -1, 0, 0, NULL);
386 }
387 
388 
389 static void
390 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
391     void *data)
392 {
393     size_t               size;
394     uint32_t             stream;
395     nxt_fd_t             port_fd, queue_fd;
396     nxt_int_t            ret;
397     nxt_app_t            *app;
398     nxt_buf_t            *b;
399     nxt_port_t           *dport;
400     nxt_runtime_t        *rt;
401     nxt_app_joint_rpc_t  *app_joint_rpc;
402 
403     app = data;
404 
405     nxt_thread_mutex_lock(&app->mutex);
406 
407     dport = app->proto_port;
408 
409     nxt_thread_mutex_unlock(&app->mutex);
410 
411     if (dport != NULL) {
412         nxt_debug(task, "app '%V' %p start process", &app->name, app);
413 
414         b = NULL;
415         port_fd = -1;
416         queue_fd = -1;
417 
418     } else {
419         if (app->proto_port_requests > 0) {
420             nxt_debug(task, "app '%V' %p wait for prototype process",
421                       &app->name, app);
422 
423             app->proto_port_requests++;
424 
425             goto skip;
426         }
427 
428         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
429 
430         rt = task->thread->runtime;
431         dport = rt->port_by_type[NXT_PROCESS_MAIN];
432 
433         size = app->name.length + 1 + app->conf.length;
434 
435         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
436         if (nxt_slow_path(b == NULL)) {
437             goto failed;
438         }
439 
440         nxt_buf_cpystr(b, &app->name);
441         *b->mem.free++ = '\0';
442         nxt_buf_cpystr(b, &app->conf);
443 
444         port_fd = app->shared_port->pair[0];
445         queue_fd = app->shared_port->queue_fd;
446     }
447 
448     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
449                                                      nxt_router_app_port_ready,
450                                                      nxt_router_app_port_error,
451                                                    sizeof(nxt_app_joint_rpc_t));
452     if (nxt_slow_path(app_joint_rpc == NULL)) {
453         goto failed;
454     }
455 
456     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
457 
458     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
459                                  port_fd, queue_fd, stream, port->id, b);
460     if (nxt_slow_path(ret != NXT_OK)) {
461         nxt_port_rpc_cancel(task, port, stream);
462 
463         goto failed;
464     }
465 
466     app_joint_rpc->app_joint = app->joint;
467     app_joint_rpc->generation = app->generation;
468     app_joint_rpc->proto = (b != NULL);
469 
470     if (b != NULL) {
471         app->proto_port_requests++;
472 
473         b = NULL;
474     }
475 
476     nxt_router_app_joint_use(task, app->joint, 1);
477 
478 failed:
479 
480     if (b != NULL) {
481         nxt_mp_free(b->data, b);
482     }
483 
484 skip:
485 
486     nxt_router_app_use(task, app, -1);
487 }
488 
489 
490 static void
491 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
492 {
493     app_joint->use_count += i;
494 
495     if (app_joint->use_count == 0) {
496         nxt_assert(app_joint->app == NULL);
497 
498         nxt_free(app_joint);
499     }
500 }
501 
502 
503 static nxt_int_t
504 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
505 {
506     nxt_int_t      res;
507     nxt_port_t     *router_port;
508     nxt_runtime_t  *rt;
509 
510     nxt_debug(task, "app '%V' start process", &app->name);
511 
512     rt = task->thread->runtime;
513     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
514 
515     nxt_router_app_use(task, app, 1);
516 
517     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
518                         app);
519 
520     if (res == NXT_OK) {
521         return res;
522     }
523 
524     nxt_thread_mutex_lock(&app->mutex);
525 
526     app->pending_processes--;
527 
528     nxt_thread_mutex_unlock(&app->mutex);
529 
530     nxt_router_app_use(task, app, -1);
531 
532     return NXT_ERROR;
533 }
534 
535 
536 nxt_inline nxt_bool_t
537 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
538 {
539     nxt_buf_t       *b, *next;
540     nxt_bool_t      cancelled;
541     nxt_port_t      *app_port;
542     nxt_msg_info_t  *msg_info;
543 
544     msg_info = &req_rpc_data->msg_info;
545 
546     if (msg_info->buf == NULL) {
547         return 0;
548     }
549 
550     app_port = req_rpc_data->app_port;
551 
552     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
553         cancelled = nxt_app_queue_cancel(app_port->queue,
554                                          msg_info->tracking_cookie,
555                                          req_rpc_data->stream);
556 
557         if (cancelled) {
558             nxt_debug(task, "stream #%uD: cancelled by router",
559                       req_rpc_data->stream);
560         }
561 
562     } else {
563         cancelled = 0;
564     }
565 
566     for (b = msg_info->buf; b != NULL; b = next) {
567         next = b->next;
568         b->next = NULL;
569 
570         if (b->is_port_mmap_sent) {
571             b->is_port_mmap_sent = cancelled == 0;
572         }
573 
574         b->completion_handler(task, b, b->parent);
575     }
576 
577     msg_info->buf = NULL;
578 
579     return cancelled;
580 }
581 
582 
583 nxt_inline nxt_bool_t
584 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
585 {
586     if (lnk->next != NULL) {
587         nxt_queue_remove(lnk);
588 
589         lnk->next = NULL;
590 
591         return 1;
592     }
593 
594     return 0;
595 }
596 
597 
598 nxt_inline void
599 nxt_request_rpc_data_unlink(nxt_task_t *task,
600     nxt_request_rpc_data_t *req_rpc_data)
601 {
602     nxt_app_t           *app;
603     nxt_bool_t          unlinked;
604     nxt_http_request_t  *r;
605 
606     nxt_router_msg_cancel(task, req_rpc_data);
607 
608     app = req_rpc_data->app;
609 
610     if (req_rpc_data->app_port != NULL) {
611         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
612                                     req_rpc_data->apr_action);
613 
614         req_rpc_data->app_port = NULL;
615     }
616 
617     r = req_rpc_data->request;
618 
619     if (r != NULL) {
620         r->timer_data = NULL;
621 
622         nxt_router_http_request_release_post(task, r);
623 
624         r->req_rpc_data = NULL;
625         req_rpc_data->request = NULL;
626 
627         if (app != NULL) {
628             unlinked = 0;
629 
630             nxt_thread_mutex_lock(&app->mutex);
631 
632             if (r->app_link.next != NULL) {
633                 nxt_queue_remove(&r->app_link);
634                 r->app_link.next = NULL;
635 
636                 unlinked = 1;
637             }
638 
639             nxt_thread_mutex_unlock(&app->mutex);
640 
641             if (unlinked) {
642                 nxt_mp_release(r->mem_pool);
643             }
644         }
645     }
646 
647     if (app != NULL) {
648         nxt_router_app_use(task, app, -1);
649 
650         req_rpc_data->app = NULL;
651     }
652 
653     if (req_rpc_data->msg_info.body_fd != -1) {
654         nxt_fd_close(req_rpc_data->msg_info.body_fd);
655 
656         req_rpc_data->msg_info.body_fd = -1;
657     }
658 
659     if (req_rpc_data->rpc_cancel) {
660         req_rpc_data->rpc_cancel = 0;
661 
662         nxt_port_rpc_cancel(task, task->thread->engine->port,
663                             req_rpc_data->stream);
664     }
665 }
666 
667 
668 static void
669 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
670 {
671     nxt_int_t      res;
672     nxt_app_t      *app;
673     nxt_port_t     *port, *main_app_port;
674     nxt_runtime_t  *rt;
675 
676     nxt_port_new_port_handler(task, msg);
677 
678     port = msg->u.new_port;
679 
680     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
681         nxt_router_greet_controller(task, msg->u.new_port);
682     }
683 
684     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
685         nxt_port_rpc_handler(task, msg);
686 
687         return;
688     }
689 
690     if (port == NULL || port->type != NXT_PROCESS_APP) {
691 
692         if (msg->port_msg.stream == 0) {
693             return;
694         }
695 
696         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
697 
698     } else {
699         if (msg->fd[1] != -1) {
700             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
701             if (nxt_slow_path(res != NXT_OK)) {
702                 return;
703             }
704 
705             nxt_fd_close(msg->fd[1]);
706             msg->fd[1] = -1;
707         }
708     }
709 
710     if (msg->port_msg.stream != 0) {
711         nxt_port_rpc_handler(task, msg);
712         return;
713     }
714 
715     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
716 
717     /*
718      * Port with "id == 0" is application 'main' port and it always
719      * should come with non-zero stream.
720      */
721     nxt_assert(port->id != 0);
722 
723     /* Find 'main' app port and get app reference. */
724     rt = task->thread->runtime;
725 
726     /*
727      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
728      * sent to main port (with id == 0) and processed in main thread.
729      */
730     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
731     nxt_assert(main_app_port != NULL);
732 
733     app = main_app_port->app;
734 
735     if (nxt_fast_path(app != NULL)) {
736         nxt_thread_mutex_lock(&app->mutex);
737 
738         /* TODO here should be find-and-add code because there can be
739            port waiters in port_hash */
740         nxt_port_hash_add(&app->port_hash, port);
741         app->port_hash_count++;
742 
743         nxt_thread_mutex_unlock(&app->mutex);
744 
745         port->app = app;
746     }
747 
748     port->main_app_port = main_app_port;
749 
750     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
751 }
752 
753 
754 static void
755 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
756 {
757     void                    *p;
758     size_t                  size;
759     nxt_int_t               ret;
760     nxt_port_t              *port;
761     nxt_router_temp_conf_t  *tmcf;
762 
763     port = nxt_runtime_port_find(task->thread->runtime,
764                                  msg->port_msg.pid,
765                                  msg->port_msg.reply_port);
766     if (nxt_slow_path(port == NULL)) {
767         nxt_alert(task, "conf_data_handler: reply port not found");
768         return;
769     }
770 
771     p = MAP_FAILED;
772 
773     /*
774      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
775      * initialized in 'cleanup' section.
776      */
777     size = 0;
778 
779     tmcf = nxt_router_temp_conf(task);
780     if (nxt_slow_path(tmcf == NULL)) {
781         goto fail;
782     }
783 
784     if (nxt_slow_path(msg->fd[0] == -1)) {
785         nxt_alert(task, "conf_data_handler: invalid shm fd");
786         goto fail;
787     }
788 
789     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
790         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
791                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
792         goto fail;
793     }
794 
795     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
796 
797     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
798 
799     nxt_fd_close(msg->fd[0]);
800     msg->fd[0] = -1;
801 
802     if (nxt_slow_path(p == MAP_FAILED)) {
803         goto fail;
804     }
805 
806     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
807 
808     tmcf->router_conf->router = nxt_router;
809     tmcf->stream = msg->port_msg.stream;
810     tmcf->port = port;
811 
812     nxt_port_use(task, tmcf->port, 1);
813 
814     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
815 
816     if (nxt_fast_path(ret == NXT_OK)) {
817         nxt_router_conf_apply(task, tmcf, NULL);
818 
819     } else {
820         nxt_router_conf_error(task, tmcf);
821     }
822 
823     goto cleanup;
824 
825 fail:
826 
827     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
828                           msg->port_msg.stream, 0, NULL);
829 
830     if (tmcf != NULL) {
831         nxt_mp_release(tmcf->mem_pool);
832     }
833 
834 cleanup:
835 
836     if (p != MAP_FAILED) {
837         nxt_mem_munmap(p, size);
838     }
839 
840     if (msg->fd[0] != -1) {
841         nxt_fd_close(msg->fd[0]);
842         msg->fd[0] = -1;
843     }
844 }
845 
846 
847 static void
848 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
849 {
850     nxt_app_t            *app;
851     nxt_int_t            ret;
852     nxt_str_t            app_name;
853     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
854     nxt_port_t           *proto_port;
855     nxt_port_msg_type_t  reply;
856 
857     reply_port = nxt_runtime_port_find(task->thread->runtime,
858                                        msg->port_msg.pid,
859                                        msg->port_msg.reply_port);
860     if (nxt_slow_path(reply_port == NULL)) {
861         nxt_alert(task, "app_restart_handler: reply port not found");
862         return;
863     }
864 
865     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
866     app_name.start = msg->buf->mem.pos;
867 
868     nxt_debug(task, "app_restart_handler: %V", &app_name);
869 
870     app = nxt_router_app_find(&nxt_router->apps, &app_name);
871 
872     if (nxt_fast_path(app != NULL)) {
873         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
874                                    NXT_PROCESS_APP);
875         if (nxt_slow_path(shared_port == NULL)) {
876             goto fail;
877         }
878 
879         ret = nxt_port_socket_init(task, shared_port, 0);
880         if (nxt_slow_path(ret != NXT_OK)) {
881             nxt_port_use(task, shared_port, -1);
882             goto fail;
883         }
884 
885         ret = nxt_router_app_queue_init(task, shared_port);
886         if (nxt_slow_path(ret != NXT_OK)) {
887             nxt_port_write_close(shared_port);
888             nxt_port_read_close(shared_port);
889             nxt_port_use(task, shared_port, -1);
890             goto fail;
891         }
892 
893         nxt_port_write_enable(task, shared_port);
894 
895         nxt_thread_mutex_lock(&app->mutex);
896 
897         proto_port = app->proto_port;
898 
899         if (proto_port != NULL) {
900             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
901                       proto_port->pid);
902 
903             app->proto_port = NULL;
904             proto_port->app = NULL;
905         }
906 
907         app->generation++;
908 
909         shared_port->app = app;
910 
911         old_shared_port = app->shared_port;
912         old_shared_port->app = NULL;
913 
914         app->shared_port = shared_port;
915 
916         nxt_thread_mutex_unlock(&app->mutex);
917 
918         nxt_port_close(task, old_shared_port);
919         nxt_port_use(task, old_shared_port, -1);
920 
921         if (proto_port != NULL) {
922             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
923                                          -1, 0, 0, NULL);
924 
925             nxt_port_close(task, proto_port);
926 
927             nxt_port_use(task, proto_port, -1);
928         }
929 
930         reply = NXT_PORT_MSG_RPC_READY_LAST;
931 
932     } else {
933 
934 fail:
935 
936         reply = NXT_PORT_MSG_RPC_ERROR;
937     }
938 
939     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
940                           0, NULL);
941 }
942 
943 
944 static void
945 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
946     void *data)
947 {
948     union {
949         nxt_pid_t  removed_pid;
950         void       *data;
951     } u;
952 
953     u.data = data;
954 
955     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
956 }
957 
958 
959 static void
960 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
961 {
962     nxt_event_engine_t  *engine;
963 
964     nxt_port_remove_pid_handler(task, msg);
965 
966     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
967     {
968         if (nxt_fast_path(engine->port != NULL)) {
969             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
970                           msg->u.data);
971         }
972     }
973     nxt_queue_loop;
974 
975     if (msg->port_msg.stream == 0) {
976         return;
977     }
978 
979     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
980 
981     nxt_port_rpc_handler(task, msg);
982 }
983 
984 
985 static nxt_router_temp_conf_t *
986 nxt_router_temp_conf(nxt_task_t *task)
987 {
988     nxt_mp_t                *mp, *tmp;
989     nxt_router_conf_t       *rtcf;
990     nxt_router_temp_conf_t  *tmcf;
991 
992     mp = nxt_mp_create(1024, 128, 256, 32);
993     if (nxt_slow_path(mp == NULL)) {
994         return NULL;
995     }
996 
997     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
998     if (nxt_slow_path(rtcf == NULL)) {
999         goto fail;
1000     }
1001 
1002     rtcf->mem_pool = mp;
1003 
1004     tmp = nxt_mp_create(1024, 128, 256, 32);
1005     if (nxt_slow_path(tmp == NULL)) {
1006         goto fail;
1007     }
1008 
1009     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1010     if (nxt_slow_path(tmcf == NULL)) {
1011         goto temp_fail;
1012     }
1013 
1014     tmcf->mem_pool = tmp;
1015     tmcf->router_conf = rtcf;
1016     tmcf->count = 1;
1017     tmcf->engine = task->thread->engine;
1018 
1019     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1020                                      sizeof(nxt_router_engine_conf_t));
1021     if (nxt_slow_path(tmcf->engines == NULL)) {
1022         goto temp_fail;
1023     }
1024 
1025     nxt_queue_init(&creating_sockets);
1026     nxt_queue_init(&pending_sockets);
1027     nxt_queue_init(&updating_sockets);
1028     nxt_queue_init(&keeping_sockets);
1029     nxt_queue_init(&deleting_sockets);
1030 
1031 #if (NXT_TLS)
1032     nxt_queue_init(&tmcf->tls);
1033 #endif
1034 
1035     nxt_queue_init(&tmcf->apps);
1036     nxt_queue_init(&tmcf->previous);
1037 
1038     return tmcf;
1039 
1040 temp_fail:
1041 
1042     nxt_mp_destroy(tmp);
1043 
1044 fail:
1045 
1046     nxt_mp_destroy(mp);
1047 
1048     return NULL;
1049 }
1050 
1051 
1052 nxt_inline nxt_bool_t
1053 nxt_router_app_can_start(nxt_app_t *app)
1054 {
1055     return app->processes + app->pending_processes < app->max_processes
1056             && app->pending_processes < app->max_pending_processes;
1057 }
1058 
1059 
1060 nxt_inline nxt_bool_t
1061 nxt_router_app_need_start(nxt_app_t *app)
1062 {
1063     return (app->active_requests
1064               > app->port_hash_count + app->pending_processes)
1065            || (app->spare_processes
1066                 > app->idle_processes + app->pending_processes);
1067 }
1068 
1069 
1070 static void
1071 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1072 {
1073     nxt_int_t                    ret;
1074     nxt_app_t                    *app;
1075     nxt_router_t                 *router;
1076     nxt_runtime_t                *rt;
1077     nxt_queue_link_t             *qlk;
1078     nxt_socket_conf_t            *skcf;
1079     nxt_router_conf_t            *rtcf;
1080     nxt_router_temp_conf_t       *tmcf;
1081     const nxt_event_interface_t  *interface;
1082 #if (NXT_TLS)
1083     nxt_router_tlssock_t         *tls;
1084 #endif
1085 
1086     tmcf = obj;
1087 
1088     qlk = nxt_queue_first(&pending_sockets);
1089 
1090     if (qlk != nxt_queue_tail(&pending_sockets)) {
1091         nxt_queue_remove(qlk);
1092         nxt_queue_insert_tail(&creating_sockets, qlk);
1093 
1094         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1095 
1096         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1097 
1098         return;
1099     }
1100 
1101 #if (NXT_TLS)
1102     qlk = nxt_queue_last(&tmcf->tls);
1103 
1104     if (qlk != nxt_queue_head(&tmcf->tls)) {
1105         nxt_queue_remove(qlk);
1106 
1107         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1108 
1109         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1110                            nxt_router_tls_rpc_handler, tls);
1111         return;
1112     }
1113 #endif
1114 
1115     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1116 
1117         if (nxt_router_app_need_start(app)) {
1118             nxt_router_app_rpc_create(task, tmcf, app);
1119             return;
1120         }
1121 
1122     } nxt_queue_loop;
1123 
1124     rtcf = tmcf->router_conf;
1125 
1126     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1127         nxt_router_access_log_open(task, tmcf);
1128         return;
1129     }
1130 
1131     rt = task->thread->runtime;
1132 
1133     interface = nxt_service_get(rt->services, "engine", NULL);
1134 
1135     router = rtcf->router;
1136 
1137     ret = nxt_router_engines_create(task, router, tmcf, interface);
1138     if (nxt_slow_path(ret != NXT_OK)) {
1139         goto fail;
1140     }
1141 
1142     ret = nxt_router_threads_create(task, rt, tmcf);
1143     if (nxt_slow_path(ret != NXT_OK)) {
1144         goto fail;
1145     }
1146 
1147     nxt_router_apps_sort(task, router, tmcf);
1148 
1149     nxt_router_apps_hash_use(task, rtcf, 1);
1150 
1151     nxt_router_engines_post(router, tmcf);
1152 
1153     nxt_queue_add(&router->sockets, &updating_sockets);
1154     nxt_queue_add(&router->sockets, &creating_sockets);
1155 
1156     router->access_log = rtcf->access_log;
1157 
1158     nxt_router_conf_ready(task, tmcf);
1159 
1160     return;
1161 
1162 fail:
1163 
1164     nxt_router_conf_error(task, tmcf);
1165 
1166     return;
1167 }
1168 
1169 
1170 static void
1171 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1172 {
1173     nxt_joint_job_t  *job;
1174 
1175     job = obj;
1176 
1177     nxt_router_conf_ready(task, job->tmcf);
1178 }
1179 
1180 
1181 static void
1182 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1183 {
1184     uint32_t               count;
1185     nxt_router_conf_t      *rtcf;
1186     nxt_thread_spinlock_t  *lock;
1187 
1188     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1189 
1190     if (--tmcf->count > 0) {
1191         return;
1192     }
1193 
1194     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1195 
1196     rtcf = tmcf->router_conf;
1197 
1198     lock = &rtcf->router->lock;
1199 
1200     nxt_thread_spin_lock(lock);
1201 
1202     count = rtcf->count;
1203 
1204     nxt_thread_spin_unlock(lock);
1205 
1206     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1207 
1208     if (count == 0) {
1209         nxt_router_apps_hash_use(task, rtcf, -1);
1210 
1211         nxt_router_access_log_release(task, lock, rtcf->access_log);
1212 
1213         nxt_mp_destroy(rtcf->mem_pool);
1214     }
1215 
1216     nxt_mp_release(tmcf->mem_pool);
1217 }
1218 
1219 
1220 static void
1221 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1222 {
1223     nxt_app_t          *app;
1224     nxt_queue_t        new_socket_confs;
1225     nxt_socket_t       s;
1226     nxt_router_t       *router;
1227     nxt_queue_link_t   *qlk;
1228     nxt_socket_conf_t  *skcf;
1229     nxt_router_conf_t  *rtcf;
1230 
1231     nxt_alert(task, "failed to apply new conf");
1232 
1233     for (qlk = nxt_queue_first(&creating_sockets);
1234          qlk != nxt_queue_tail(&creating_sockets);
1235          qlk = nxt_queue_next(qlk))
1236     {
1237         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1238         s = skcf->listen->socket;
1239 
1240         if (s != -1) {
1241             nxt_socket_close(task, s);
1242         }
1243 
1244         nxt_free(skcf->listen);
1245     }
1246 
1247     nxt_queue_init(&new_socket_confs);
1248     nxt_queue_add(&new_socket_confs, &updating_sockets);
1249     nxt_queue_add(&new_socket_confs, &pending_sockets);
1250     nxt_queue_add(&new_socket_confs, &creating_sockets);
1251 
1252     rtcf = tmcf->router_conf;
1253 
1254     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1255 
1256         nxt_router_app_unlink(task, app);
1257 
1258     } nxt_queue_loop;
1259 
1260     router = rtcf->router;
1261 
1262     nxt_queue_add(&router->sockets, &keeping_sockets);
1263     nxt_queue_add(&router->sockets, &deleting_sockets);
1264 
1265     nxt_queue_add(&router->apps, &tmcf->previous);
1266 
1267     // TODO: new engines and threads
1268 
1269     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1270 
1271     nxt_mp_destroy(rtcf->mem_pool);
1272 
1273     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1274 
1275     nxt_mp_release(tmcf->mem_pool);
1276 }
1277 
1278 
1279 static void
1280 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1281     nxt_port_msg_type_t type)
1282 {
1283     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1284 
1285     nxt_port_use(task, tmcf->port, -1);
1286 
1287     tmcf->port = NULL;
1288 }
1289 
1290 
1291 static nxt_conf_map_t  nxt_router_conf[] = {
1292     {
1293         nxt_string("listeners_threads"),
1294         NXT_CONF_MAP_INT32,
1295         offsetof(nxt_router_conf_t, threads),
1296     },
1297 };
1298 
1299 
1300 static nxt_conf_map_t  nxt_router_app_conf[] = {
1301     {
1302         nxt_string("type"),
1303         NXT_CONF_MAP_STR,
1304         offsetof(nxt_router_app_conf_t, type),
1305     },
1306 
1307     {
1308         nxt_string("limits"),
1309         NXT_CONF_MAP_PTR,
1310         offsetof(nxt_router_app_conf_t, limits_value),
1311     },
1312 
1313     {
1314         nxt_string("processes"),
1315         NXT_CONF_MAP_INT32,
1316         offsetof(nxt_router_app_conf_t, processes),
1317     },
1318 
1319     {
1320         nxt_string("processes"),
1321         NXT_CONF_MAP_PTR,
1322         offsetof(nxt_router_app_conf_t, processes_value),
1323     },
1324 
1325     {
1326         nxt_string("targets"),
1327         NXT_CONF_MAP_PTR,
1328         offsetof(nxt_router_app_conf_t, targets_value),
1329     },
1330 };
1331 
1332 
1333 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1334     {
1335         nxt_string("timeout"),
1336         NXT_CONF_MAP_MSEC,
1337         offsetof(nxt_router_app_conf_t, timeout),
1338     },
1339 };
1340 
1341 
1342 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1343     {
1344         nxt_string("spare"),
1345         NXT_CONF_MAP_INT32,
1346         offsetof(nxt_router_app_conf_t, spare_processes),
1347     },
1348 
1349     {
1350         nxt_string("max"),
1351         NXT_CONF_MAP_INT32,
1352         offsetof(nxt_router_app_conf_t, max_processes),
1353     },
1354 
1355     {
1356         nxt_string("idle_timeout"),
1357         NXT_CONF_MAP_MSEC,
1358         offsetof(nxt_router_app_conf_t, idle_timeout),
1359     },
1360 };
1361 
1362 
1363 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1364     {
1365         nxt_string("pass"),
1366         NXT_CONF_MAP_STR_COPY,
1367         offsetof(nxt_router_listener_conf_t, pass),
1368     },
1369 
1370     {
1371         nxt_string("application"),
1372         NXT_CONF_MAP_STR_COPY,
1373         offsetof(nxt_router_listener_conf_t, application),
1374     },
1375 };
1376 
1377 
1378 static nxt_conf_map_t  nxt_router_http_conf[] = {
1379     {
1380         nxt_string("header_buffer_size"),
1381         NXT_CONF_MAP_SIZE,
1382         offsetof(nxt_socket_conf_t, header_buffer_size),
1383     },
1384 
1385     {
1386         nxt_string("large_header_buffer_size"),
1387         NXT_CONF_MAP_SIZE,
1388         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1389     },
1390 
1391     {
1392         nxt_string("large_header_buffers"),
1393         NXT_CONF_MAP_SIZE,
1394         offsetof(nxt_socket_conf_t, large_header_buffers),
1395     },
1396 
1397     {
1398         nxt_string("body_buffer_size"),
1399         NXT_CONF_MAP_SIZE,
1400         offsetof(nxt_socket_conf_t, body_buffer_size),
1401     },
1402 
1403     {
1404         nxt_string("max_body_size"),
1405         NXT_CONF_MAP_SIZE,
1406         offsetof(nxt_socket_conf_t, max_body_size),
1407     },
1408 
1409     {
1410         nxt_string("idle_timeout"),
1411         NXT_CONF_MAP_MSEC,
1412         offsetof(nxt_socket_conf_t, idle_timeout),
1413     },
1414 
1415     {
1416         nxt_string("header_read_timeout"),
1417         NXT_CONF_MAP_MSEC,
1418         offsetof(nxt_socket_conf_t, header_read_timeout),
1419     },
1420 
1421     {
1422         nxt_string("body_read_timeout"),
1423         NXT_CONF_MAP_MSEC,
1424         offsetof(nxt_socket_conf_t, body_read_timeout),
1425     },
1426 
1427     {
1428         nxt_string("send_timeout"),
1429         NXT_CONF_MAP_MSEC,
1430         offsetof(nxt_socket_conf_t, send_timeout),
1431     },
1432 
1433     {
1434         nxt_string("body_temp_path"),
1435         NXT_CONF_MAP_STR,
1436         offsetof(nxt_socket_conf_t, body_temp_path),
1437     },
1438 
1439     {
1440         nxt_string("discard_unsafe_fields"),
1441         NXT_CONF_MAP_INT8,
1442         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1443     },
1444 };
1445 
1446 
1447 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1448     {
1449         nxt_string("max_frame_size"),
1450         NXT_CONF_MAP_SIZE,
1451         offsetof(nxt_websocket_conf_t, max_frame_size),
1452     },
1453 
1454     {
1455         nxt_string("read_timeout"),
1456         NXT_CONF_MAP_MSEC,
1457         offsetof(nxt_websocket_conf_t, read_timeout),
1458     },
1459 
1460     {
1461         nxt_string("keepalive_interval"),
1462         NXT_CONF_MAP_MSEC,
1463         offsetof(nxt_websocket_conf_t, keepalive_interval),
1464     },
1465 
1466 };
1467 
1468 
1469 static nxt_int_t
1470 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1471     u_char *start, u_char *end)
1472 {
1473     u_char                      *p;
1474     size_t                      size;
1475     nxt_mp_t                    *mp, *app_mp;
1476     uint32_t                    next, next_target;
1477     nxt_int_t                   ret;
1478     nxt_str_t                   name, path, target;
1479     nxt_app_t                   *app, *prev;
1480     nxt_str_t                   *t, *s, *targets;
1481     nxt_uint_t                  n, i;
1482     nxt_port_t                  *port;
1483     nxt_router_t                *router;
1484     nxt_app_joint_t             *app_joint;
1485 #if (NXT_TLS)
1486     nxt_tls_init_t              *tls_init;
1487     nxt_conf_value_t            *certificate;
1488 #endif
1489     nxt_conf_value_t            *conf, *http, *value, *websocket;
1490     nxt_conf_value_t            *applications, *application;
1491     nxt_conf_value_t            *listeners, *listener;
1492     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1493     nxt_socket_conf_t           *skcf;
1494     nxt_http_routes_t           *routes;
1495     nxt_event_engine_t          *engine;
1496     nxt_app_lang_module_t       *lang;
1497     nxt_router_app_conf_t       apcf;
1498     nxt_router_access_log_t     *access_log;
1499     nxt_router_listener_conf_t  lscf;
1500 
1501     static nxt_str_t  http_path = nxt_string("/settings/http");
1502     static nxt_str_t  applications_path = nxt_string("/applications");
1503     static nxt_str_t  listeners_path = nxt_string("/listeners");
1504     static nxt_str_t  routes_path = nxt_string("/routes");
1505     static nxt_str_t  access_log_path = nxt_string("/access_log");
1506 #if (NXT_TLS)
1507     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1508     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1509     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1510     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1511     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1512 #endif
1513     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1514     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1515     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1516 
1517     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1518     if (conf == NULL) {
1519         nxt_alert(task, "configuration parsing error");
1520         return NXT_ERROR;
1521     }
1522 
1523     mp = tmcf->router_conf->mem_pool;
1524 
1525     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1526                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1527     if (ret != NXT_OK) {
1528         nxt_alert(task, "root map error");
1529         return NXT_ERROR;
1530     }
1531 
1532     if (tmcf->router_conf->threads == 0) {
1533         tmcf->router_conf->threads = nxt_ncpu;
1534     }
1535 
1536     static_conf = nxt_conf_get_path(conf, &static_path);
1537 
1538     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1539     if (nxt_slow_path(ret != NXT_OK)) {
1540         return NXT_ERROR;
1541     }
1542 
1543     router = tmcf->router_conf->router;
1544 
1545     applications = nxt_conf_get_path(conf, &applications_path);
1546 
1547     if (applications != NULL) {
1548         next = 0;
1549 
1550         for ( ;; ) {
1551             application = nxt_conf_next_object_member(applications,
1552                                                       &name, &next);
1553             if (application == NULL) {
1554                 break;
1555             }
1556 
1557             nxt_debug(task, "application \"%V\"", &name);
1558 
1559             size = nxt_conf_json_length(application, NULL);
1560 
1561             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1562             if (nxt_slow_path(app_mp == NULL)) {
1563                 goto fail;
1564             }
1565 
1566             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1567             if (app == NULL) {
1568                 goto app_fail;
1569             }
1570 
1571             nxt_memzero(app, sizeof(nxt_app_t));
1572 
1573             app->mem_pool = app_mp;
1574 
1575             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1576             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1577                                                   + name.length);
1578 
1579             p = nxt_conf_json_print(app->conf.start, application, NULL);
1580             app->conf.length = p - app->conf.start;
1581 
1582             nxt_assert(app->conf.length <= size);
1583 
1584             nxt_debug(task, "application conf \"%V\"", &app->conf);
1585 
1586             prev = nxt_router_app_find(&router->apps, &name);
1587 
1588             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1589                 nxt_mp_destroy(app_mp);
1590 
1591                 nxt_queue_remove(&prev->link);
1592                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1593 
1594                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1595                 if (nxt_slow_path(ret != NXT_OK)) {
1596                     goto fail;
1597                 }
1598 
1599                 continue;
1600             }
1601 
1602             apcf.processes = 1;
1603             apcf.max_processes = 1;
1604             apcf.spare_processes = 0;
1605             apcf.timeout = 0;
1606             apcf.idle_timeout = 15000;
1607             apcf.limits_value = NULL;
1608             apcf.processes_value = NULL;
1609             apcf.targets_value = NULL;
1610 
1611             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1612             if (nxt_slow_path(app_joint == NULL)) {
1613                 goto app_fail;
1614             }
1615 
1616             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1617 
1618             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1619                                       nxt_nitems(nxt_router_app_conf), &apcf);
1620             if (ret != NXT_OK) {
1621                 nxt_alert(task, "application map error");
1622                 goto app_fail;
1623             }
1624 
1625             if (apcf.limits_value != NULL) {
1626 
1627                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1628                     nxt_alert(task, "application limits is not object");
1629                     goto app_fail;
1630                 }
1631 
1632                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1633                                         nxt_router_app_limits_conf,
1634                                         nxt_nitems(nxt_router_app_limits_conf),
1635                                         &apcf);
1636                 if (ret != NXT_OK) {
1637                     nxt_alert(task, "application limits map error");
1638                     goto app_fail;
1639                 }
1640             }
1641 
1642             if (apcf.processes_value != NULL
1643                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1644             {
1645                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1646                                      nxt_router_app_processes_conf,
1647                                      nxt_nitems(nxt_router_app_processes_conf),
1648                                      &apcf);
1649                 if (ret != NXT_OK) {
1650                     nxt_alert(task, "application processes map error");
1651                     goto app_fail;
1652                 }
1653 
1654             } else {
1655                 apcf.max_processes = apcf.processes;
1656                 apcf.spare_processes = apcf.processes;
1657             }
1658 
1659             if (apcf.targets_value != NULL) {
1660                 n = nxt_conf_object_members_count(apcf.targets_value);
1661 
1662                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1663                 if (nxt_slow_path(targets == NULL)) {
1664                     goto app_fail;
1665                 }
1666 
1667                 next_target = 0;
1668 
1669                 for (i = 0; i < n; i++) {
1670                     (void) nxt_conf_next_object_member(apcf.targets_value,
1671                                                        &target, &next_target);
1672 
1673                     s = nxt_str_dup(app_mp, &targets[i], &target);
1674                     if (nxt_slow_path(s == NULL)) {
1675                         goto app_fail;
1676                     }
1677                 }
1678 
1679             } else {
1680                 targets = NULL;
1681             }
1682 
1683             nxt_debug(task, "application type: %V", &apcf.type);
1684             nxt_debug(task, "application processes: %D", apcf.processes);
1685             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1686 
1687             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1688 
1689             if (lang == NULL) {
1690                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1691                 goto app_fail;
1692             }
1693 
1694             nxt_debug(task, "application language module: \"%s\"", lang->file);
1695 
1696             ret = nxt_thread_mutex_create(&app->mutex);
1697             if (ret != NXT_OK) {
1698                 goto app_fail;
1699             }
1700 
1701             nxt_queue_init(&app->ports);
1702             nxt_queue_init(&app->spare_ports);
1703             nxt_queue_init(&app->idle_ports);
1704             nxt_queue_init(&app->ack_waiting_req);
1705 
1706             app->name.length = name.length;
1707             nxt_memcpy(app->name.start, name.start, name.length);
1708 
1709             app->type = lang->type;
1710             app->max_processes = apcf.max_processes;
1711             app->spare_processes = apcf.spare_processes;
1712             app->max_pending_processes = apcf.spare_processes
1713                                          ? apcf.spare_processes : 1;
1714             app->timeout = apcf.timeout;
1715             app->idle_timeout = apcf.idle_timeout;
1716 
1717             app->targets = targets;
1718 
1719             engine = task->thread->engine;
1720 
1721             app->engine = engine;
1722 
1723             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1724             app->adjust_idle_work.task = &engine->task;
1725             app->adjust_idle_work.obj = app;
1726 
1727             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1728 
1729             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1730             if (nxt_slow_path(ret != NXT_OK)) {
1731                 goto app_fail;
1732             }
1733 
1734             nxt_router_app_use(task, app, 1);
1735 
1736             app->joint = app_joint;
1737 
1738             app_joint->use_count = 1;
1739             app_joint->app = app;
1740 
1741             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1742             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1743             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1744             app_joint->idle_timer.task = &engine->task;
1745             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1746 
1747             app_joint->free_app_work.handler = nxt_router_free_app;
1748             app_joint->free_app_work.task = &engine->task;
1749             app_joint->free_app_work.obj = app_joint;
1750 
1751             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1752                                 NXT_PROCESS_APP);
1753             if (nxt_slow_path(port == NULL)) {
1754                 return NXT_ERROR;
1755             }
1756 
1757             ret = nxt_port_socket_init(task, port, 0);
1758             if (nxt_slow_path(ret != NXT_OK)) {
1759                 nxt_port_use(task, port, -1);
1760                 return NXT_ERROR;
1761             }
1762 
1763             ret = nxt_router_app_queue_init(task, port);
1764             if (nxt_slow_path(ret != NXT_OK)) {
1765                 nxt_port_write_close(port);
1766                 nxt_port_read_close(port);
1767                 nxt_port_use(task, port, -1);
1768                 return NXT_ERROR;
1769             }
1770 
1771             nxt_port_write_enable(task, port);
1772             port->app = app;
1773 
1774             app->shared_port = port;
1775 
1776             nxt_thread_mutex_create(&app->outgoing.mutex);
1777         }
1778     }
1779 
1780     routes_conf = nxt_conf_get_path(conf, &routes_path);
1781     if (nxt_fast_path(routes_conf != NULL)) {
1782         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1783         if (nxt_slow_path(routes == NULL)) {
1784             return NXT_ERROR;
1785         }
1786         tmcf->router_conf->routes = routes;
1787     }
1788 
1789     ret = nxt_upstreams_create(task, tmcf, conf);
1790     if (nxt_slow_path(ret != NXT_OK)) {
1791         return ret;
1792     }
1793 
1794     http = nxt_conf_get_path(conf, &http_path);
1795 #if 0
1796     if (http == NULL) {
1797         nxt_alert(task, "no \"http\" block");
1798         return NXT_ERROR;
1799     }
1800 #endif
1801 
1802     websocket = nxt_conf_get_path(conf, &websocket_path);
1803 
1804     listeners = nxt_conf_get_path(conf, &listeners_path);
1805 
1806     if (listeners != NULL) {
1807         next = 0;
1808 
1809         for ( ;; ) {
1810             listener = nxt_conf_next_object_member(listeners, &name, &next);
1811             if (listener == NULL) {
1812                 break;
1813             }
1814 
1815             skcf = nxt_router_socket_conf(task, tmcf, &name);
1816             if (skcf == NULL) {
1817                 goto fail;
1818             }
1819 
1820             nxt_memzero(&lscf, sizeof(lscf));
1821 
1822             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1823                                       nxt_nitems(nxt_router_listener_conf),
1824                                       &lscf);
1825             if (ret != NXT_OK) {
1826                 nxt_alert(task, "listener map error");
1827                 goto fail;
1828             }
1829 
1830             nxt_debug(task, "application: %V", &lscf.application);
1831 
1832             // STUB, default values if http block is not defined.
1833             skcf->header_buffer_size = 2048;
1834             skcf->large_header_buffer_size = 8192;
1835             skcf->large_header_buffers = 4;
1836             skcf->discard_unsafe_fields = 1;
1837             skcf->body_buffer_size = 16 * 1024;
1838             skcf->max_body_size = 8 * 1024 * 1024;
1839             skcf->proxy_header_buffer_size = 64 * 1024;
1840             skcf->proxy_buffer_size = 4096;
1841             skcf->proxy_buffers = 256;
1842             skcf->idle_timeout = 180 * 1000;
1843             skcf->header_read_timeout = 30 * 1000;
1844             skcf->body_read_timeout = 30 * 1000;
1845             skcf->send_timeout = 30 * 1000;
1846             skcf->proxy_timeout = 60 * 1000;
1847             skcf->proxy_send_timeout = 30 * 1000;
1848             skcf->proxy_read_timeout = 30 * 1000;
1849 
1850             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1851             skcf->websocket_conf.read_timeout = 60 * 1000;
1852             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1853 
1854             nxt_str_null(&skcf->body_temp_path);
1855 
1856             if (http != NULL) {
1857                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1858                                           nxt_nitems(nxt_router_http_conf),
1859                                           skcf);
1860                 if (ret != NXT_OK) {
1861                     nxt_alert(task, "http map error");
1862                     goto fail;
1863                 }
1864             }
1865 
1866             if (websocket != NULL) {
1867                 ret = nxt_conf_map_object(mp, websocket,
1868                                           nxt_router_websocket_conf,
1869                                           nxt_nitems(nxt_router_websocket_conf),
1870                                           &skcf->websocket_conf);
1871                 if (ret != NXT_OK) {
1872                     nxt_alert(task, "websocket map error");
1873                     goto fail;
1874                 }
1875             }
1876 
1877             t = &skcf->body_temp_path;
1878 
1879             if (t->length == 0) {
1880                 t->start = (u_char *) task->thread->runtime->tmp;
1881                 t->length = nxt_strlen(t->start);
1882             }
1883 
1884             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1885             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1886                                                     client_ip_conf);
1887             if (nxt_slow_path(ret != NXT_OK)) {
1888                 return NXT_ERROR;
1889             }
1890 
1891 #if (NXT_TLS)
1892             certificate = nxt_conf_get_path(listener, &certificate_path);
1893 
1894             if (certificate != NULL) {
1895                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1896                 if (nxt_slow_path(tls_init == NULL)) {
1897                     return NXT_ERROR;
1898                 }
1899 
1900                 tls_init->cache_size = 0;
1901                 tls_init->timeout = 300;
1902 
1903                 value = nxt_conf_get_path(listener, &conf_cache_path);
1904                 if (value != NULL) {
1905                     tls_init->cache_size = nxt_conf_get_number(value);
1906                 }
1907 
1908                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1909                 if (value != NULL) {
1910                     tls_init->timeout = nxt_conf_get_number(value);
1911                 }
1912 
1913                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1914                                                         &conf_commands_path);
1915 
1916                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1917                                                            &conf_tickets);
1918 
1919                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1920                     n = nxt_conf_array_elements_count(certificate);
1921 
1922                     for (i = 0; i < n; i++) {
1923                         value = nxt_conf_get_array_element(certificate, i);
1924 
1925                         nxt_assert(value != NULL);
1926 
1927                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1928                                                          tls_init, i == 0);
1929                         if (nxt_slow_path(ret != NXT_OK)) {
1930                             goto fail;
1931                         }
1932                     }
1933 
1934                 } else {
1935                     /* NXT_CONF_STRING */
1936                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1937                                                      tls_init, 1);
1938                     if (nxt_slow_path(ret != NXT_OK)) {
1939                         goto fail;
1940                     }
1941                 }
1942             }
1943 #endif
1944 
1945             skcf->listen->handler = nxt_http_conn_init;
1946             skcf->router_conf = tmcf->router_conf;
1947             skcf->router_conf->count++;
1948 
1949             if (lscf.pass.length != 0) {
1950                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1951 
1952             /* COMPATIBILITY: listener application. */
1953             } else if (lscf.application.length > 0) {
1954                 skcf->action = nxt_http_pass_application(task,
1955                                                          tmcf->router_conf,
1956                                                          &lscf.application);
1957             }
1958 
1959             if (nxt_slow_path(skcf->action == NULL)) {
1960                 goto fail;
1961             }
1962         }
1963     }
1964 
1965     ret = nxt_http_routes_resolve(task, tmcf);
1966     if (nxt_slow_path(ret != NXT_OK)) {
1967         goto fail;
1968     }
1969 
1970     value = nxt_conf_get_path(conf, &access_log_path);
1971 
1972     if (value != NULL) {
1973         nxt_conf_get_string(value, &path);
1974 
1975         access_log = router->access_log;
1976 
1977         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1978             nxt_thread_spin_lock(&router->lock);
1979             access_log->count++;
1980             nxt_thread_spin_unlock(&router->lock);
1981 
1982         } else {
1983             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1984                                     + path.length);
1985             if (access_log == NULL) {
1986                 nxt_alert(task, "failed to allocate access log structure");
1987                 goto fail;
1988             }
1989 
1990             access_log->fd = -1;
1991             access_log->handler = &nxt_router_access_log_writer;
1992             access_log->count = 1;
1993 
1994             access_log->path.length = path.length;
1995             access_log->path.start = (u_char *) access_log
1996                                      + sizeof(nxt_router_access_log_t);
1997 
1998             nxt_memcpy(access_log->path.start, path.start, path.length);
1999         }
2000 
2001         tmcf->router_conf->access_log = access_log;
2002     }
2003 
2004     nxt_queue_add(&deleting_sockets, &router->sockets);
2005     nxt_queue_init(&router->sockets);
2006 
2007     return NXT_OK;
2008 
2009 app_fail:
2010 
2011     nxt_mp_destroy(app_mp);
2012 
2013 fail:
2014 
2015     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2016 
2017         nxt_queue_remove(&app->link);
2018         nxt_thread_mutex_destroy(&app->mutex);
2019         nxt_mp_destroy(app->mem_pool);
2020 
2021     } nxt_queue_loop;
2022 
2023     return NXT_ERROR;
2024 }
2025 
2026 
2027 #if (NXT_TLS)
2028 
2029 static nxt_int_t
2030 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2031     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2032     nxt_tls_init_t *tls_init, nxt_bool_t last)
2033 {
2034     nxt_router_tlssock_t  *tls;
2035 
2036     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2037     if (nxt_slow_path(tls == NULL)) {
2038         return NXT_ERROR;
2039     }
2040 
2041     tls->tls_init = tls_init;
2042     tls->socket_conf = skcf;
2043     tls->temp_conf = tmcf;
2044     tls->last = last;
2045     nxt_conf_get_string(value, &tls->name);
2046 
2047     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2048 
2049     return NXT_OK;
2050 }
2051 
2052 #endif
2053 
2054 
2055 static nxt_int_t
2056 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2057     nxt_conf_value_t *conf)
2058 {
2059     uint32_t          next, i;
2060     nxt_mp_t          *mp;
2061     nxt_str_t         *type, exten, str;
2062     nxt_int_t         ret;
2063     nxt_uint_t        exts;
2064     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2065 
2066     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2067 
2068     mp = rtcf->mem_pool;
2069 
2070     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2071     if (nxt_slow_path(ret != NXT_OK)) {
2072         return NXT_ERROR;
2073     }
2074 
2075     if (conf == NULL) {
2076         return NXT_OK;
2077     }
2078 
2079     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2080 
2081     if (mtypes_conf != NULL) {
2082         next = 0;
2083 
2084         for ( ;; ) {
2085             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2086 
2087             if (ext_conf == NULL) {
2088                 break;
2089             }
2090 
2091             type = nxt_str_dup(mp, NULL, &str);
2092             if (nxt_slow_path(type == NULL)) {
2093                 return NXT_ERROR;
2094             }
2095 
2096             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2097                 nxt_conf_get_string(ext_conf, &str);
2098 
2099                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2100                     return NXT_ERROR;
2101                 }
2102 
2103                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2104                                                       &exten, type);
2105                 if (nxt_slow_path(ret != NXT_OK)) {
2106                     return NXT_ERROR;
2107                 }
2108 
2109                 continue;
2110             }
2111 
2112             exts = nxt_conf_array_elements_count(ext_conf);
2113 
2114             for (i = 0; i < exts; i++) {
2115                 value = nxt_conf_get_array_element(ext_conf, i);
2116 
2117                 nxt_conf_get_string(value, &str);
2118 
2119                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2120                     return NXT_ERROR;
2121                 }
2122 
2123                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2124                                                       &exten, type);
2125                 if (nxt_slow_path(ret != NXT_OK)) {
2126                     return NXT_ERROR;
2127                 }
2128             }
2129         }
2130     }
2131 
2132     return NXT_OK;
2133 }
2134 
2135 
2136 static nxt_int_t
2137 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2138     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2139 {
2140     char                        c;
2141     size_t                      i;
2142     nxt_mp_t                    *mp;
2143     uint32_t                    hash;
2144     nxt_str_t                   header;
2145     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2146     nxt_http_client_ip_t        *client_ip;
2147     nxt_http_route_addr_rule_t  *source;
2148 
2149     static nxt_str_t  header_path = nxt_string("/header");
2150     static nxt_str_t  source_path = nxt_string("/source");
2151     static nxt_str_t  recursive_path = nxt_string("/recursive");
2152 
2153     if (conf == NULL) {
2154         skcf->client_ip = NULL;
2155 
2156         return NXT_OK;
2157     }
2158 
2159     mp = tmcf->router_conf->mem_pool;
2160 
2161     source_conf = nxt_conf_get_path(conf, &source_path);
2162     header_conf = nxt_conf_get_path(conf, &header_path);
2163     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2164 
2165     if (source_conf == NULL || header_conf == NULL) {
2166         return NXT_ERROR;
2167     }
2168 
2169     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2170     if (nxt_slow_path(client_ip == NULL)) {
2171         return NXT_ERROR;
2172     }
2173 
2174     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2175     if (nxt_slow_path(source == NULL)) {
2176         return NXT_ERROR;
2177     }
2178 
2179     client_ip->source = source;
2180 
2181     nxt_conf_get_string(header_conf, &header);
2182 
2183     if (recursive_conf != NULL) {
2184         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2185     }
2186 
2187     client_ip->header = nxt_str_dup(mp, NULL, &header);
2188     if (nxt_slow_path(client_ip->header == NULL)) {
2189         return NXT_ERROR;
2190     }
2191 
2192     hash = NXT_HTTP_FIELD_HASH_INIT;
2193 
2194     for (i = 0; i < client_ip->header->length; i++) {
2195         c = client_ip->header->start[i];
2196         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2197     }
2198 
2199     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2200 
2201     client_ip->header_hash = hash;
2202 
2203     skcf->client_ip = client_ip;
2204 
2205     return NXT_OK;
2206 }
2207 
2208 
2209 static nxt_app_t *
2210 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2211 {
2212     nxt_app_t  *app;
2213 
2214     nxt_queue_each(app, queue, nxt_app_t, link) {
2215 
2216         if (nxt_strstr_eq(name, &app->name)) {
2217             return app;
2218         }
2219 
2220     } nxt_queue_loop;
2221 
2222     return NULL;
2223 }
2224 
2225 
2226 static nxt_int_t
2227 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2228 {
2229     void       *mem;
2230     nxt_int_t  fd;
2231 
2232     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2233     if (nxt_slow_path(fd == -1)) {
2234         return NXT_ERROR;
2235     }
2236 
2237     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2238                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2239     if (nxt_slow_path(mem == MAP_FAILED)) {
2240         nxt_fd_close(fd);
2241 
2242         return NXT_ERROR;
2243     }
2244 
2245     nxt_app_queue_init(mem);
2246 
2247     port->queue_fd = fd;
2248     port->queue = mem;
2249 
2250     return NXT_OK;
2251 }
2252 
2253 
2254 static nxt_int_t
2255 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2256 {
2257     void       *mem;
2258     nxt_int_t  fd;
2259 
2260     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2261     if (nxt_slow_path(fd == -1)) {
2262         return NXT_ERROR;
2263     }
2264 
2265     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2266                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2267     if (nxt_slow_path(mem == MAP_FAILED)) {
2268         nxt_fd_close(fd);
2269 
2270         return NXT_ERROR;
2271     }
2272 
2273     nxt_port_queue_init(mem);
2274 
2275     port->queue_fd = fd;
2276     port->queue = mem;
2277 
2278     return NXT_OK;
2279 }
2280 
2281 
2282 static nxt_int_t
2283 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2284 {
2285     void  *mem;
2286 
2287     nxt_assert(fd != -1);
2288 
2289     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2290                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2291     if (nxt_slow_path(mem == MAP_FAILED)) {
2292 
2293         return NXT_ERROR;
2294     }
2295 
2296     port->queue = mem;
2297 
2298     return NXT_OK;
2299 }
2300 
2301 
/*
 * Level-hash prototype used for the rtcf->apps_hash table of applications:
 * entries are compared with nxt_router_apps_hash_test().
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2308 
2309 
2310 static nxt_int_t
2311 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2312 {
2313     nxt_app_t  *app;
2314 
2315     app = data;
2316 
2317     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2318 }
2319 
2320 
2321 static nxt_int_t
2322 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2323 {
2324     nxt_lvlhsh_query_t  lhq;
2325 
2326     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2327     lhq.replace = 0;
2328     lhq.key = app->name;
2329     lhq.value = app;
2330     lhq.proto = &nxt_router_apps_hash_proto;
2331     lhq.pool = rtcf->mem_pool;
2332 
2333     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2334 
2335     case NXT_OK:
2336         return NXT_OK;
2337 
2338     case NXT_DECLINED:
2339         nxt_thread_log_alert("router app hash adding failed: "
2340                              "\"%V\" is already in hash", &lhq.key);
2341         /* Fall through. */
2342     default:
2343         return NXT_ERROR;
2344     }
2345 }
2346 
2347 
2348 static nxt_app_t *
2349 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2350 {
2351     nxt_lvlhsh_query_t  lhq;
2352 
2353     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2354     lhq.key = *name;
2355     lhq.proto = &nxt_router_apps_hash_proto;
2356 
2357     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2358         return NULL;
2359     }
2360 
2361     return lhq.value;
2362 }
2363 
2364 
2365 static void
2366 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2367 {
2368     nxt_app_t          *app;
2369     nxt_lvlhsh_each_t  lhe;
2370 
2371     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2372 
2373     for ( ;; ) {
2374         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2375 
2376         if (app == NULL) {
2377             break;
2378         }
2379 
2380         nxt_router_app_use(task, app, i);
2381     }
2382 }
2383 
2384 
/*
 * Per-action application data built by nxt_router_application_init() and
 * stored in action->u.conf.
 */
typedef struct {
    nxt_app_t  *app;     /* application found in the router's apps hash */
    nxt_int_t  target;   /* index into app->targets; 0 when no target given */
} nxt_http_app_conf_t;
2389 
2390 
2391 nxt_int_t
2392 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2393     nxt_str_t *target, nxt_http_action_t *action)
2394 {
2395     nxt_app_t            *app;
2396     nxt_str_t            *targets;
2397     nxt_uint_t           i;
2398     nxt_http_app_conf_t  *conf;
2399 
2400     app = nxt_router_apps_hash_get(rtcf, name);
2401     if (app == NULL) {
2402         return NXT_DECLINED;
2403     }
2404 
2405     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2406     if (nxt_slow_path(conf == NULL)) {
2407         return NXT_ERROR;
2408     }
2409 
2410     action->handler = nxt_http_application_handler;
2411     action->u.conf = conf;
2412 
2413     conf->app = app;
2414 
2415     if (target != NULL && target->length != 0) {
2416         targets = app->targets;
2417 
2418         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2419 
2420         conf->target = i;
2421 
2422     } else {
2423         conf->target = 0;
2424     }
2425 
2426     return NXT_OK;
2427 }
2428 
2429 
2430 static nxt_socket_conf_t *
2431 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2432     nxt_str_t *name)
2433 {
2434     size_t               size;
2435     nxt_int_t            ret;
2436     nxt_bool_t           wildcard;
2437     nxt_sockaddr_t       *sa;
2438     nxt_socket_conf_t    *skcf;
2439     nxt_listen_socket_t  *ls;
2440 
2441     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2442     if (nxt_slow_path(sa == NULL)) {
2443         nxt_alert(task, "invalid listener \"%V\"", name);
2444         return NULL;
2445     }
2446 
2447     sa->type = SOCK_STREAM;
2448 
2449     nxt_debug(task, "router listener: \"%*s\"",
2450               (size_t) sa->length, nxt_sockaddr_start(sa));
2451 
2452     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2453     if (nxt_slow_path(skcf == NULL)) {
2454         return NULL;
2455     }
2456 
2457     size = nxt_sockaddr_size(sa);
2458 
2459     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2460 
2461     if (ret != NXT_OK) {
2462 
2463         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2464         if (nxt_slow_path(ls == NULL)) {
2465             return NULL;
2466         }
2467 
2468         skcf->listen = ls;
2469 
2470         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2471         nxt_memcpy(ls->sockaddr, sa, size);
2472 
2473         nxt_listen_socket_remote_size(ls);
2474 
2475         ls->socket = -1;
2476         ls->backlog = NXT_LISTEN_BACKLOG;
2477         ls->flags = NXT_NONBLOCK;
2478         ls->read_after_accept = 1;
2479     }
2480 
2481     switch (sa->u.sockaddr.sa_family) {
2482 #if (NXT_HAVE_UNIX_DOMAIN)
2483     case AF_UNIX:
2484         wildcard = 0;
2485         break;
2486 #endif
2487 #if (NXT_INET6)
2488     case AF_INET6:
2489         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2490         break;
2491 #endif
2492     case AF_INET:
2493     default:
2494         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2495         break;
2496     }
2497 
2498     if (!wildcard) {
2499         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2500         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2501             return NULL;
2502         }
2503 
2504         nxt_memcpy(skcf->sockaddr, sa, size);
2505     }
2506 
2507     return skcf;
2508 }
2509 
2510 
2511 static nxt_int_t
2512 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2513     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2514 {
2515     nxt_router_t       *router;
2516     nxt_queue_link_t   *qlk;
2517     nxt_socket_conf_t  *skcf;
2518 
2519     router = tmcf->router_conf->router;
2520 
2521     for (qlk = nxt_queue_first(&router->sockets);
2522          qlk != nxt_queue_tail(&router->sockets);
2523          qlk = nxt_queue_next(qlk))
2524     {
2525         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2526 
2527         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2528             nskcf->listen = skcf->listen;
2529 
2530             nxt_queue_remove(qlk);
2531             nxt_queue_insert_tail(&keeping_sockets, qlk);
2532 
2533             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2534 
2535             return NXT_OK;
2536         }
2537     }
2538 
2539     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2540 
2541     return NXT_DECLINED;
2542 }
2543 
2544 
2545 static void
2546 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2547     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2548 {
2549     size_t            size;
2550     uint32_t          stream;
2551     nxt_int_t         ret;
2552     nxt_buf_t         *b;
2553     nxt_port_t        *main_port, *router_port;
2554     nxt_runtime_t     *rt;
2555     nxt_socket_rpc_t  *rpc;
2556 
2557     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2558     if (rpc == NULL) {
2559         goto fail;
2560     }
2561 
2562     rpc->socket_conf = skcf;
2563     rpc->temp_conf = tmcf;
2564 
2565     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2566 
2567     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2568     if (b == NULL) {
2569         goto fail;
2570     }
2571 
2572     b->completion_handler = nxt_buf_dummy_completion;
2573 
2574     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2575 
2576     rt = task->thread->runtime;
2577     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2578     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2579 
2580     stream = nxt_port_rpc_register_handler(task, router_port,
2581                                            nxt_router_listen_socket_ready,
2582                                            nxt_router_listen_socket_error,
2583                                            main_port->pid, rpc);
2584     if (nxt_slow_path(stream == 0)) {
2585         goto fail;
2586     }
2587 
2588     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2589                                 stream, router_port->id, b);
2590 
2591     if (nxt_slow_path(ret != NXT_OK)) {
2592         nxt_port_rpc_cancel(task, router_port, stream);
2593         goto fail;
2594     }
2595 
2596     return;
2597 
2598 fail:
2599 
2600     nxt_router_conf_error(task, tmcf);
2601 }
2602 
2603 
2604 static void
2605 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2606     void *data)
2607 {
2608     nxt_int_t         ret;
2609     nxt_socket_t      s;
2610     nxt_socket_rpc_t  *rpc;
2611 
2612     rpc = data;
2613 
2614     s = msg->fd[0];
2615 
2616     ret = nxt_socket_nonblocking(task, s);
2617     if (nxt_slow_path(ret != NXT_OK)) {
2618         goto fail;
2619     }
2620 
2621     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2622 
2623     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2624     if (nxt_slow_path(ret != NXT_OK)) {
2625         goto fail;
2626     }
2627 
2628     rpc->socket_conf->listen->socket = s;
2629 
2630     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2631                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2632 
2633     return;
2634 
2635 fail:
2636 
2637     nxt_socket_close(task, s);
2638 
2639     nxt_router_conf_error(task, rpc->temp_conf);
2640 }
2641 
2642 
/*
 * RPC error handler for a listen socket request: "data" is the
 * nxt_socket_rpc_t.  The only active behavior is to call
 * nxt_router_conf_error() for the temporary configuration.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

    /*
     * Disabled code: it would decode an error code byte followed by a
     * message from the reply and format them into a debug log line.
     */
#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    nxt_router_conf_error(task, tmcf);
}
2702 
2703 
2704 #if (NXT_TLS)
2705 
2706 static void
2707 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2708     void *data)
2709 {
2710     nxt_mp_t                *mp;
2711     nxt_int_t               ret;
2712     nxt_tls_conf_t          *tlscf;
2713     nxt_router_tlssock_t    *tls;
2714     nxt_tls_bundle_conf_t   *bundle;
2715     nxt_router_temp_conf_t  *tmcf;
2716 
2717     nxt_debug(task, "tls rpc handler");
2718 
2719     tls = data;
2720     tmcf = tls->temp_conf;
2721 
2722     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2723         goto fail;
2724     }
2725 
2726     mp = tmcf->router_conf->mem_pool;
2727 
2728     if (tls->socket_conf->tls == NULL){
2729         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2730         if (nxt_slow_path(tlscf == NULL)) {
2731             goto fail;
2732         }
2733 
2734         tlscf->no_wait_shutdown = 1;
2735         tls->socket_conf->tls = tlscf;
2736 
2737     } else {
2738         tlscf = tls->socket_conf->tls;
2739     }
2740 
2741     tls->tls_init->conf = tlscf;
2742 
2743     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2744     if (nxt_slow_path(bundle == NULL)) {
2745         goto fail;
2746     }
2747 
2748     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2749         goto fail;
2750     }
2751 
2752     bundle->chain_file = msg->fd[0];
2753     bundle->next = tlscf->bundle;
2754     tlscf->bundle = bundle;
2755 
2756     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2757                                                   tls->last);
2758     if (nxt_slow_path(ret != NXT_OK)) {
2759         goto fail;
2760     }
2761 
2762     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2763                        nxt_router_conf_apply, task, tmcf, NULL);
2764     return;
2765 
2766 fail:
2767 
2768     nxt_router_conf_error(task, tmcf);
2769 }
2770 
2771 #endif
2772 
2773 
2774 static void
2775 nxt_router_app_rpc_create(nxt_task_t *task,
2776     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2777 {
2778     size_t         size;
2779     uint32_t       stream;
2780     nxt_fd_t       port_fd, queue_fd;
2781     nxt_int_t      ret;
2782     nxt_buf_t      *b;
2783     nxt_port_t     *router_port, *dport;
2784     nxt_runtime_t  *rt;
2785     nxt_app_rpc_t  *rpc;
2786 
2787     rt = task->thread->runtime;
2788 
2789     dport = app->proto_port;
2790 
2791     if (dport == NULL) {
2792         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2793 
2794         size = app->name.length + 1 + app->conf.length;
2795 
2796         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2797         if (nxt_slow_path(b == NULL)) {
2798             goto fail;
2799         }
2800 
2801         b->completion_handler = nxt_buf_dummy_completion;
2802 
2803         nxt_buf_cpystr(b, &app->name);
2804         *b->mem.free++ = '\0';
2805         nxt_buf_cpystr(b, &app->conf);
2806 
2807         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2808 
2809         port_fd = app->shared_port->pair[0];
2810         queue_fd = app->shared_port->queue_fd;
2811 
2812     } else {
2813         nxt_debug(task, "app '%V' prefork", &app->name);
2814 
2815         b = NULL;
2816         port_fd = -1;
2817         queue_fd = -1;
2818     }
2819 
2820     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2821 
2822     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2823                                            nxt_router_app_prefork_ready,
2824                                            nxt_router_app_prefork_error,
2825                                            sizeof(nxt_app_rpc_t));
2826     if (nxt_slow_path(rpc == NULL)) {
2827         goto fail;
2828     }
2829 
2830     rpc->app = app;
2831     rpc->temp_conf = tmcf;
2832     rpc->proto = (b != NULL);
2833 
2834     stream = nxt_port_rpc_ex_stream(rpc);
2835 
2836     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2837                                  port_fd, queue_fd, stream, router_port->id, b);
2838     if (nxt_slow_path(ret != NXT_OK)) {
2839         nxt_port_rpc_cancel(task, router_port, stream);
2840         goto fail;
2841     }
2842 
2843     if (b == NULL) {
2844         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2845 
2846         app->pending_processes++;
2847     }
2848 
2849     return;
2850 
2851 fail:
2852 
2853     nxt_router_conf_error(task, tmcf);
2854 }
2855 
2856 
2857 static void
2858 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2859     void *data)
2860 {
2861     nxt_app_t           *app;
2862     nxt_port_t          *port;
2863     nxt_app_rpc_t       *rpc;
2864     nxt_event_engine_t  *engine;
2865 
2866     rpc = data;
2867     app = rpc->app;
2868 
2869     port = msg->u.new_port;
2870 
2871     nxt_assert(port != NULL);
2872     nxt_assert(port->id == 0);
2873 
2874     if (rpc->proto) {
2875         nxt_assert(app->proto_port == NULL);
2876         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2877 
2878         nxt_port_inc_use(port);
2879 
2880         app->proto_port = port;
2881         port->app = app;
2882 
2883         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2884 
2885         return;
2886     }
2887 
2888     nxt_assert(port->type == NXT_PROCESS_APP);
2889 
2890     port->app = app;
2891     port->main_app_port = port;
2892 
2893     app->pending_processes--;
2894     app->processes++;
2895     app->idle_processes++;
2896 
2897     engine = task->thread->engine;
2898 
2899     nxt_queue_insert_tail(&app->ports, &port->app_link);
2900     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2901 
2902     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2903               &app->name, port->pid, port->id);
2904 
2905     nxt_port_hash_add(&app->port_hash, port);
2906     app->port_hash_count++;
2907 
2908     port->idle_start = 0;
2909 
2910     nxt_port_inc_use(port);
2911 
2912     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2913 
2914     nxt_work_queue_add(&engine->fast_work_queue,
2915                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2916 }
2917 
2918 
2919 static void
2920 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2921     void *data)
2922 {
2923     nxt_app_t               *app;
2924     nxt_app_rpc_t           *rpc;
2925     nxt_router_temp_conf_t  *tmcf;
2926 
2927     rpc = data;
2928     app = rpc->app;
2929     tmcf = rpc->temp_conf;
2930 
2931     if (rpc->proto) {
2932         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2933                 &app->name);
2934 
2935     } else {
2936         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2937                 &app->name);
2938 
2939         app->pending_processes--;
2940     }
2941 
2942     nxt_router_conf_error(task, tmcf);
2943 }
2944 
2945 
2946 static nxt_int_t
2947 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2948     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2949 {
2950     nxt_int_t                 ret;
2951     nxt_uint_t                n, threads;
2952     nxt_queue_link_t          *qlk;
2953     nxt_router_engine_conf_t  *recf;
2954 
2955     threads = tmcf->router_conf->threads;
2956 
2957     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2958                                      sizeof(nxt_router_engine_conf_t));
2959     if (nxt_slow_path(tmcf->engines == NULL)) {
2960         return NXT_ERROR;
2961     }
2962 
2963     n = 0;
2964 
2965     for (qlk = nxt_queue_first(&router->engines);
2966          qlk != nxt_queue_tail(&router->engines);
2967          qlk = nxt_queue_next(qlk))
2968     {
2969         recf = nxt_array_zero_add(tmcf->engines);
2970         if (nxt_slow_path(recf == NULL)) {
2971             return NXT_ERROR;
2972         }
2973 
2974         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2975 
2976         if (n < threads) {
2977             recf->action = NXT_ROUTER_ENGINE_KEEP;
2978             ret = nxt_router_engine_conf_update(tmcf, recf);
2979 
2980         } else {
2981             recf->action = NXT_ROUTER_ENGINE_DELETE;
2982             ret = nxt_router_engine_conf_delete(tmcf, recf);
2983         }
2984 
2985         if (nxt_slow_path(ret != NXT_OK)) {
2986             return ret;
2987         }
2988 
2989         n++;
2990     }
2991 
2992     tmcf->new_threads = n;
2993 
2994     while (n < threads) {
2995         recf = nxt_array_zero_add(tmcf->engines);
2996         if (nxt_slow_path(recf == NULL)) {
2997             return NXT_ERROR;
2998         }
2999 
3000         recf->action = NXT_ROUTER_ENGINE_ADD;
3001 
3002         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3003         if (nxt_slow_path(recf->engine == NULL)) {
3004             return NXT_ERROR;
3005         }
3006 
3007         ret = nxt_router_engine_conf_create(tmcf, recf);
3008         if (nxt_slow_path(ret != NXT_OK)) {
3009             return ret;
3010         }
3011 
3012         n++;
3013     }
3014 
3015     return NXT_OK;
3016 }
3017 
3018 
3019 static nxt_int_t
3020 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3021     nxt_router_engine_conf_t *recf)
3022 {
3023     nxt_int_t  ret;
3024 
3025     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3026                                           nxt_router_listen_socket_create);
3027     if (nxt_slow_path(ret != NXT_OK)) {
3028         return ret;
3029     }
3030 
3031     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3032                                           nxt_router_listen_socket_create);
3033     if (nxt_slow_path(ret != NXT_OK)) {
3034         return ret;
3035     }
3036 
3037     return ret;
3038 }
3039 
3040 
3041 static nxt_int_t
3042 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3043     nxt_router_engine_conf_t *recf)
3044 {
3045     nxt_int_t  ret;
3046 
3047     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3048                                           nxt_router_listen_socket_create);
3049     if (nxt_slow_path(ret != NXT_OK)) {
3050         return ret;
3051     }
3052 
3053     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3054                                           nxt_router_listen_socket_update);
3055     if (nxt_slow_path(ret != NXT_OK)) {
3056         return ret;
3057     }
3058 
3059     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3060     if (nxt_slow_path(ret != NXT_OK)) {
3061         return ret;
3062     }
3063 
3064     return ret;
3065 }
3066 
3067 
3068 static nxt_int_t
3069 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3070     nxt_router_engine_conf_t *recf)
3071 {
3072     nxt_int_t  ret;
3073 
3074     ret = nxt_router_engine_quit(tmcf, recf);
3075     if (nxt_slow_path(ret != NXT_OK)) {
3076         return ret;
3077     }
3078 
3079     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3080     if (nxt_slow_path(ret != NXT_OK)) {
3081         return ret;
3082     }
3083 
3084     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3085 }
3086 
3087 
3088 static nxt_int_t
3089 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3090     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3091     nxt_work_handler_t handler)
3092 {
3093     nxt_int_t                ret;
3094     nxt_joint_job_t          *job;
3095     nxt_queue_link_t         *qlk;
3096     nxt_socket_conf_t        *skcf;
3097     nxt_socket_conf_joint_t  *joint;
3098 
3099     for (qlk = nxt_queue_first(sockets);
3100          qlk != nxt_queue_tail(sockets);
3101          qlk = nxt_queue_next(qlk))
3102     {
3103         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3104         if (nxt_slow_path(job == NULL)) {
3105             return NXT_ERROR;
3106         }
3107 
3108         job->work.next = recf->jobs;
3109         recf->jobs = &job->work;
3110 
3111         job->task = tmcf->engine->task;
3112         job->work.handler = handler;
3113         job->work.task = &job->task;
3114         job->work.obj = job;
3115         job->tmcf = tmcf;
3116 
3117         tmcf->count++;
3118 
3119         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3120                              sizeof(nxt_socket_conf_joint_t));
3121         if (nxt_slow_path(joint == NULL)) {
3122             return NXT_ERROR;
3123         }
3124 
3125         job->work.data = joint;
3126 
3127         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3128         if (nxt_slow_path(ret != NXT_OK)) {
3129             return ret;
3130         }
3131 
3132         joint->count = 1;
3133 
3134         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3135         skcf->count++;
3136         joint->socket_conf = skcf;
3137 
3138         joint->engine = recf->engine;
3139     }
3140 
3141     return NXT_OK;
3142 }
3143 
3144 
3145 static nxt_int_t
3146 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3147     nxt_router_engine_conf_t *recf)
3148 {
3149     nxt_joint_job_t  *job;
3150 
3151     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3152     if (nxt_slow_path(job == NULL)) {
3153         return NXT_ERROR;
3154     }
3155 
3156     job->work.next = recf->jobs;
3157     recf->jobs = &job->work;
3158 
3159     job->task = tmcf->engine->task;
3160     job->work.handler = nxt_router_worker_thread_quit;
3161     job->work.task = &job->task;
3162     job->work.obj = NULL;
3163     job->work.data = NULL;
3164     job->tmcf = NULL;
3165 
3166     return NXT_OK;
3167 }
3168 
3169 
3170 static nxt_int_t
3171 nxt_router_engine_joints_delete(