xref: /unit/src/nxt_router.c (revision 1998:c8790d2a89bb)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_release(nxt_task_t *task,
214     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
216     void *data);
217 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
218     nxt_port_recv_msg_t *msg, void *data);
219 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 
222 static void nxt_router_app_port_ready(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
225     nxt_port_t *app_port);
226 static void nxt_router_app_port_error(nxt_task_t *task,
227     nxt_port_recv_msg_t *msg, void *data);
228 
229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
231 
232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
233     nxt_port_t *port, nxt_apr_action_t action);
234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
235     nxt_request_rpc_data_t *req_rpc_data);
236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
237     void *data);
238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
239     void *data);
240 
241 static void nxt_router_app_prepare_request(nxt_task_t *task,
242     nxt_request_rpc_data_t *req_rpc_data);
243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
244     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
245 
246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
254 
255 static const nxt_http_request_state_t  nxt_http_request_send_state;
256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
257 
258 static void nxt_router_app_joint_use(nxt_task_t *task,
259     nxt_app_joint_t *app_joint, int i);
260 
261 static void nxt_router_http_request_release_post(nxt_task_t *task,
262     nxt_http_request_t *r);
263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
264     void *data);
265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_port_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_mmap_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 
271 extern const nxt_http_request_state_t  nxt_http_websocket;
272 
273 static nxt_router_t  *nxt_router;
274 
275 static const nxt_str_t http_prefix = nxt_string("HTTP_");
276 static const nxt_str_t empty_prefix = nxt_string("");
277 
278 static const nxt_str_t  *nxt_app_msg_prefix[] = {
279     &empty_prefix,
280     &empty_prefix,
281     &http_prefix,
282     &http_prefix,
283     &http_prefix,
284     &empty_prefix,
285 };
286 
287 
288 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
289     .quit         = nxt_signal_quit_handler,
290     .new_port     = nxt_router_new_port_handler,
291     .get_port     = nxt_router_get_port_handler,
292     .change_file  = nxt_port_change_log_file_handler,
293     .mmap         = nxt_port_mmap_handler,
294     .get_mmap     = nxt_router_get_mmap_handler,
295     .data         = nxt_router_conf_data_handler,
296     .app_restart  = nxt_router_app_restart_handler,
297     .remove_pid   = nxt_router_remove_pid_handler,
298     .access_log   = nxt_router_access_log_reopen_handler,
299     .rpc_ready    = nxt_port_rpc_handler,
300     .rpc_error    = nxt_port_rpc_handler,
301     .oosm         = nxt_router_oosm_handler,
302 };
303 
304 
305 const nxt_process_init_t  nxt_router_process = {
306     .name           = "router",
307     .type           = NXT_PROCESS_ROUTER,
308     .prefork        = nxt_router_prefork,
309     .restart        = 1,
310     .setup          = nxt_process_core_setup,
311     .start          = nxt_router_start,
312     .port_handlers  = &nxt_router_process_port_handlers,
313     .signals        = nxt_process_signals,
314 };
315 
316 
317 /* Queues of nxt_socket_conf_t */
318 nxt_queue_t  creating_sockets;
319 nxt_queue_t  pending_sockets;
320 nxt_queue_t  updating_sockets;
321 nxt_queue_t  keeping_sockets;
322 nxt_queue_t  deleting_sockets;
323 
324 
325 static nxt_int_t
326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328     nxt_runtime_stop_app_processes(task, task->thread->runtime);
329 
330     return NXT_OK;
331 }
332 
333 
334 static nxt_int_t
335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337     nxt_int_t      ret;
338     nxt_port_t     *controller_port;
339     nxt_router_t   *router;
340     nxt_runtime_t  *rt;
341 
342     rt = task->thread->runtime;
343 
344     nxt_log(task, NXT_LOG_INFO, "router started");
345 
346 #if (NXT_TLS)
347     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348     if (nxt_slow_path(rt->tls == NULL)) {
349         return NXT_ERROR;
350     }
351 
352     ret = rt->tls->library_init(task);
353     if (nxt_slow_path(ret != NXT_OK)) {
354         return ret;
355     }
356 #endif
357 
358     ret = nxt_http_init(task);
359     if (nxt_slow_path(ret != NXT_OK)) {
360         return ret;
361     }
362 
363     router = nxt_zalloc(sizeof(nxt_router_t));
364     if (nxt_slow_path(router == NULL)) {
365         return NXT_ERROR;
366     }
367 
368     nxt_queue_init(&router->engines);
369     nxt_queue_init(&router->sockets);
370     nxt_queue_init(&router->apps);
371 
372     nxt_router = router;
373 
374     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375     if (controller_port != NULL) {
376         nxt_router_greet_controller(task, controller_port);
377     }
378 
379     return NXT_OK;
380 }
381 
382 
383 static void
384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387                           -1, 0, 0, NULL);
388 }
389 
390 
391 static void
392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
393     void *data)
394 {
395     size_t               size;
396     uint32_t             stream;
397     nxt_int_t            ret;
398     nxt_app_t            *app;
399     nxt_buf_t            *b;
400     nxt_port_t           *dport;
401     nxt_runtime_t        *rt;
402     nxt_app_joint_rpc_t  *app_joint_rpc;
403 
404     app = data;
405 
406     nxt_thread_mutex_lock(&app->mutex);
407 
408     dport = app->proto_port;
409 
410     nxt_thread_mutex_unlock(&app->mutex);
411 
412     if (dport != NULL) {
413         nxt_debug(task, "app '%V' %p start process", &app->name, app);
414 
415         b = NULL;
416 
417     } else {
418         if (app->proto_port_requests > 0) {
419             nxt_debug(task, "app '%V' %p wait for prototype process",
420                       &app->name, app);
421 
422             app->proto_port_requests++;
423 
424             goto skip;
425         }
426 
427         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
428 
429         rt = task->thread->runtime;
430         dport = rt->port_by_type[NXT_PROCESS_MAIN];
431 
432         size = app->name.length + 1 + app->conf.length;
433 
434         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
435         if (nxt_slow_path(b == NULL)) {
436             goto failed;
437         }
438 
439         nxt_buf_cpystr(b, &app->name);
440         *b->mem.free++ = '\0';
441         nxt_buf_cpystr(b, &app->conf);
442     }
443 
444     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
445                                                      nxt_router_app_port_ready,
446                                                      nxt_router_app_port_error,
447                                                    sizeof(nxt_app_joint_rpc_t));
448     if (nxt_slow_path(app_joint_rpc == NULL)) {
449         goto failed;
450     }
451 
452     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
453 
454     ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
455                                 -1, stream, port->id, b);
456     if (nxt_slow_path(ret != NXT_OK)) {
457         nxt_port_rpc_cancel(task, port, stream);
458 
459         goto failed;
460     }
461 
462     app_joint_rpc->app_joint = app->joint;
463     app_joint_rpc->generation = app->generation;
464     app_joint_rpc->proto = (b != NULL);
465 
466     if (b != NULL) {
467         app->proto_port_requests++;
468 
469         b = NULL;
470     }
471 
472     nxt_router_app_joint_use(task, app->joint, 1);
473 
474 failed:
475 
476     if (b != NULL) {
477         nxt_mp_free(b->data, b);
478     }
479 
480 skip:
481 
482     nxt_router_app_use(task, app, -1);
483 }
484 
485 
486 static void
487 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
488 {
489     app_joint->use_count += i;
490 
491     if (app_joint->use_count == 0) {
492         nxt_assert(app_joint->app == NULL);
493 
494         nxt_free(app_joint);
495     }
496 }
497 
498 
499 static nxt_int_t
500 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
501 {
502     nxt_int_t      res;
503     nxt_port_t     *router_port;
504     nxt_runtime_t  *rt;
505 
506     nxt_debug(task, "app '%V' start process", &app->name);
507 
508     rt = task->thread->runtime;
509     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
510 
511     nxt_router_app_use(task, app, 1);
512 
513     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
514                         app);
515 
516     if (res == NXT_OK) {
517         return res;
518     }
519 
520     nxt_thread_mutex_lock(&app->mutex);
521 
522     app->pending_processes--;
523 
524     nxt_thread_mutex_unlock(&app->mutex);
525 
526     nxt_router_app_use(task, app, -1);
527 
528     return NXT_ERROR;
529 }
530 
531 
532 nxt_inline nxt_bool_t
533 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
534 {
535     nxt_buf_t       *b, *next;
536     nxt_bool_t      cancelled;
537     nxt_port_t      *app_port;
538     nxt_msg_info_t  *msg_info;
539 
540     msg_info = &req_rpc_data->msg_info;
541 
542     if (msg_info->buf == NULL) {
543         return 0;
544     }
545 
546     app_port = req_rpc_data->app_port;
547 
548     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
549         cancelled = nxt_app_queue_cancel(app_port->queue,
550                                          msg_info->tracking_cookie,
551                                          req_rpc_data->stream);
552 
553         if (cancelled) {
554             nxt_debug(task, "stream #%uD: cancelled by router",
555                       req_rpc_data->stream);
556         }
557 
558     } else {
559         cancelled = 0;
560     }
561 
562     for (b = msg_info->buf; b != NULL; b = next) {
563         next = b->next;
564         b->next = NULL;
565 
566         if (b->is_port_mmap_sent) {
567             b->is_port_mmap_sent = cancelled == 0;
568         }
569 
570         b->completion_handler(task, b, b->parent);
571     }
572 
573     msg_info->buf = NULL;
574 
575     return cancelled;
576 }
577 
578 
579 nxt_inline nxt_bool_t
580 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
581 {
582     if (lnk->next != NULL) {
583         nxt_queue_remove(lnk);
584 
585         lnk->next = NULL;
586 
587         return 1;
588     }
589 
590     return 0;
591 }
592 
593 
594 nxt_inline void
595 nxt_request_rpc_data_unlink(nxt_task_t *task,
596     nxt_request_rpc_data_t *req_rpc_data)
597 {
598     nxt_app_t           *app;
599     nxt_bool_t          unlinked;
600     nxt_http_request_t  *r;
601 
602     nxt_router_msg_cancel(task, req_rpc_data);
603 
604     app = req_rpc_data->app;
605 
606     if (req_rpc_data->app_port != NULL) {
607         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
608                                     req_rpc_data->apr_action);
609 
610         req_rpc_data->app_port = NULL;
611     }
612 
613     r = req_rpc_data->request;
614 
615     if (r != NULL) {
616         r->timer_data = NULL;
617 
618         nxt_router_http_request_release_post(task, r);
619 
620         r->req_rpc_data = NULL;
621         req_rpc_data->request = NULL;
622 
623         if (app != NULL) {
624             unlinked = 0;
625 
626             nxt_thread_mutex_lock(&app->mutex);
627 
628             if (r->app_link.next != NULL) {
629                 nxt_queue_remove(&r->app_link);
630                 r->app_link.next = NULL;
631 
632                 unlinked = 1;
633             }
634 
635             nxt_thread_mutex_unlock(&app->mutex);
636 
637             if (unlinked) {
638                 nxt_mp_release(r->mem_pool);
639             }
640         }
641     }
642 
643     if (app != NULL) {
644         nxt_router_app_use(task, app, -1);
645 
646         req_rpc_data->app = NULL;
647     }
648 
649     if (req_rpc_data->msg_info.body_fd != -1) {
650         nxt_fd_close(req_rpc_data->msg_info.body_fd);
651 
652         req_rpc_data->msg_info.body_fd = -1;
653     }
654 
655     if (req_rpc_data->rpc_cancel) {
656         req_rpc_data->rpc_cancel = 0;
657 
658         nxt_port_rpc_cancel(task, task->thread->engine->port,
659                             req_rpc_data->stream);
660     }
661 }
662 
663 
664 static void
665 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
666 {
667     nxt_int_t      res;
668     nxt_app_t      *app;
669     nxt_port_t     *port, *main_app_port;
670     nxt_runtime_t  *rt;
671 
672     nxt_port_new_port_handler(task, msg);
673 
674     port = msg->u.new_port;
675 
676     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
677         nxt_router_greet_controller(task, msg->u.new_port);
678     }
679 
680     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
681         nxt_port_rpc_handler(task, msg);
682 
683         return;
684     }
685 
686     if (port == NULL || port->type != NXT_PROCESS_APP) {
687 
688         if (msg->port_msg.stream == 0) {
689             return;
690         }
691 
692         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
693 
694     } else {
695         if (msg->fd[1] != -1) {
696             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
697             if (nxt_slow_path(res != NXT_OK)) {
698                 return;
699             }
700 
701             nxt_fd_close(msg->fd[1]);
702             msg->fd[1] = -1;
703         }
704     }
705 
706     if (msg->port_msg.stream != 0) {
707         nxt_port_rpc_handler(task, msg);
708         return;
709     }
710 
711     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
712 
713     /*
714      * Port with "id == 0" is application 'main' port and it always
715      * should come with non-zero stream.
716      */
717     nxt_assert(port->id != 0);
718 
719     /* Find 'main' app port and get app reference. */
720     rt = task->thread->runtime;
721 
722     /*
723      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
724      * sent to main port (with id == 0) and processed in main thread.
725      */
726     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
727     nxt_assert(main_app_port != NULL);
728 
729     app = main_app_port->app;
730 
731     if (nxt_fast_path(app != NULL)) {
732         nxt_thread_mutex_lock(&app->mutex);
733 
734         /* TODO here should be find-and-add code because there can be
735            port waiters in port_hash */
736         nxt_port_hash_add(&app->port_hash, port);
737         app->port_hash_count++;
738 
739         nxt_thread_mutex_unlock(&app->mutex);
740 
741         port->app = app;
742     }
743 
744     port->main_app_port = main_app_port;
745 
746     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
747 }
748 
749 
750 static void
751 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
752 {
753     void                    *p;
754     size_t                  size;
755     nxt_int_t               ret;
756     nxt_port_t              *port;
757     nxt_router_temp_conf_t  *tmcf;
758 
759     port = nxt_runtime_port_find(task->thread->runtime,
760                                  msg->port_msg.pid,
761                                  msg->port_msg.reply_port);
762     if (nxt_slow_path(port == NULL)) {
763         nxt_alert(task, "conf_data_handler: reply port not found");
764         return;
765     }
766 
767     p = MAP_FAILED;
768 
769     /*
770      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
771      * initialized in 'cleanup' section.
772      */
773     size = 0;
774 
775     tmcf = nxt_router_temp_conf(task);
776     if (nxt_slow_path(tmcf == NULL)) {
777         goto fail;
778     }
779 
780     if (nxt_slow_path(msg->fd[0] == -1)) {
781         nxt_alert(task, "conf_data_handler: invalid shm fd");
782         goto fail;
783     }
784 
785     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
786         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
787                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
788         goto fail;
789     }
790 
791     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
792 
793     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
794 
795     nxt_fd_close(msg->fd[0]);
796     msg->fd[0] = -1;
797 
798     if (nxt_slow_path(p == MAP_FAILED)) {
799         goto fail;
800     }
801 
802     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
803 
804     tmcf->router_conf->router = nxt_router;
805     tmcf->stream = msg->port_msg.stream;
806     tmcf->port = port;
807 
808     nxt_port_use(task, tmcf->port, 1);
809 
810     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
811 
812     if (nxt_fast_path(ret == NXT_OK)) {
813         nxt_router_conf_apply(task, tmcf, NULL);
814 
815     } else {
816         nxt_router_conf_error(task, tmcf);
817     }
818 
819     goto cleanup;
820 
821 fail:
822 
823     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
824                           msg->port_msg.stream, 0, NULL);
825 
826     if (tmcf != NULL) {
827         nxt_mp_release(tmcf->mem_pool);
828     }
829 
830 cleanup:
831 
832     if (p != MAP_FAILED) {
833         nxt_mem_munmap(p, size);
834     }
835 
836     if (msg->fd[0] != -1) {
837         nxt_fd_close(msg->fd[0]);
838         msg->fd[0] = -1;
839     }
840 }
841 
842 
843 static void
844 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
845 {
846     nxt_app_t            *app;
847     nxt_int_t            ret;
848     nxt_str_t            app_name;
849     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
850     nxt_port_t           *proto_port;
851     nxt_port_msg_type_t  reply;
852 
853     reply_port = nxt_runtime_port_find(task->thread->runtime,
854                                        msg->port_msg.pid,
855                                        msg->port_msg.reply_port);
856     if (nxt_slow_path(reply_port == NULL)) {
857         nxt_alert(task, "app_restart_handler: reply port not found");
858         return;
859     }
860 
861     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
862     app_name.start = msg->buf->mem.pos;
863 
864     nxt_debug(task, "app_restart_handler: %V", &app_name);
865 
866     app = nxt_router_app_find(&nxt_router->apps, &app_name);
867 
868     if (nxt_fast_path(app != NULL)) {
869         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
870                                    NXT_PROCESS_APP);
871         if (nxt_slow_path(shared_port == NULL)) {
872             goto fail;
873         }
874 
875         ret = nxt_port_socket_init(task, shared_port, 0);
876         if (nxt_slow_path(ret != NXT_OK)) {
877             nxt_port_use(task, shared_port, -1);
878             goto fail;
879         }
880 
881         ret = nxt_router_app_queue_init(task, shared_port);
882         if (nxt_slow_path(ret != NXT_OK)) {
883             nxt_port_write_close(shared_port);
884             nxt_port_read_close(shared_port);
885             nxt_port_use(task, shared_port, -1);
886             goto fail;
887         }
888 
889         nxt_port_write_enable(task, shared_port);
890 
891         nxt_thread_mutex_lock(&app->mutex);
892 
893         proto_port = app->proto_port;
894 
895         if (proto_port != NULL) {
896             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
897                       proto_port->pid);
898 
899             app->proto_port = NULL;
900             proto_port->app = NULL;
901         }
902 
903         app->generation++;
904 
905         shared_port->app = app;
906 
907         old_shared_port = app->shared_port;
908         old_shared_port->app = NULL;
909 
910         app->shared_port = shared_port;
911 
912         nxt_thread_mutex_unlock(&app->mutex);
913 
914         nxt_port_close(task, old_shared_port);
915         nxt_port_use(task, old_shared_port, -1);
916 
917         if (proto_port != NULL) {
918             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
919                                          -1, 0, 0, NULL);
920 
921             nxt_port_close(task, proto_port);
922 
923             nxt_port_use(task, proto_port, -1);
924         }
925 
926         reply = NXT_PORT_MSG_RPC_READY_LAST;
927 
928     } else {
929 
930 fail:
931 
932         reply = NXT_PORT_MSG_RPC_ERROR;
933     }
934 
935     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
936                           0, NULL);
937 }
938 
939 
940 static void
941 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
942     void *data)
943 {
944     union {
945         nxt_pid_t  removed_pid;
946         void       *data;
947     } u;
948 
949     u.data = data;
950 
951     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
952 }
953 
954 
955 static void
956 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
957 {
958     nxt_event_engine_t  *engine;
959 
960     nxt_port_remove_pid_handler(task, msg);
961 
962     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
963     {
964         if (nxt_fast_path(engine->port != NULL)) {
965             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
966                           msg->u.data);
967         }
968     }
969     nxt_queue_loop;
970 
971     if (msg->port_msg.stream == 0) {
972         return;
973     }
974 
975     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
976 
977     nxt_port_rpc_handler(task, msg);
978 }
979 
980 
981 static nxt_router_temp_conf_t *
982 nxt_router_temp_conf(nxt_task_t *task)
983 {
984     nxt_mp_t                *mp, *tmp;
985     nxt_router_conf_t       *rtcf;
986     nxt_router_temp_conf_t  *tmcf;
987 
988     mp = nxt_mp_create(1024, 128, 256, 32);
989     if (nxt_slow_path(mp == NULL)) {
990         return NULL;
991     }
992 
993     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
994     if (nxt_slow_path(rtcf == NULL)) {
995         goto fail;
996     }
997 
998     rtcf->mem_pool = mp;
999 
1000     tmp = nxt_mp_create(1024, 128, 256, 32);
1001     if (nxt_slow_path(tmp == NULL)) {
1002         goto fail;
1003     }
1004 
1005     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1006     if (nxt_slow_path(tmcf == NULL)) {
1007         goto temp_fail;
1008     }
1009 
1010     tmcf->mem_pool = tmp;
1011     tmcf->router_conf = rtcf;
1012     tmcf->count = 1;
1013     tmcf->engine = task->thread->engine;
1014 
1015     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1016                                      sizeof(nxt_router_engine_conf_t));
1017     if (nxt_slow_path(tmcf->engines == NULL)) {
1018         goto temp_fail;
1019     }
1020 
1021     nxt_queue_init(&creating_sockets);
1022     nxt_queue_init(&pending_sockets);
1023     nxt_queue_init(&updating_sockets);
1024     nxt_queue_init(&keeping_sockets);
1025     nxt_queue_init(&deleting_sockets);
1026 
1027 #if (NXT_TLS)
1028     nxt_queue_init(&tmcf->tls);
1029 #endif
1030 
1031     nxt_queue_init(&tmcf->apps);
1032     nxt_queue_init(&tmcf->previous);
1033 
1034     return tmcf;
1035 
1036 temp_fail:
1037 
1038     nxt_mp_destroy(tmp);
1039 
1040 fail:
1041 
1042     nxt_mp_destroy(mp);
1043 
1044     return NULL;
1045 }
1046 
1047 
1048 nxt_inline nxt_bool_t
1049 nxt_router_app_can_start(nxt_app_t *app)
1050 {
1051     return app->processes + app->pending_processes < app->max_processes
1052             && app->pending_processes < app->max_pending_processes;
1053 }
1054 
1055 
1056 nxt_inline nxt_bool_t
1057 nxt_router_app_need_start(nxt_app_t *app)
1058 {
1059     return (app->active_requests
1060               > app->port_hash_count + app->pending_processes)
1061            || (app->spare_processes
1062                 > app->idle_processes + app->pending_processes);
1063 }
1064 
1065 
1066 static void
1067 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1068 {
1069     nxt_int_t                    ret;
1070     nxt_app_t                    *app;
1071     nxt_router_t                 *router;
1072     nxt_runtime_t                *rt;
1073     nxt_queue_link_t             *qlk;
1074     nxt_socket_conf_t            *skcf;
1075     nxt_router_conf_t            *rtcf;
1076     nxt_router_temp_conf_t       *tmcf;
1077     const nxt_event_interface_t  *interface;
1078 #if (NXT_TLS)
1079     nxt_router_tlssock_t         *tls;
1080 #endif
1081 
1082     tmcf = obj;
1083 
1084     qlk = nxt_queue_first(&pending_sockets);
1085 
1086     if (qlk != nxt_queue_tail(&pending_sockets)) {
1087         nxt_queue_remove(qlk);
1088         nxt_queue_insert_tail(&creating_sockets, qlk);
1089 
1090         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1091 
1092         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1093 
1094         return;
1095     }
1096 
1097 #if (NXT_TLS)
1098     qlk = nxt_queue_last(&tmcf->tls);
1099 
1100     if (qlk != nxt_queue_head(&tmcf->tls)) {
1101         nxt_queue_remove(qlk);
1102 
1103         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1104 
1105         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1106                            nxt_router_tls_rpc_handler, tls);
1107         return;
1108     }
1109 #endif
1110 
1111     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1112 
1113         if (nxt_router_app_need_start(app)) {
1114             nxt_router_app_rpc_create(task, tmcf, app);
1115             return;
1116         }
1117 
1118     } nxt_queue_loop;
1119 
1120     rtcf = tmcf->router_conf;
1121 
1122     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1123         nxt_router_access_log_open(task, tmcf);
1124         return;
1125     }
1126 
1127     rt = task->thread->runtime;
1128 
1129     interface = nxt_service_get(rt->services, "engine", NULL);
1130 
1131     router = rtcf->router;
1132 
1133     ret = nxt_router_engines_create(task, router, tmcf, interface);
1134     if (nxt_slow_path(ret != NXT_OK)) {
1135         goto fail;
1136     }
1137 
1138     ret = nxt_router_threads_create(task, rt, tmcf);
1139     if (nxt_slow_path(ret != NXT_OK)) {
1140         goto fail;
1141     }
1142 
1143     nxt_router_apps_sort(task, router, tmcf);
1144 
1145     nxt_router_apps_hash_use(task, rtcf, 1);
1146 
1147     nxt_router_engines_post(router, tmcf);
1148 
1149     nxt_queue_add(&router->sockets, &updating_sockets);
1150     nxt_queue_add(&router->sockets, &creating_sockets);
1151 
1152     router->access_log = rtcf->access_log;
1153 
1154     nxt_router_conf_ready(task, tmcf);
1155 
1156     return;
1157 
1158 fail:
1159 
1160     nxt_router_conf_error(task, tmcf);
1161 
1162     return;
1163 }
1164 
1165 
1166 static void
1167 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1168 {
1169     nxt_joint_job_t  *job;
1170 
1171     job = obj;
1172 
1173     nxt_router_conf_ready(task, job->tmcf);
1174 }
1175 
1176 
1177 static void
1178 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1179 {
1180     uint32_t               count;
1181     nxt_router_conf_t      *rtcf;
1182     nxt_thread_spinlock_t  *lock;
1183 
1184     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1185 
1186     if (--tmcf->count > 0) {
1187         return;
1188     }
1189 
1190     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1191 
1192     rtcf = tmcf->router_conf;
1193 
1194     lock = &rtcf->router->lock;
1195 
1196     nxt_thread_spin_lock(lock);
1197 
1198     count = rtcf->count;
1199 
1200     nxt_thread_spin_unlock(lock);
1201 
1202     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1203 
1204     if (count == 0) {
1205         nxt_router_apps_hash_use(task, rtcf, -1);
1206 
1207         nxt_router_access_log_release(task, lock, rtcf->access_log);
1208 
1209         nxt_mp_destroy(rtcf->mem_pool);
1210     }
1211 
1212     nxt_mp_release(tmcf->mem_pool);
1213 }
1214 
1215 
1216 static void
1217 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1218 {
1219     nxt_app_t          *app;
1220     nxt_queue_t        new_socket_confs;
1221     nxt_socket_t       s;
1222     nxt_router_t       *router;
1223     nxt_queue_link_t   *qlk;
1224     nxt_socket_conf_t  *skcf;
1225     nxt_router_conf_t  *rtcf;
1226 
1227     nxt_alert(task, "failed to apply new conf");
1228 
1229     for (qlk = nxt_queue_first(&creating_sockets);
1230          qlk != nxt_queue_tail(&creating_sockets);
1231          qlk = nxt_queue_next(qlk))
1232     {
1233         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1234         s = skcf->listen->socket;
1235 
1236         if (s != -1) {
1237             nxt_socket_close(task, s);
1238         }
1239 
1240         nxt_free(skcf->listen);
1241     }
1242 
1243     nxt_queue_init(&new_socket_confs);
1244     nxt_queue_add(&new_socket_confs, &updating_sockets);
1245     nxt_queue_add(&new_socket_confs, &pending_sockets);
1246     nxt_queue_add(&new_socket_confs, &creating_sockets);
1247 
1248     rtcf = tmcf->router_conf;
1249 
1250     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1251 
1252         nxt_router_app_unlink(task, app);
1253 
1254     } nxt_queue_loop;
1255 
1256     router = rtcf->router;
1257 
1258     nxt_queue_add(&router->sockets, &keeping_sockets);
1259     nxt_queue_add(&router->sockets, &deleting_sockets);
1260 
1261     nxt_queue_add(&router->apps, &tmcf->previous);
1262 
1263     // TODO: new engines and threads
1264 
1265     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1266 
1267     nxt_mp_destroy(rtcf->mem_pool);
1268 
1269     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1270 
1271     nxt_mp_release(tmcf->mem_pool);
1272 }
1273 
1274 
1275 static void
1276 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1277     nxt_port_msg_type_t type)
1278 {
1279     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1280 
1281     nxt_port_use(task, tmcf->port, -1);
1282 
1283     tmcf->port = NULL;
1284 }
1285 
1286 
1287 static nxt_conf_map_t  nxt_router_conf[] = {
1288     {
1289         nxt_string("listeners_threads"),
1290         NXT_CONF_MAP_INT32,
1291         offsetof(nxt_router_conf_t, threads),
1292     },
1293 };
1294 
1295 
1296 static nxt_conf_map_t  nxt_router_app_conf[] = {
1297     {
1298         nxt_string("type"),
1299         NXT_CONF_MAP_STR,
1300         offsetof(nxt_router_app_conf_t, type),
1301     },
1302 
1303     {
1304         nxt_string("limits"),
1305         NXT_CONF_MAP_PTR,
1306         offsetof(nxt_router_app_conf_t, limits_value),
1307     },
1308 
1309     {
1310         nxt_string("processes"),
1311         NXT_CONF_MAP_INT32,
1312         offsetof(nxt_router_app_conf_t, processes),
1313     },
1314 
1315     {
1316         nxt_string("processes"),
1317         NXT_CONF_MAP_PTR,
1318         offsetof(nxt_router_app_conf_t, processes_value),
1319     },
1320 
1321     {
1322         nxt_string("targets"),
1323         NXT_CONF_MAP_PTR,
1324         offsetof(nxt_router_app_conf_t, targets_value),
1325     },
1326 };
1327 
1328 
1329 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1330     {
1331         nxt_string("timeout"),
1332         NXT_CONF_MAP_MSEC,
1333         offsetof(nxt_router_app_conf_t, timeout),
1334     },
1335 };
1336 
1337 
1338 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1339     {
1340         nxt_string("spare"),
1341         NXT_CONF_MAP_INT32,
1342         offsetof(nxt_router_app_conf_t, spare_processes),
1343     },
1344 
1345     {
1346         nxt_string("max"),
1347         NXT_CONF_MAP_INT32,
1348         offsetof(nxt_router_app_conf_t, max_processes),
1349     },
1350 
1351     {
1352         nxt_string("idle_timeout"),
1353         NXT_CONF_MAP_MSEC,
1354         offsetof(nxt_router_app_conf_t, idle_timeout),
1355     },
1356 };
1357 
1358 
1359 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1360     {
1361         nxt_string("pass"),
1362         NXT_CONF_MAP_STR_COPY,
1363         offsetof(nxt_router_listener_conf_t, pass),
1364     },
1365 
1366     {
1367         nxt_string("application"),
1368         NXT_CONF_MAP_STR_COPY,
1369         offsetof(nxt_router_listener_conf_t, application),
1370     },
1371 };
1372 
1373 
1374 static nxt_conf_map_t  nxt_router_http_conf[] = {
1375     {
1376         nxt_string("header_buffer_size"),
1377         NXT_CONF_MAP_SIZE,
1378         offsetof(nxt_socket_conf_t, header_buffer_size),
1379     },
1380 
1381     {
1382         nxt_string("large_header_buffer_size"),
1383         NXT_CONF_MAP_SIZE,
1384         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1385     },
1386 
1387     {
1388         nxt_string("large_header_buffers"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, large_header_buffers),
1391     },
1392 
1393     {
1394         nxt_string("body_buffer_size"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, body_buffer_size),
1397     },
1398 
1399     {
1400         nxt_string("max_body_size"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, max_body_size),
1403     },
1404 
1405     {
1406         nxt_string("idle_timeout"),
1407         NXT_CONF_MAP_MSEC,
1408         offsetof(nxt_socket_conf_t, idle_timeout),
1409     },
1410 
1411     {
1412         nxt_string("header_read_timeout"),
1413         NXT_CONF_MAP_MSEC,
1414         offsetof(nxt_socket_conf_t, header_read_timeout),
1415     },
1416 
1417     {
1418         nxt_string("body_read_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, body_read_timeout),
1421     },
1422 
1423     {
1424         nxt_string("send_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, send_timeout),
1427     },
1428 
1429     {
1430         nxt_string("body_temp_path"),
1431         NXT_CONF_MAP_STR,
1432         offsetof(nxt_socket_conf_t, body_temp_path),
1433     },
1434 
1435     {
1436         nxt_string("discard_unsafe_fields"),
1437         NXT_CONF_MAP_INT8,
1438         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1439     },
1440 };
1441 
1442 
1443 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1444     {
1445         nxt_string("max_frame_size"),
1446         NXT_CONF_MAP_SIZE,
1447         offsetof(nxt_websocket_conf_t, max_frame_size),
1448     },
1449 
1450     {
1451         nxt_string("read_timeout"),
1452         NXT_CONF_MAP_MSEC,
1453         offsetof(nxt_websocket_conf_t, read_timeout),
1454     },
1455 
1456     {
1457         nxt_string("keepalive_interval"),
1458         NXT_CONF_MAP_MSEC,
1459         offsetof(nxt_websocket_conf_t, keepalive_interval),
1460     },
1461 
1462 };
1463 
1464 
1465 static nxt_int_t
1466 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1467     u_char *start, u_char *end)
1468 {
1469     u_char                      *p;
1470     size_t                      size;
1471     nxt_mp_t                    *mp, *app_mp;
1472     uint32_t                    next, next_target;
1473     nxt_int_t                   ret;
1474     nxt_str_t                   name, path, target;
1475     nxt_app_t                   *app, *prev;
1476     nxt_str_t                   *t, *s, *targets;
1477     nxt_uint_t                  n, i;
1478     nxt_port_t                  *port;
1479     nxt_router_t                *router;
1480     nxt_app_joint_t             *app_joint;
1481 #if (NXT_TLS)
1482     nxt_tls_init_t              *tls_init;
1483     nxt_conf_value_t            *certificate;
1484 #endif
1485     nxt_conf_value_t            *conf, *http, *value, *websocket;
1486     nxt_conf_value_t            *applications, *application;
1487     nxt_conf_value_t            *listeners, *listener;
1488     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1489     nxt_socket_conf_t           *skcf;
1490     nxt_http_routes_t           *routes;
1491     nxt_event_engine_t          *engine;
1492     nxt_app_lang_module_t       *lang;
1493     nxt_router_app_conf_t       apcf;
1494     nxt_router_access_log_t     *access_log;
1495     nxt_router_listener_conf_t  lscf;
1496 
1497     static nxt_str_t  http_path = nxt_string("/settings/http");
1498     static nxt_str_t  applications_path = nxt_string("/applications");
1499     static nxt_str_t  listeners_path = nxt_string("/listeners");
1500     static nxt_str_t  routes_path = nxt_string("/routes");
1501     static nxt_str_t  access_log_path = nxt_string("/access_log");
1502 #if (NXT_TLS)
1503     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1504     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1505     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1506     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1507     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1508 #endif
1509     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1510     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1511     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1512 
1513     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1514     if (conf == NULL) {
1515         nxt_alert(task, "configuration parsing error");
1516         return NXT_ERROR;
1517     }
1518 
1519     mp = tmcf->router_conf->mem_pool;
1520 
1521     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1522                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1523     if (ret != NXT_OK) {
1524         nxt_alert(task, "root map error");
1525         return NXT_ERROR;
1526     }
1527 
1528     if (tmcf->router_conf->threads == 0) {
1529         tmcf->router_conf->threads = nxt_ncpu;
1530     }
1531 
1532     static_conf = nxt_conf_get_path(conf, &static_path);
1533 
1534     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1535     if (nxt_slow_path(ret != NXT_OK)) {
1536         return NXT_ERROR;
1537     }
1538 
1539     router = tmcf->router_conf->router;
1540 
1541     applications = nxt_conf_get_path(conf, &applications_path);
1542 
1543     if (applications != NULL) {
1544         next = 0;
1545 
1546         for ( ;; ) {
1547             application = nxt_conf_next_object_member(applications,
1548                                                       &name, &next);
1549             if (application == NULL) {
1550                 break;
1551             }
1552 
1553             nxt_debug(task, "application \"%V\"", &name);
1554 
1555             size = nxt_conf_json_length(application, NULL);
1556 
1557             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1558             if (nxt_slow_path(app_mp == NULL)) {
1559                 goto fail;
1560             }
1561 
1562             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1563             if (app == NULL) {
1564                 goto app_fail;
1565             }
1566 
1567             nxt_memzero(app, sizeof(nxt_app_t));
1568 
1569             app->mem_pool = app_mp;
1570 
1571             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1572             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1573                                                   + name.length);
1574 
1575             p = nxt_conf_json_print(app->conf.start, application, NULL);
1576             app->conf.length = p - app->conf.start;
1577 
1578             nxt_assert(app->conf.length <= size);
1579 
1580             nxt_debug(task, "application conf \"%V\"", &app->conf);
1581 
1582             prev = nxt_router_app_find(&router->apps, &name);
1583 
1584             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1585                 nxt_mp_destroy(app_mp);
1586 
1587                 nxt_queue_remove(&prev->link);
1588                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1589 
1590                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1591                 if (nxt_slow_path(ret != NXT_OK)) {
1592                     goto fail;
1593                 }
1594 
1595                 continue;
1596             }
1597 
1598             apcf.processes = 1;
1599             apcf.max_processes = 1;
1600             apcf.spare_processes = 0;
1601             apcf.timeout = 0;
1602             apcf.idle_timeout = 15000;
1603             apcf.limits_value = NULL;
1604             apcf.processes_value = NULL;
1605             apcf.targets_value = NULL;
1606 
1607             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1608             if (nxt_slow_path(app_joint == NULL)) {
1609                 goto app_fail;
1610             }
1611 
1612             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1613 
1614             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1615                                       nxt_nitems(nxt_router_app_conf), &apcf);
1616             if (ret != NXT_OK) {
1617                 nxt_alert(task, "application map error");
1618                 goto app_fail;
1619             }
1620 
1621             if (apcf.limits_value != NULL) {
1622 
1623                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1624                     nxt_alert(task, "application limits is not object");
1625                     goto app_fail;
1626                 }
1627 
1628                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1629                                         nxt_router_app_limits_conf,
1630                                         nxt_nitems(nxt_router_app_limits_conf),
1631                                         &apcf);
1632                 if (ret != NXT_OK) {
1633                     nxt_alert(task, "application limits map error");
1634                     goto app_fail;
1635                 }
1636             }
1637 
1638             if (apcf.processes_value != NULL
1639                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1640             {
1641                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1642                                      nxt_router_app_processes_conf,
1643                                      nxt_nitems(nxt_router_app_processes_conf),
1644                                      &apcf);
1645                 if (ret != NXT_OK) {
1646                     nxt_alert(task, "application processes map error");
1647                     goto app_fail;
1648                 }
1649 
1650             } else {
1651                 apcf.max_processes = apcf.processes;
1652                 apcf.spare_processes = apcf.processes;
1653             }
1654 
1655             if (apcf.targets_value != NULL) {
1656                 n = nxt_conf_object_members_count(apcf.targets_value);
1657 
1658                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1659                 if (nxt_slow_path(targets == NULL)) {
1660                     goto app_fail;
1661                 }
1662 
1663                 next_target = 0;
1664 
1665                 for (i = 0; i < n; i++) {
1666                     (void) nxt_conf_next_object_member(apcf.targets_value,
1667                                                        &target, &next_target);
1668 
1669                     s = nxt_str_dup(app_mp, &targets[i], &target);
1670                     if (nxt_slow_path(s == NULL)) {
1671                         goto app_fail;
1672                     }
1673                 }
1674 
1675             } else {
1676                 targets = NULL;
1677             }
1678 
1679             nxt_debug(task, "application type: %V", &apcf.type);
1680             nxt_debug(task, "application processes: %D", apcf.processes);
1681             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1682 
1683             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1684 
1685             if (lang == NULL) {
1686                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1687                 goto app_fail;
1688             }
1689 
1690             nxt_debug(task, "application language module: \"%s\"", lang->file);
1691 
1692             ret = nxt_thread_mutex_create(&app->mutex);
1693             if (ret != NXT_OK) {
1694                 goto app_fail;
1695             }
1696 
1697             nxt_queue_init(&app->ports);
1698             nxt_queue_init(&app->spare_ports);
1699             nxt_queue_init(&app->idle_ports);
1700             nxt_queue_init(&app->ack_waiting_req);
1701 
1702             app->name.length = name.length;
1703             nxt_memcpy(app->name.start, name.start, name.length);
1704 
1705             app->type = lang->type;
1706             app->max_processes = apcf.max_processes;
1707             app->spare_processes = apcf.spare_processes;
1708             app->max_pending_processes = apcf.spare_processes
1709                                          ? apcf.spare_processes : 1;
1710             app->timeout = apcf.timeout;
1711             app->idle_timeout = apcf.idle_timeout;
1712 
1713             app->targets = targets;
1714 
1715             engine = task->thread->engine;
1716 
1717             app->engine = engine;
1718 
1719             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1720             app->adjust_idle_work.task = &engine->task;
1721             app->adjust_idle_work.obj = app;
1722 
1723             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1724 
1725             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1726             if (nxt_slow_path(ret != NXT_OK)) {
1727                 goto app_fail;
1728             }
1729 
1730             nxt_router_app_use(task, app, 1);
1731 
1732             app->joint = app_joint;
1733 
1734             app_joint->use_count = 1;
1735             app_joint->app = app;
1736 
1737             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1738             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1739             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1740             app_joint->idle_timer.task = &engine->task;
1741             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1742 
1743             app_joint->free_app_work.handler = nxt_router_free_app;
1744             app_joint->free_app_work.task = &engine->task;
1745             app_joint->free_app_work.obj = app_joint;
1746 
1747             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1748                                 NXT_PROCESS_APP);
1749             if (nxt_slow_path(port == NULL)) {
1750                 return NXT_ERROR;
1751             }
1752 
1753             ret = nxt_port_socket_init(task, port, 0);
1754             if (nxt_slow_path(ret != NXT_OK)) {
1755                 nxt_port_use(task, port, -1);
1756                 return NXT_ERROR;
1757             }
1758 
1759             ret = nxt_router_app_queue_init(task, port);
1760             if (nxt_slow_path(ret != NXT_OK)) {
1761                 nxt_port_write_close(port);
1762                 nxt_port_read_close(port);
1763                 nxt_port_use(task, port, -1);
1764                 return NXT_ERROR;
1765             }
1766 
1767             nxt_port_write_enable(task, port);
1768             port->app = app;
1769 
1770             app->shared_port = port;
1771 
1772             nxt_thread_mutex_create(&app->outgoing.mutex);
1773         }
1774     }
1775 
1776     routes_conf = nxt_conf_get_path(conf, &routes_path);
1777     if (nxt_fast_path(routes_conf != NULL)) {
1778         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1779         if (nxt_slow_path(routes == NULL)) {
1780             return NXT_ERROR;
1781         }
1782         tmcf->router_conf->routes = routes;
1783     }
1784 
1785     ret = nxt_upstreams_create(task, tmcf, conf);
1786     if (nxt_slow_path(ret != NXT_OK)) {
1787         return ret;
1788     }
1789 
1790     http = nxt_conf_get_path(conf, &http_path);
1791 #if 0
1792     if (http == NULL) {
1793         nxt_alert(task, "no \"http\" block");
1794         return NXT_ERROR;
1795     }
1796 #endif
1797 
1798     websocket = nxt_conf_get_path(conf, &websocket_path);
1799 
1800     listeners = nxt_conf_get_path(conf, &listeners_path);
1801 
1802     if (listeners != NULL) {
1803         next = 0;
1804 
1805         for ( ;; ) {
1806             listener = nxt_conf_next_object_member(listeners, &name, &next);
1807             if (listener == NULL) {
1808                 break;
1809             }
1810 
1811             skcf = nxt_router_socket_conf(task, tmcf, &name);
1812             if (skcf == NULL) {
1813                 goto fail;
1814             }
1815 
1816             nxt_memzero(&lscf, sizeof(lscf));
1817 
1818             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1819                                       nxt_nitems(nxt_router_listener_conf),
1820                                       &lscf);
1821             if (ret != NXT_OK) {
1822                 nxt_alert(task, "listener map error");
1823                 goto fail;
1824             }
1825 
1826             nxt_debug(task, "application: %V", &lscf.application);
1827 
1828             // STUB, default values if http block is not defined.
1829             skcf->header_buffer_size = 2048;
1830             skcf->large_header_buffer_size = 8192;
1831             skcf->large_header_buffers = 4;
1832             skcf->discard_unsafe_fields = 1;
1833             skcf->body_buffer_size = 16 * 1024;
1834             skcf->max_body_size = 8 * 1024 * 1024;
1835             skcf->proxy_header_buffer_size = 64 * 1024;
1836             skcf->proxy_buffer_size = 4096;
1837             skcf->proxy_buffers = 256;
1838             skcf->idle_timeout = 180 * 1000;
1839             skcf->header_read_timeout = 30 * 1000;
1840             skcf->body_read_timeout = 30 * 1000;
1841             skcf->send_timeout = 30 * 1000;
1842             skcf->proxy_timeout = 60 * 1000;
1843             skcf->proxy_send_timeout = 30 * 1000;
1844             skcf->proxy_read_timeout = 30 * 1000;
1845 
1846             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1847             skcf->websocket_conf.read_timeout = 60 * 1000;
1848             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1849 
1850             nxt_str_null(&skcf->body_temp_path);
1851 
1852             if (http != NULL) {
1853                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1854                                           nxt_nitems(nxt_router_http_conf),
1855                                           skcf);
1856                 if (ret != NXT_OK) {
1857                     nxt_alert(task, "http map error");
1858                     goto fail;
1859                 }
1860             }
1861 
1862             if (websocket != NULL) {
1863                 ret = nxt_conf_map_object(mp, websocket,
1864                                           nxt_router_websocket_conf,
1865                                           nxt_nitems(nxt_router_websocket_conf),
1866                                           &skcf->websocket_conf);
1867                 if (ret != NXT_OK) {
1868                     nxt_alert(task, "websocket map error");
1869                     goto fail;
1870                 }
1871             }
1872 
1873             t = &skcf->body_temp_path;
1874 
1875             if (t->length == 0) {
1876                 t->start = (u_char *) task->thread->runtime->tmp;
1877                 t->length = nxt_strlen(t->start);
1878             }
1879 
1880             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1881             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1882                                                     client_ip_conf);
1883             if (nxt_slow_path(ret != NXT_OK)) {
1884                 return NXT_ERROR;
1885             }
1886 
1887 #if (NXT_TLS)
1888             certificate = nxt_conf_get_path(listener, &certificate_path);
1889 
1890             if (certificate != NULL) {
1891                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1892                 if (nxt_slow_path(tls_init == NULL)) {
1893                     return NXT_ERROR;
1894                 }
1895 
1896                 tls_init->cache_size = 0;
1897                 tls_init->timeout = 300;
1898 
1899                 value = nxt_conf_get_path(listener, &conf_cache_path);
1900                 if (value != NULL) {
1901                     tls_init->cache_size = nxt_conf_get_number(value);
1902                 }
1903 
1904                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1905                 if (value != NULL) {
1906                     tls_init->timeout = nxt_conf_get_number(value);
1907                 }
1908 
1909                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1910                                                         &conf_commands_path);
1911 
1912                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1913                                                            &conf_tickets);
1914 
1915                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1916                     n = nxt_conf_array_elements_count(certificate);
1917 
1918                     for (i = 0; i < n; i++) {
1919                         value = nxt_conf_get_array_element(certificate, i);
1920 
1921                         nxt_assert(value != NULL);
1922 
1923                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1924                                                          tls_init, i == 0);
1925                         if (nxt_slow_path(ret != NXT_OK)) {
1926                             goto fail;
1927                         }
1928                     }
1929 
1930                 } else {
1931                     /* NXT_CONF_STRING */
1932                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1933                                                      tls_init, 1);
1934                     if (nxt_slow_path(ret != NXT_OK)) {
1935                         goto fail;
1936                     }
1937                 }
1938             }
1939 #endif
1940 
1941             skcf->listen->handler = nxt_http_conn_init;
1942             skcf->router_conf = tmcf->router_conf;
1943             skcf->router_conf->count++;
1944 
1945             if (lscf.pass.length != 0) {
1946                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1947 
1948             /* COMPATIBILITY: listener application. */
1949             } else if (lscf.application.length > 0) {
1950                 skcf->action = nxt_http_pass_application(task,
1951                                                          tmcf->router_conf,
1952                                                          &lscf.application);
1953             }
1954 
1955             if (nxt_slow_path(skcf->action == NULL)) {
1956                 goto fail;
1957             }
1958         }
1959     }
1960 
1961     ret = nxt_http_routes_resolve(task, tmcf);
1962     if (nxt_slow_path(ret != NXT_OK)) {
1963         goto fail;
1964     }
1965 
1966     value = nxt_conf_get_path(conf, &access_log_path);
1967 
1968     if (value != NULL) {
1969         nxt_conf_get_string(value, &path);
1970 
1971         access_log = router->access_log;
1972 
1973         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1974             nxt_thread_spin_lock(&router->lock);
1975             access_log->count++;
1976             nxt_thread_spin_unlock(&router->lock);
1977 
1978         } else {
1979             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1980                                     + path.length);
1981             if (access_log == NULL) {
1982                 nxt_alert(task, "failed to allocate access log structure");
1983                 goto fail;
1984             }
1985 
1986             access_log->fd = -1;
1987             access_log->handler = &nxt_router_access_log_writer;
1988             access_log->count = 1;
1989 
1990             access_log->path.length = path.length;
1991             access_log->path.start = (u_char *) access_log
1992                                      + sizeof(nxt_router_access_log_t);
1993 
1994             nxt_memcpy(access_log->path.start, path.start, path.length);
1995         }
1996 
1997         tmcf->router_conf->access_log = access_log;
1998     }
1999 
2000     nxt_queue_add(&deleting_sockets, &router->sockets);
2001     nxt_queue_init(&router->sockets);
2002 
2003     return NXT_OK;
2004 
2005 app_fail:
2006 
2007     nxt_mp_destroy(app_mp);
2008 
2009 fail:
2010 
2011     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2012 
2013         nxt_queue_remove(&app->link);
2014         nxt_thread_mutex_destroy(&app->mutex);
2015         nxt_mp_destroy(app->mem_pool);
2016 
2017     } nxt_queue_loop;
2018 
2019     return NXT_ERROR;
2020 }
2021 
2022 
2023 #if (NXT_TLS)
2024 
2025 static nxt_int_t
2026 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2027     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2028     nxt_tls_init_t *tls_init, nxt_bool_t last)
2029 {
2030     nxt_router_tlssock_t  *tls;
2031 
2032     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2033     if (nxt_slow_path(tls == NULL)) {
2034         return NXT_ERROR;
2035     }
2036 
2037     tls->tls_init = tls_init;
2038     tls->socket_conf = skcf;
2039     tls->temp_conf = tmcf;
2040     tls->last = last;
2041     nxt_conf_get_string(value, &tls->name);
2042 
2043     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2044 
2045     return NXT_OK;
2046 }
2047 
2048 #endif
2049 
2050 
2051 static nxt_int_t
2052 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2053     nxt_conf_value_t *conf)
2054 {
2055     uint32_t          next, i;
2056     nxt_mp_t          *mp;
2057     nxt_str_t         *type, exten, str;
2058     nxt_int_t         ret;
2059     nxt_uint_t        exts;
2060     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2061 
2062     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2063 
2064     mp = rtcf->mem_pool;
2065 
2066     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2067     if (nxt_slow_path(ret != NXT_OK)) {
2068         return NXT_ERROR;
2069     }
2070 
2071     if (conf == NULL) {
2072         return NXT_OK;
2073     }
2074 
2075     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2076 
2077     if (mtypes_conf != NULL) {
2078         next = 0;
2079 
2080         for ( ;; ) {
2081             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2082 
2083             if (ext_conf == NULL) {
2084                 break;
2085             }
2086 
2087             type = nxt_str_dup(mp, NULL, &str);
2088             if (nxt_slow_path(type == NULL)) {
2089                 return NXT_ERROR;
2090             }
2091 
2092             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2093                 nxt_conf_get_string(ext_conf, &str);
2094 
2095                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2096                     return NXT_ERROR;
2097                 }
2098 
2099                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2100                                                       &exten, type);
2101                 if (nxt_slow_path(ret != NXT_OK)) {
2102                     return NXT_ERROR;
2103                 }
2104 
2105                 continue;
2106             }
2107 
2108             exts = nxt_conf_array_elements_count(ext_conf);
2109 
2110             for (i = 0; i < exts; i++) {
2111                 value = nxt_conf_get_array_element(ext_conf, i);
2112 
2113                 nxt_conf_get_string(value, &str);
2114 
2115                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2116                     return NXT_ERROR;
2117                 }
2118 
2119                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2120                                                       &exten, type);
2121                 if (nxt_slow_path(ret != NXT_OK)) {
2122                     return NXT_ERROR;
2123                 }
2124             }
2125         }
2126     }
2127 
2128     return NXT_OK;
2129 }
2130 
2131 
2132 static nxt_int_t
2133 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2134     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2135 {
2136     char                        c;
2137     size_t                      i;
2138     nxt_mp_t                    *mp;
2139     uint32_t                    hash;
2140     nxt_str_t                   header;
2141     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2142     nxt_http_client_ip_t        *client_ip;
2143     nxt_http_route_addr_rule_t  *source;
2144 
2145     static nxt_str_t  header_path = nxt_string("/header");
2146     static nxt_str_t  source_path = nxt_string("/source");
2147     static nxt_str_t  recursive_path = nxt_string("/recursive");
2148 
2149     if (conf == NULL) {
2150         skcf->client_ip = NULL;
2151 
2152         return NXT_OK;
2153     }
2154 
2155     mp = tmcf->router_conf->mem_pool;
2156 
2157     source_conf = nxt_conf_get_path(conf, &source_path);
2158     header_conf = nxt_conf_get_path(conf, &header_path);
2159     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2160 
2161     if (source_conf == NULL || header_conf == NULL) {
2162         return NXT_ERROR;
2163     }
2164 
2165     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2166     if (nxt_slow_path(client_ip == NULL)) {
2167         return NXT_ERROR;
2168     }
2169 
2170     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2171     if (nxt_slow_path(source == NULL)) {
2172         return NXT_ERROR;
2173     }
2174 
2175     client_ip->source = source;
2176 
2177     nxt_conf_get_string(header_conf, &header);
2178 
2179     if (recursive_conf != NULL) {
2180         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2181     }
2182 
2183     client_ip->header = nxt_str_dup(mp, NULL, &header);
2184     if (nxt_slow_path(client_ip->header == NULL)) {
2185         return NXT_ERROR;
2186     }
2187 
2188     hash = NXT_HTTP_FIELD_HASH_INIT;
2189 
2190     for (i = 0; i < client_ip->header->length; i++) {
2191         c = client_ip->header->start[i];
2192         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2193     }
2194 
2195     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2196 
2197     client_ip->header_hash = hash;
2198 
2199     skcf->client_ip = client_ip;
2200 
2201     return NXT_OK;
2202 }
2203 
2204 
2205 static nxt_app_t *
2206 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2207 {
2208     nxt_app_t  *app;
2209 
2210     nxt_queue_each(app, queue, nxt_app_t, link) {
2211 
2212         if (nxt_strstr_eq(name, &app->name)) {
2213             return app;
2214         }
2215 
2216     } nxt_queue_loop;
2217 
2218     return NULL;
2219 }
2220 
2221 
2222 static nxt_int_t
2223 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2224 {
2225     void       *mem;
2226     nxt_int_t  fd;
2227 
2228     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2229     if (nxt_slow_path(fd == -1)) {
2230         return NXT_ERROR;
2231     }
2232 
2233     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2234                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2235     if (nxt_slow_path(mem == MAP_FAILED)) {
2236         nxt_fd_close(fd);
2237 
2238         return NXT_ERROR;
2239     }
2240 
2241     nxt_app_queue_init(mem);
2242 
2243     port->queue_fd = fd;
2244     port->queue = mem;
2245 
2246     return NXT_OK;
2247 }
2248 
2249 
2250 static nxt_int_t
2251 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2252 {
2253     void       *mem;
2254     nxt_int_t  fd;
2255 
2256     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2257     if (nxt_slow_path(fd == -1)) {
2258         return NXT_ERROR;
2259     }
2260 
2261     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2262                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2263     if (nxt_slow_path(mem == MAP_FAILED)) {
2264         nxt_fd_close(fd);
2265 
2266         return NXT_ERROR;
2267     }
2268 
2269     nxt_port_queue_init(mem);
2270 
2271     port->queue_fd = fd;
2272     port->queue = mem;
2273 
2274     return NXT_OK;
2275 }
2276 
2277 
2278 static nxt_int_t
2279 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2280 {
2281     void  *mem;
2282 
2283     nxt_assert(fd != -1);
2284 
2285     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2286                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2287     if (nxt_slow_path(mem == MAP_FAILED)) {
2288 
2289         return NXT_ERROR;
2290     }
2291 
2292     port->queue = mem;
2293 
2294     return NXT_OK;
2295 }
2296 
2297 
2298 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2299     NXT_LVLHSH_DEFAULT,
2300     nxt_router_apps_hash_test,
2301     nxt_mp_lvlhsh_alloc,
2302     nxt_mp_lvlhsh_free,
2303 };
2304 
2305 
2306 static nxt_int_t
2307 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2308 {
2309     nxt_app_t  *app;
2310 
2311     app = data;
2312 
2313     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2314 }
2315 
2316 
2317 static nxt_int_t
2318 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2319 {
2320     nxt_lvlhsh_query_t  lhq;
2321 
2322     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2323     lhq.replace = 0;
2324     lhq.key = app->name;
2325     lhq.value = app;
2326     lhq.proto = &nxt_router_apps_hash_proto;
2327     lhq.pool = rtcf->mem_pool;
2328 
2329     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2330 
2331     case NXT_OK:
2332         return NXT_OK;
2333 
2334     case NXT_DECLINED:
2335         nxt_thread_log_alert("router app hash adding failed: "
2336                              "\"%V\" is already in hash", &lhq.key);
2337         /* Fall through. */
2338     default:
2339         return NXT_ERROR;
2340     }
2341 }
2342 
2343 
2344 static nxt_app_t *
2345 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2346 {
2347     nxt_lvlhsh_query_t  lhq;
2348 
2349     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2350     lhq.key = *name;
2351     lhq.proto = &nxt_router_apps_hash_proto;
2352 
2353     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2354         return NULL;
2355     }
2356 
2357     return lhq.value;
2358 }
2359 
2360 
2361 static void
2362 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2363 {
2364     nxt_app_t          *app;
2365     nxt_lvlhsh_each_t  lhe;
2366 
2367     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2368 
2369     for ( ;; ) {
2370         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2371 
2372         if (app == NULL) {
2373             break;
2374         }
2375 
2376         nxt_router_app_use(task, app, i);
2377     }
2378 }
2379 
2380 
2381 typedef struct {
2382     nxt_app_t  *app;
2383     nxt_int_t  target;
2384 } nxt_http_app_conf_t;
2385 
2386 
2387 nxt_int_t
2388 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2389     nxt_str_t *target, nxt_http_action_t *action)
2390 {
2391     nxt_app_t            *app;
2392     nxt_str_t            *targets;
2393     nxt_uint_t           i;
2394     nxt_http_app_conf_t  *conf;
2395 
2396     app = nxt_router_apps_hash_get(rtcf, name);
2397     if (app == NULL) {
2398         return NXT_DECLINED;
2399     }
2400 
2401     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2402     if (nxt_slow_path(conf == NULL)) {
2403         return NXT_ERROR;
2404     }
2405 
2406     action->handler = nxt_http_application_handler;
2407     action->u.conf = conf;
2408 
2409     conf->app = app;
2410 
2411     if (target != NULL && target->length != 0) {
2412         targets = app->targets;
2413 
2414         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2415 
2416         conf->target = i;
2417 
2418     } else {
2419         conf->target = 0;
2420     }
2421 
2422     return NXT_OK;
2423 }
2424 
2425 
2426 static nxt_socket_conf_t *
2427 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2428     nxt_str_t *name)
2429 {
2430     size_t               size;
2431     nxt_int_t            ret;
2432     nxt_bool_t           wildcard;
2433     nxt_sockaddr_t       *sa;
2434     nxt_socket_conf_t    *skcf;
2435     nxt_listen_socket_t  *ls;
2436 
2437     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2438     if (nxt_slow_path(sa == NULL)) {
2439         nxt_alert(task, "invalid listener \"%V\"", name);
2440         return NULL;
2441     }
2442 
2443     sa->type = SOCK_STREAM;
2444 
2445     nxt_debug(task, "router listener: \"%*s\"",
2446               (size_t) sa->length, nxt_sockaddr_start(sa));
2447 
2448     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2449     if (nxt_slow_path(skcf == NULL)) {
2450         return NULL;
2451     }
2452 
2453     size = nxt_sockaddr_size(sa);
2454 
2455     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2456 
2457     if (ret != NXT_OK) {
2458 
2459         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2460         if (nxt_slow_path(ls == NULL)) {
2461             return NULL;
2462         }
2463 
2464         skcf->listen = ls;
2465 
2466         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2467         nxt_memcpy(ls->sockaddr, sa, size);
2468 
2469         nxt_listen_socket_remote_size(ls);
2470 
2471         ls->socket = -1;
2472         ls->backlog = NXT_LISTEN_BACKLOG;
2473         ls->flags = NXT_NONBLOCK;
2474         ls->read_after_accept = 1;
2475     }
2476 
2477     switch (sa->u.sockaddr.sa_family) {
2478 #if (NXT_HAVE_UNIX_DOMAIN)
2479     case AF_UNIX:
2480         wildcard = 0;
2481         break;
2482 #endif
2483 #if (NXT_INET6)
2484     case AF_INET6:
2485         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2486         break;
2487 #endif
2488     case AF_INET:
2489     default:
2490         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2491         break;
2492     }
2493 
2494     if (!wildcard) {
2495         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2496         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2497             return NULL;
2498         }
2499 
2500         nxt_memcpy(skcf->sockaddr, sa, size);
2501     }
2502 
2503     return skcf;
2504 }
2505 
2506 
2507 static nxt_int_t
2508 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2509     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2510 {
2511     nxt_router_t       *router;
2512     nxt_queue_link_t   *qlk;
2513     nxt_socket_conf_t  *skcf;
2514 
2515     router = tmcf->router_conf->router;
2516 
2517     for (qlk = nxt_queue_first(&router->sockets);
2518          qlk != nxt_queue_tail(&router->sockets);
2519          qlk = nxt_queue_next(qlk))
2520     {
2521         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2522 
2523         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2524             nskcf->listen = skcf->listen;
2525 
2526             nxt_queue_remove(qlk);
2527             nxt_queue_insert_tail(&keeping_sockets, qlk);
2528 
2529             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2530 
2531             return NXT_OK;
2532         }
2533     }
2534 
2535     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2536 
2537     return NXT_DECLINED;
2538 }
2539 
2540 
2541 static void
2542 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2543     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2544 {
2545     size_t            size;
2546     uint32_t          stream;
2547     nxt_int_t         ret;
2548     nxt_buf_t         *b;
2549     nxt_port_t        *main_port, *router_port;
2550     nxt_runtime_t     *rt;
2551     nxt_socket_rpc_t  *rpc;
2552 
2553     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2554     if (rpc == NULL) {
2555         goto fail;
2556     }
2557 
2558     rpc->socket_conf = skcf;
2559     rpc->temp_conf = tmcf;
2560 
2561     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2562 
2563     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2564     if (b == NULL) {
2565         goto fail;
2566     }
2567 
2568     b->completion_handler = nxt_buf_dummy_completion;
2569 
2570     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2571 
2572     rt = task->thread->runtime;
2573     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2574     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2575 
2576     stream = nxt_port_rpc_register_handler(task, router_port,
2577                                            nxt_router_listen_socket_ready,
2578                                            nxt_router_listen_socket_error,
2579                                            main_port->pid, rpc);
2580     if (nxt_slow_path(stream == 0)) {
2581         goto fail;
2582     }
2583 
2584     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2585                                 stream, router_port->id, b);
2586 
2587     if (nxt_slow_path(ret != NXT_OK)) {
2588         nxt_port_rpc_cancel(task, router_port, stream);
2589         goto fail;
2590     }
2591 
2592     return;
2593 
2594 fail:
2595 
2596     nxt_router_conf_error(task, tmcf);
2597 }
2598 
2599 
2600 static void
2601 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2602     void *data)
2603 {
2604     nxt_int_t         ret;
2605     nxt_socket_t      s;
2606     nxt_socket_rpc_t  *rpc;
2607 
2608     rpc = data;
2609 
2610     s = msg->fd[0];
2611 
2612     ret = nxt_socket_nonblocking(task, s);
2613     if (nxt_slow_path(ret != NXT_OK)) {
2614         goto fail;
2615     }
2616 
2617     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2618 
2619     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2620     if (nxt_slow_path(ret != NXT_OK)) {
2621         goto fail;
2622     }
2623 
2624     rpc->socket_conf->listen->socket = s;
2625 
2626     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2627                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2628 
2629     return;
2630 
2631 fail:
2632 
2633     nxt_socket_close(task, s);
2634 
2635     nxt_router_conf_error(task, rpc->temp_conf);
2636 }
2637 
2638 
2639 static void
2640 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2641     void *data)
2642 {
2643     nxt_socket_rpc_t        *rpc;
2644     nxt_router_temp_conf_t  *tmcf;
2645 
2646     rpc = data;
2647     tmcf = rpc->temp_conf;
2648 
2649 #if 0
2650     u_char                  *p;
2651     size_t                  size;
2652     uint8_t                 error;
2653     nxt_buf_t               *in, *out;
2654     nxt_sockaddr_t          *sa;
2655 
2656     static nxt_str_t  socket_errors[] = {
2657         nxt_string("ListenerSystem"),
2658         nxt_string("ListenerNoIPv6"),
2659         nxt_string("ListenerPort"),
2660         nxt_string("ListenerInUse"),
2661         nxt_string("ListenerNoAddress"),
2662         nxt_string("ListenerNoAccess"),
2663         nxt_string("ListenerPath"),
2664     };
2665 
2666     sa = rpc->socket_conf->listen->sockaddr;
2667 
2668     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2669 
2670     if (nxt_slow_path(in == NULL)) {
2671         return;
2672     }
2673 
2674     p = in->mem.pos;
2675 
2676     error = *p++;
2677 
2678     size = nxt_length("listen socket error: ")
2679            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2680            + sa->length + socket_errors[error].length + (in->mem.free - p);
2681 
2682     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2683     if (nxt_slow_path(out == NULL)) {
2684         return;
2685     }
2686 
2687     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2688                         "listen socket error: "
2689                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2690                         (size_t) sa->length, nxt_sockaddr_start(sa),
2691                         &socket_errors[error], in->mem.free - p, p);
2692 
2693     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2694 #endif
2695 
2696     nxt_router_conf_error(task, tmcf);
2697 }
2698 
2699 
2700 #if (NXT_TLS)
2701 
2702 static void
2703 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2704     void *data)
2705 {
2706     nxt_mp_t                *mp;
2707     nxt_int_t               ret;
2708     nxt_tls_conf_t          *tlscf;
2709     nxt_router_tlssock_t    *tls;
2710     nxt_tls_bundle_conf_t   *bundle;
2711     nxt_router_temp_conf_t  *tmcf;
2712 
2713     nxt_debug(task, "tls rpc handler");
2714 
2715     tls = data;
2716     tmcf = tls->temp_conf;
2717 
2718     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2719         goto fail;
2720     }
2721 
2722     mp = tmcf->router_conf->mem_pool;
2723 
2724     if (tls->socket_conf->tls == NULL){
2725         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2726         if (nxt_slow_path(tlscf == NULL)) {
2727             goto fail;
2728         }
2729 
2730         tlscf->no_wait_shutdown = 1;
2731         tls->socket_conf->tls = tlscf;
2732 
2733     } else {
2734         tlscf = tls->socket_conf->tls;
2735     }
2736 
2737     tls->tls_init->conf = tlscf;
2738 
2739     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2740     if (nxt_slow_path(bundle == NULL)) {
2741         goto fail;
2742     }
2743 
2744     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2745         goto fail;
2746     }
2747 
2748     bundle->chain_file = msg->fd[0];
2749     bundle->next = tlscf->bundle;
2750     tlscf->bundle = bundle;
2751 
2752     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2753                                                   tls->last);
2754     if (nxt_slow_path(ret != NXT_OK)) {
2755         goto fail;
2756     }
2757 
2758     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2759                        nxt_router_conf_apply, task, tmcf, NULL);
2760     return;
2761 
2762 fail:
2763 
2764     nxt_router_conf_error(task, tmcf);
2765 }
2766 
2767 #endif
2768 
2769 
2770 static void
2771 nxt_router_app_rpc_create(nxt_task_t *task,
2772     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2773 {
2774     size_t         size;
2775     uint32_t       stream;
2776     nxt_int_t      ret;
2777     nxt_buf_t      *b;
2778     nxt_port_t     *router_port, *dport;
2779     nxt_runtime_t  *rt;
2780     nxt_app_rpc_t  *rpc;
2781 
2782     rt = task->thread->runtime;
2783 
2784     dport = app->proto_port;
2785 
2786     if (dport == NULL) {
2787         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2788 
2789         size = app->name.length + 1 + app->conf.length;
2790 
2791         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2792         if (nxt_slow_path(b == NULL)) {
2793             goto fail;
2794         }
2795 
2796         b->completion_handler = nxt_buf_dummy_completion;
2797 
2798         nxt_buf_cpystr(b, &app->name);
2799         *b->mem.free++ = '\0';
2800         nxt_buf_cpystr(b, &app->conf);
2801 
2802         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2803 
2804     } else {
2805         nxt_debug(task, "app '%V' prefork", &app->name);
2806 
2807         b = NULL;
2808     }
2809 
2810     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2811 
2812     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2813                                            nxt_router_app_prefork_ready,
2814                                            nxt_router_app_prefork_error,
2815                                            sizeof(nxt_app_rpc_t));
2816     if (nxt_slow_path(rpc == NULL)) {
2817         goto fail;
2818     }
2819 
2820     rpc->app = app;
2821     rpc->temp_conf = tmcf;
2822     rpc->proto = (b != NULL);
2823 
2824     stream = nxt_port_rpc_ex_stream(rpc);
2825 
2826     ret = nxt_port_socket_write(task, dport,
2827                                 NXT_PORT_MSG_START_PROCESS,
2828                                 -1, stream, router_port->id, b);
2829     if (nxt_slow_path(ret != NXT_OK)) {
2830         nxt_port_rpc_cancel(task, router_port, stream);
2831         goto fail;
2832     }
2833 
2834     if (b == NULL) {
2835         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2836 
2837         app->pending_processes++;
2838     }
2839 
2840     return;
2841 
2842 fail:
2843 
2844     nxt_router_conf_error(task, tmcf);
2845 }
2846 
2847 
2848 static void
2849 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2850     void *data)
2851 {
2852     nxt_app_t           *app;
2853     nxt_port_t          *port;
2854     nxt_app_rpc_t       *rpc;
2855     nxt_event_engine_t  *engine;
2856 
2857     rpc = data;
2858     app = rpc->app;
2859 
2860     port = msg->u.new_port;
2861 
2862     nxt_assert(port != NULL);
2863     nxt_assert(port->id == 0);
2864 
2865     if (rpc->proto) {
2866         nxt_assert(app->proto_port == NULL);
2867         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2868 
2869         nxt_port_inc_use(port);
2870 
2871         app->proto_port = port;
2872         port->app = app;
2873 
2874         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2875 
2876         return;
2877     }
2878 
2879     nxt_assert(port->type == NXT_PROCESS_APP);
2880 
2881     port->app = app;
2882     port->main_app_port = port;
2883 
2884     app->pending_processes--;
2885     app->processes++;
2886     app->idle_processes++;
2887 
2888     engine = task->thread->engine;
2889 
2890     nxt_queue_insert_tail(&app->ports, &port->app_link);
2891     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2892 
2893     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2894               &app->name, port->pid, port->id);
2895 
2896     nxt_port_hash_add(&app->port_hash, port);
2897     app->port_hash_count++;
2898 
2899     port->idle_start = 0;
2900 
2901     nxt_port_inc_use(port);
2902 
2903     nxt_router_app_shared_port_send(task, port);
2904 
2905     nxt_work_queue_add(&engine->fast_work_queue,
2906                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2907 }
2908 
2909 
2910 static void
2911 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2912     void *data)
2913 {
2914     nxt_app_t               *app;
2915     nxt_app_rpc_t           *rpc;
2916     nxt_router_temp_conf_t  *tmcf;
2917 
2918     rpc = data;
2919     app = rpc->app;
2920     tmcf = rpc->temp_conf;
2921 
2922     if (rpc->proto) {
2923         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2924                 &app->name);
2925 
2926     } else {
2927         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2928                 &app->name);
2929 
2930         app->pending_processes--;
2931     }
2932 
2933     nxt_router_conf_error(task, tmcf);
2934 }
2935 
2936 
2937 static nxt_int_t
2938 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2939     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2940 {
2941     nxt_int_t                 ret;
2942     nxt_uint_t                n, threads;
2943     nxt_queue_link_t          *qlk;
2944     nxt_router_engine_conf_t  *recf;
2945 
2946     threads = tmcf->router_conf->threads;
2947 
2948     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2949                                      sizeof(nxt_router_engine_conf_t));
2950     if (nxt_slow_path(tmcf->engines == NULL)) {
2951         return NXT_ERROR;
2952     }
2953 
2954     n = 0;
2955 
2956     for (qlk = nxt_queue_first(&router->engines);
2957          qlk != nxt_queue_tail(&router->engines);
2958          qlk = nxt_queue_next(qlk))
2959     {
2960         recf = nxt_array_zero_add(tmcf->engines);
2961         if (nxt_slow_path(recf == NULL)) {
2962             return NXT_ERROR;
2963         }
2964 
2965         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2966 
2967         if (n < threads) {
2968             recf->action = NXT_ROUTER_ENGINE_KEEP;
2969             ret = nxt_router_engine_conf_update(tmcf, recf);
2970 
2971         } else {
2972             recf->action = NXT_ROUTER_ENGINE_DELETE;
2973             ret = nxt_router_engine_conf_delete(tmcf, recf);
2974         }
2975 
2976         if (nxt_slow_path(ret != NXT_OK)) {
2977             return ret;
2978         }
2979 
2980         n++;
2981     }
2982 
2983     tmcf->new_threads = n;
2984 
2985     while (n < threads) {
2986         recf = nxt_array_zero_add(tmcf->engines);
2987         if (nxt_slow_path(recf == NULL)) {
2988             return NXT_ERROR;
2989         }
2990 
2991         recf->action = NXT_ROUTER_ENGINE_ADD;
2992 
2993         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2994         if (nxt_slow_path(recf->engine == NULL)) {
2995             return NXT_ERROR;
2996         }
2997 
2998         ret = nxt_router_engine_conf_create(tmcf, recf);
2999         if (nxt_slow_path(ret != NXT_OK)) {
3000             return ret;
3001         }
3002 
3003         n++;
3004     }
3005 
3006     return NXT_OK;
3007 }
3008 
3009 
3010 static nxt_int_t
3011 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3012     nxt_router_engine_conf_t *recf)
3013 {
3014     nxt_int_t  ret;
3015 
3016     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3017                                           nxt_router_listen_socket_create);
3018     if (nxt_slow_path(ret != NXT_OK)) {
3019         return ret;
3020     }
3021 
3022     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3023                                           nxt_router_listen_socket_create);
3024     if (nxt_slow_path(ret != NXT_OK)) {
3025         return ret;
3026     }
3027 
3028     return ret;
3029 }
3030 
3031 
3032 static nxt_int_t
3033 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3034     nxt_router_engine_conf_t *recf)
3035 {
3036     nxt_int_t  ret;
3037 
3038     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3039                                           nxt_router_listen_socket_create);
3040     if (nxt_slow_path(ret != NXT_OK)) {
3041         return ret;
3042     }
3043 
3044     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3045                                           nxt_router_listen_socket_update);
3046     if (nxt_slow_path(ret != NXT_OK)) {
3047         return ret;
3048     }
3049 
3050     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3051     if (nxt_slow_path(ret != NXT_OK)) {
3052         return ret;
3053     }
3054 
3055     return ret;
3056 }
3057 
3058 
3059 static nxt_int_t
3060 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3061     nxt_router_engine_conf_t *recf)
3062 {
3063     nxt_int_t  ret;
3064 
3065     ret = nxt_router_engine_quit(tmcf, recf);
3066     if (nxt_slow_path(ret != NXT_OK)) {
3067         return ret;
3068     }
3069 
3070     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3071     if (nxt_slow_path(ret != NXT_OK)) {
3072         return ret;
3073     }
3074 
3075     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3076 }
3077 
3078 
3079 static nxt_int_t
3080 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3081     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3082     nxt_work_handler_t handler)
3083 {
3084     nxt_int_t                ret;
3085     nxt_joint_job_t          *job;
3086     nxt_queue_link_t         *qlk;
3087     nxt_socket_conf_t        *skcf;
3088     nxt_socket_conf_joint_t  *joint;
3089 
3090     for (qlk = nxt_queue_first(sockets);
3091          qlk != nxt_queue_tail(sockets);
3092          qlk = nxt_queue_next(qlk))
3093     {
3094         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3095         if (nxt_slow_path(job == NULL)) {
3096             return NXT_ERROR;
3097         }
3098 
3099         job->work.next = recf->jobs;
3100         recf->jobs = &job->work;
3101 
3102         job->task = tmcf->engine->task;
3103         job->work.handler = handler;
3104         job->work.task = &job->task;
3105         job->work.obj = job;
3106         job->tmcf = tmcf;
3107 
3108         tmcf->count++;
3109 
3110         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3111                              sizeof(nxt_socket_conf_joint_t));
3112         if (nxt_slow_path(joint == NULL)) {
3113             return NXT_ERROR;
3114         }
3115 
3116         job->work.data = joint;
3117 
3118         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3119         if (nxt_slow_path(ret != NXT_OK)) {
3120             return ret;
3121         }
3122 
3123         joint->count = 1;
3124 
3125         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3126         skcf->count++;
3127         joint->socket_conf = skcf;
3128 
3129         joint->engine = recf->engine;
3130     }
3131 
3132     return NXT_OK;
3133 }
3134 
3135 
3136 static nxt_int_t
3137 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3138     nxt_router_engine_conf_t *recf)
3139 {
3140     nxt_joint_job_t  *job;
3141 
3142     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3143     if (nxt_slow_path(job == NULL)) {
3144         return NXT_ERROR;
3145     }
3146 
3147     job->work.next = recf->jobs;
3148     recf->jobs = &job->work;
3149 
3150     job->task = tmcf->engine->task;
3151     job->work.handler = nxt_router_worker_thread_quit;
3152     job->work.task = &job->task;
3153     job->work.obj = NULL;
3154     job->work.data = NULL;
3155     job->tmcf = NULL;
3156 
3157     return NXT_OK;
3158 }
3159 
3160 
3161 static nxt_int_t
3162 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3163     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3164 {
3165     nxt_joint_job_t   *job;
3166     nxt_queue_link_t  *qlk;
3167 
3168     for (qlk = nxt_queue_first(sockets);
3169          qlk != nxt_queue_tail(sockets);
3170          qlk = nxt_queue_next(qlk))
3171     {
3172         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3173         if (