xref: /unit/src/nxt_router.c (revision 1942:296628096d6c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
/*
 * Port id given to an application's shared port: it is passed to
 * nxt_port_new() when a shared port is created and compared against
 * port->id to recognise a shared port.
 */
#define NXT_SHARED_PORT_ID  0xFFFFu
22 
/*
 * Scratch structure for per-application settings.
 * NOTE(review): the code that fills and consumes this structure is not
 * visible in this part of the file; the meaning of individual fields is
 * suggested only by their names -- confirm against the configuration
 * mapping code before relying on it.
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    uint32_t          requests;
    /* Raw configuration values kept as nxt_conf_value_t pointers. */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
35 
36 
/*
 * Scratch structure for per-listener settings: two strings named "pass"
 * and "application".
 * NOTE(review): the users of this structure are not visible in this part
 * of the file -- confirm how the two fields relate to each other.
 */
typedef struct {
    nxt_str_t         pass;
    nxt_str_t         application;
} nxt_router_listener_conf_t;
41 
42 
43 #if (NXT_TLS)
44 
/*
 * TLS-related record tied to a socket configuration and to the temporary
 * router configuration being built.  Only compiled when NXT_TLS is set.
 * NOTE(review): presumably the context used by nxt_router_tls_rpc_handler()
 * and filled by nxt_router_conf_tls_insert() -- confirm; both definitions
 * are outside this part of the file.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
} nxt_router_tlssock_t;
54 
55 #endif
56 
57 
/*
 * Context record pairing a socket configuration with the temporary router
 * configuration it belongs to.
 * NOTE(review): presumably the RPC data for the listen socket ready/error
 * handlers declared below -- confirm; its users are outside this part of
 * the file.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
64 
65 
/*
 * Context record pairing an application with the temporary router
 * configuration it belongs to.
 * NOTE(review): presumably the RPC data for the application prefork
 * ready/error handlers declared below -- confirm; its users are outside
 * this part of the file.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
} nxt_app_rpc_t;
70 
71 
/*
 * RPC data registered by nxt_router_start_app_process_handler() for the
 * "start process" request; the reply is delivered to
 * nxt_router_app_port_ready() or nxt_router_app_port_error().
 */
typedef struct {
    /* app->joint at the time the request was sent. */
    nxt_app_joint_t         *app_joint;
    /* app->generation at the time the request was sent. */
    uint32_t                generation;
} nxt_app_joint_rpc_t;
76 
77 
/*
 * Forward declarations: router process entry points (referenced by
 * nxt_router_process below) and application process start-up.
 */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

/* Port message handlers installed in nxt_router_process_port_handlers. */
static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_app_restart_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
96 
/*
 * Forward declarations: temporary configuration life cycle
 * (create / apply / ready / error / send).
 */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

/* Forward declarations: building a router configuration from received data. */
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
    nxt_conf_value_t *conf);

/* Forward declarations: application lookup by name (queue and hash). */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);
122 
/* Forward declarations: per-port queue set-up. */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
/*
 * Forward declarations: RPC exchanges issued while a configuration is being
 * applied (listen sockets, TLS when NXT_TLS is set, application prefork)
 * and socket configuration lookup.
 */
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
152 
/* Forward declarations: per-engine configuration records and their jobs. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Forward declarations: thread creation and application ordering. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

/* Forward declarations: posting prepared jobs to engines. */
static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);
181 
/*
 * Forward declarations: thread entry point and work handlers.  Apart from
 * nxt_router_thread_start(), nxt_router_req_headers_ack_handler() and
 * nxt_router_listen_socket_release(), all share the
 * (task, obj, data) work handler signature.
 */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);
201 
/* Forward declarations: access log writing, opening, release and reopen. */
static void nxt_router_access_log_writer(nxt_task_t *task,
    nxt_http_request_t *r, nxt_router_access_log_t *access_log);
static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
    struct tm *tm, size_t size, const char *format);
static void nxt_router_access_log_open(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_access_log_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_release(nxt_task_t *task,
    nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_reopen_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
220 
/*
 * Forward declarations: replies to the "start process" RPC registered in
 * nxt_router_start_app_process_handler(), and shared port hand-over.
 */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
    nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Forward declarations: application reference counting and unlinking. */
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

/* Forward declarations: application port acquisition and release. */
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

/* Forward declarations: building the request message for an application. */
static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

/* Forward declarations: timers and deferred application clean-up. */
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

/* Tentative definition; the initialized definition is later in the file. */
static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

/* Forward declarations: request release and remaining port handlers. */
static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
269 
/* Declared here, defined outside this part of the file. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/* The router instance; allocated and assigned by nxt_router_start(). */
static nxt_router_t  *nxt_router;

static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Table selecting either the "HTTP_" prefix or the empty prefix per slot.
 * NOTE(review): presumably indexed by application type and passed as the
 * "prefix" argument of nxt_router_prepare_msg() -- confirm; the slot order
 * must then match the application type enumeration, which is not visible
 * in this part of the file.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
285 
286 
/*
 * Port message dispatch table of the router process; referenced by
 * nxt_router_process.port_handlers.  Handlers with the nxt_router_ prefix
 * are declared in this file, the others come from included headers.
 * Both rpc_ready and rpc_error are routed to nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
302 
303 
/*
 * Process descriptor of the router (non-static, so visible to other
 * translation units).  It names the process "router", wires in the prefork
 * and start callbacks defined in this file and the port handler table
 * above.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
314 
315 
/*
 * Queues of nxt_socket_conf_t.
 *
 * File-scope state with external linkage; every call to
 * nxt_router_temp_conf() re-initializes all five queues, so their contents
 * belong to the temporary configuration currently being processed.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
322 
323 
324 static nxt_int_t
325 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
326 {
327     nxt_runtime_stop_app_processes(task, task->thread->runtime);
328 
329     return NXT_OK;
330 }
331 
332 
333 static nxt_int_t
334 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
335 {
336     nxt_int_t      ret;
337     nxt_port_t     *controller_port;
338     nxt_router_t   *router;
339     nxt_runtime_t  *rt;
340 
341     rt = task->thread->runtime;
342 
343     nxt_log(task, NXT_LOG_INFO, "router started");
344 
345 #if (NXT_TLS)
346     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
347     if (nxt_slow_path(rt->tls == NULL)) {
348         return NXT_ERROR;
349     }
350 
351     ret = rt->tls->library_init(task);
352     if (nxt_slow_path(ret != NXT_OK)) {
353         return ret;
354     }
355 #endif
356 
357     ret = nxt_http_init(task);
358     if (nxt_slow_path(ret != NXT_OK)) {
359         return ret;
360     }
361 
362     router = nxt_zalloc(sizeof(nxt_router_t));
363     if (nxt_slow_path(router == NULL)) {
364         return NXT_ERROR;
365     }
366 
367     nxt_queue_init(&router->engines);
368     nxt_queue_init(&router->sockets);
369     nxt_queue_init(&router->apps);
370 
371     nxt_router = router;
372 
373     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
374     if (controller_port != NULL) {
375         nxt_router_greet_controller(task, controller_port);
376     }
377 
378     return NXT_OK;
379 }
380 
381 
382 static void
383 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
384 {
385     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
386                           -1, 0, 0, NULL);
387 }
388 
389 
390 static void
391 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
392     void *data)
393 {
394     size_t               size;
395     uint32_t             stream;
396     nxt_mp_t             *mp;
397     nxt_int_t            ret;
398     nxt_app_t            *app;
399     nxt_buf_t            *b;
400     nxt_port_t           *main_port;
401     nxt_runtime_t        *rt;
402     nxt_app_joint_rpc_t  *app_joint_rpc;
403 
404     app = data;
405 
406     rt = task->thread->runtime;
407     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
408 
409     nxt_debug(task, "app '%V' %p start process", &app->name, app);
410 
411     size = app->name.length + 1 + app->conf.length;
412 
413     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
414 
415     if (nxt_slow_path(b == NULL)) {
416         goto failed;
417     }
418 
419     nxt_buf_cpystr(b, &app->name);
420     *b->mem.free++ = '\0';
421     nxt_buf_cpystr(b, &app->conf);
422 
423     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
424                                                      nxt_router_app_port_ready,
425                                                      nxt_router_app_port_error,
426                                                    sizeof(nxt_app_joint_rpc_t));
427     if (nxt_slow_path(app_joint_rpc == NULL)) {
428         goto failed;
429     }
430 
431     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
432 
433     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
434                                 -1, stream, port->id, b);
435     if (nxt_slow_path(ret != NXT_OK)) {
436         nxt_port_rpc_cancel(task, port, stream);
437 
438         goto failed;
439     }
440 
441     app_joint_rpc->app_joint = app->joint;
442     app_joint_rpc->generation = app->generation;
443 
444     nxt_router_app_joint_use(task, app->joint, 1);
445 
446     nxt_router_app_use(task, app, -1);
447 
448     return;
449 
450 failed:
451 
452     if (b != NULL) {
453         mp = b->data;
454         nxt_mp_free(mp, b);
455         nxt_mp_release(mp);
456     }
457 
458     nxt_thread_mutex_lock(&app->mutex);
459 
460     app->pending_processes--;
461 
462     nxt_thread_mutex_unlock(&app->mutex);
463 
464     nxt_router_app_use(task, app, -1);
465 }
466 
467 
468 static void
469 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
470 {
471     app_joint->use_count += i;
472 
473     if (app_joint->use_count == 0) {
474         nxt_assert(app_joint->app == NULL);
475 
476         nxt_free(app_joint);
477     }
478 }
479 
480 
481 static nxt_int_t
482 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
483 {
484     nxt_int_t      res;
485     nxt_port_t     *router_port;
486     nxt_runtime_t  *rt;
487 
488     nxt_debug(task, "app '%V' start process", &app->name);
489 
490     rt = task->thread->runtime;
491     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
492 
493     nxt_router_app_use(task, app, 1);
494 
495     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
496                         app);
497 
498     if (res == NXT_OK) {
499         return res;
500     }
501 
502     nxt_thread_mutex_lock(&app->mutex);
503 
504     app->pending_processes--;
505 
506     nxt_thread_mutex_unlock(&app->mutex);
507 
508     nxt_router_app_use(task, app, -1);
509 
510     return NXT_ERROR;
511 }
512 
513 
514 nxt_inline nxt_bool_t
515 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
516 {
517     nxt_buf_t       *b, *next;
518     nxt_bool_t      cancelled;
519     nxt_port_t      *app_port;
520     nxt_msg_info_t  *msg_info;
521 
522     msg_info = &req_rpc_data->msg_info;
523 
524     if (msg_info->buf == NULL) {
525         return 0;
526     }
527 
528     app_port = req_rpc_data->app_port;
529 
530     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
531         cancelled = nxt_app_queue_cancel(app_port->queue,
532                                          msg_info->tracking_cookie,
533                                          req_rpc_data->stream);
534 
535         if (cancelled) {
536             nxt_debug(task, "stream #%uD: cancelled by router",
537                       req_rpc_data->stream);
538         }
539 
540     } else {
541         cancelled = 0;
542     }
543 
544     for (b = msg_info->buf; b != NULL; b = next) {
545         next = b->next;
546         b->next = NULL;
547 
548         if (b->is_port_mmap_sent) {
549             b->is_port_mmap_sent = cancelled == 0;
550         }
551 
552         b->completion_handler(task, b, b->parent);
553     }
554 
555     msg_info->buf = NULL;
556 
557     return cancelled;
558 }
559 
560 
561 nxt_inline nxt_bool_t
562 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
563 {
564     if (lnk->next != NULL) {
565         nxt_queue_remove(lnk);
566 
567         lnk->next = NULL;
568 
569         return 1;
570     }
571 
572     return 0;
573 }
574 
575 
576 nxt_inline void
577 nxt_request_rpc_data_unlink(nxt_task_t *task,
578     nxt_request_rpc_data_t *req_rpc_data)
579 {
580     nxt_app_t           *app;
581     nxt_bool_t          unlinked;
582     nxt_http_request_t  *r;
583 
584     nxt_router_msg_cancel(task, req_rpc_data);
585 
586     if (req_rpc_data->app_port != NULL) {
587         nxt_router_app_port_release(task, req_rpc_data->app_port,
588                                     req_rpc_data->apr_action);
589 
590         req_rpc_data->app_port = NULL;
591     }
592 
593     app = req_rpc_data->app;
594     r = req_rpc_data->request;
595 
596     if (r != NULL) {
597         r->timer_data = NULL;
598 
599         nxt_router_http_request_release_post(task, r);
600 
601         r->req_rpc_data = NULL;
602         req_rpc_data->request = NULL;
603 
604         if (app != NULL) {
605             unlinked = 0;
606 
607             nxt_thread_mutex_lock(&app->mutex);
608 
609             if (r->app_link.next != NULL) {
610                 nxt_queue_remove(&r->app_link);
611                 r->app_link.next = NULL;
612 
613                 unlinked = 1;
614             }
615 
616             nxt_thread_mutex_unlock(&app->mutex);
617 
618             if (unlinked) {
619                 nxt_mp_release(r->mem_pool);
620             }
621         }
622     }
623 
624     if (app != NULL) {
625         nxt_router_app_use(task, app, -1);
626 
627         req_rpc_data->app = NULL;
628     }
629 
630     if (req_rpc_data->msg_info.body_fd != -1) {
631         nxt_fd_close(req_rpc_data->msg_info.body_fd);
632 
633         req_rpc_data->msg_info.body_fd = -1;
634     }
635 
636     if (req_rpc_data->rpc_cancel) {
637         req_rpc_data->rpc_cancel = 0;
638 
639         nxt_port_rpc_cancel(task, task->thread->engine->port,
640                             req_rpc_data->stream);
641     }
642 }
643 
644 
645 static void
646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
647 {
648     nxt_int_t      res;
649     nxt_app_t      *app;
650     nxt_port_t     *port, *main_app_port;
651     nxt_runtime_t  *rt;
652 
653     nxt_port_new_port_handler(task, msg);
654 
655     port = msg->u.new_port;
656 
657     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
658         nxt_router_greet_controller(task, msg->u.new_port);
659     }
660 
661     if (port == NULL || port->type != NXT_PROCESS_APP) {
662 
663         if (msg->port_msg.stream == 0) {
664             return;
665         }
666 
667         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
668 
669     } else {
670         if (msg->fd[1] != -1) {
671             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
672             if (nxt_slow_path(res != NXT_OK)) {
673                 return;
674             }
675 
676             nxt_fd_close(msg->fd[1]);
677             msg->fd[1] = -1;
678         }
679     }
680 
681     if (msg->port_msg.stream != 0) {
682         nxt_port_rpc_handler(task, msg);
683         return;
684     }
685 
686     /*
687      * Port with "id == 0" is application 'main' port and it always
688      * should come with non-zero stream.
689      */
690     nxt_assert(port->id != 0);
691 
692     /* Find 'main' app port and get app reference. */
693     rt = task->thread->runtime;
694 
695     /*
696      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
697      * sent to main port (with id == 0) and processed in main thread.
698      */
699     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
700     nxt_assert(main_app_port != NULL);
701 
702     app = main_app_port->app;
703 
704     if (nxt_fast_path(app != NULL)) {
705         nxt_thread_mutex_lock(&app->mutex);
706 
707         /* TODO here should be find-and-add code because there can be
708            port waiters in port_hash */
709         nxt_port_hash_add(&app->port_hash, port);
710         app->port_hash_count++;
711 
712         nxt_thread_mutex_unlock(&app->mutex);
713 
714         port->app = app;
715     }
716 
717     port->main_app_port = main_app_port;
718 
719     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
720 }
721 
722 
723 static void
724 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
725 {
726     void                    *p;
727     size_t                  size;
728     nxt_int_t               ret;
729     nxt_port_t              *port;
730     nxt_router_temp_conf_t  *tmcf;
731 
732     port = nxt_runtime_port_find(task->thread->runtime,
733                                  msg->port_msg.pid,
734                                  msg->port_msg.reply_port);
735     if (nxt_slow_path(port == NULL)) {
736         nxt_alert(task, "conf_data_handler: reply port not found");
737         return;
738     }
739 
740     p = MAP_FAILED;
741 
742     /*
743      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
744      * initialized in 'cleanup' section.
745      */
746     size = 0;
747 
748     tmcf = nxt_router_temp_conf(task);
749     if (nxt_slow_path(tmcf == NULL)) {
750         goto fail;
751     }
752 
753     if (nxt_slow_path(msg->fd[0] == -1)) {
754         nxt_alert(task, "conf_data_handler: invalid shm fd");
755         goto fail;
756     }
757 
758     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
759         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
760                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
761         goto fail;
762     }
763 
764     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
765 
766     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
767 
768     nxt_fd_close(msg->fd[0]);
769     msg->fd[0] = -1;
770 
771     if (nxt_slow_path(p == MAP_FAILED)) {
772         goto fail;
773     }
774 
775     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
776 
777     tmcf->router_conf->router = nxt_router;
778     tmcf->stream = msg->port_msg.stream;
779     tmcf->port = port;
780 
781     nxt_port_use(task, tmcf->port, 1);
782 
783     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
784 
785     if (nxt_fast_path(ret == NXT_OK)) {
786         nxt_router_conf_apply(task, tmcf, NULL);
787 
788     } else {
789         nxt_router_conf_error(task, tmcf);
790     }
791 
792     goto cleanup;
793 
794 fail:
795 
796     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
797                           msg->port_msg.stream, 0, NULL);
798 
799     if (tmcf != NULL) {
800         nxt_mp_release(tmcf->mem_pool);
801     }
802 
803 cleanup:
804 
805     if (p != MAP_FAILED) {
806         nxt_mem_munmap(p, size);
807     }
808 
809     if (msg->fd[0] != -1) {
810         nxt_fd_close(msg->fd[0]);
811         msg->fd[0] = -1;
812     }
813 }
814 
815 
816 static void
817 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
818 {
819     nxt_app_t            *app;
820     nxt_int_t            ret;
821     nxt_str_t            app_name;
822     nxt_port_t           *port, *reply_port, *shared_port, *old_shared_port;
823     nxt_port_msg_type_t  reply;
824 
825     reply_port = nxt_runtime_port_find(task->thread->runtime,
826                                        msg->port_msg.pid,
827                                        msg->port_msg.reply_port);
828     if (nxt_slow_path(reply_port == NULL)) {
829         nxt_alert(task, "app_restart_handler: reply port not found");
830         return;
831     }
832 
833     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
834     app_name.start = msg->buf->mem.pos;
835 
836     nxt_debug(task, "app_restart_handler: %V", &app_name);
837 
838     app = nxt_router_app_find(&nxt_router->apps, &app_name);
839 
840     if (nxt_fast_path(app != NULL)) {
841         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
842                                    NXT_PROCESS_APP);
843         if (nxt_slow_path(shared_port == NULL)) {
844             goto fail;
845         }
846 
847         ret = nxt_port_socket_init(task, shared_port, 0);
848         if (nxt_slow_path(ret != NXT_OK)) {
849             nxt_port_use(task, shared_port, -1);
850             goto fail;
851         }
852 
853         ret = nxt_router_app_queue_init(task, shared_port);
854         if (nxt_slow_path(ret != NXT_OK)) {
855             nxt_port_write_close(shared_port);
856             nxt_port_read_close(shared_port);
857             nxt_port_use(task, shared_port, -1);
858             goto fail;
859         }
860 
861         nxt_port_write_enable(task, shared_port);
862 
863         nxt_thread_mutex_lock(&app->mutex);
864 
865         nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
866 
867             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
868                                          0, 0, NULL);
869 
870         } nxt_queue_loop;
871 
872         app->generation++;
873 
874         shared_port->app = app;
875 
876         old_shared_port = app->shared_port;
877         old_shared_port->app = NULL;
878 
879         app->shared_port = shared_port;
880 
881         nxt_thread_mutex_unlock(&app->mutex);
882 
883         nxt_port_close(task, old_shared_port);
884         nxt_port_use(task, old_shared_port, -1);
885 
886         reply = NXT_PORT_MSG_RPC_READY_LAST;
887 
888     } else {
889 
890 fail:
891 
892         reply = NXT_PORT_MSG_RPC_ERROR;
893     }
894 
895     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
896                           0, NULL);
897 }
898 
899 
/*
 * Work posted to each engine port by nxt_router_remove_pid_handler().
 *
 * "data" is not a real pointer: it is the value taken from msg->u.data,
 * and the union below reads a process id back out of it without a cast.
 * The pid is then passed to nxt_port_rpc_remove_peer() for this port.
 *
 * NOTE(review): this relies on the sender having stored the pid through an
 * equivalent union layout -- confirm against the code that fills msg->u.
 */
static void
nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
    void *data)
{
    union {
        nxt_pid_t  removed_pid;
        void       *data;
    } u;

    u.data = data;

    nxt_port_rpc_remove_peer(task, port, u.removed_pid);
}
913 
914 
915 static void
916 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
917 {
918     nxt_event_engine_t  *engine;
919 
920     nxt_port_remove_pid_handler(task, msg);
921 
922     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
923     {
924         if (nxt_fast_path(engine->port != NULL)) {
925             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
926                           msg->u.data);
927         }
928     }
929     nxt_queue_loop;
930 
931     if (msg->port_msg.stream == 0) {
932         return;
933     }
934 
935     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
936 
937     nxt_port_rpc_handler(task, msg);
938 }
939 
940 
941 static nxt_router_temp_conf_t *
942 nxt_router_temp_conf(nxt_task_t *task)
943 {
944     nxt_mp_t                *mp, *tmp;
945     nxt_router_conf_t       *rtcf;
946     nxt_router_temp_conf_t  *tmcf;
947 
948     mp = nxt_mp_create(1024, 128, 256, 32);
949     if (nxt_slow_path(mp == NULL)) {
950         return NULL;
951     }
952 
953     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
954     if (nxt_slow_path(rtcf == NULL)) {
955         goto fail;
956     }
957 
958     rtcf->mem_pool = mp;
959 
960     tmp = nxt_mp_create(1024, 128, 256, 32);
961     if (nxt_slow_path(tmp == NULL)) {
962         goto fail;
963     }
964 
965     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
966     if (nxt_slow_path(tmcf == NULL)) {
967         goto temp_fail;
968     }
969 
970     tmcf->mem_pool = tmp;
971     tmcf->router_conf = rtcf;
972     tmcf->count = 1;
973     tmcf->engine = task->thread->engine;
974 
975     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
976                                      sizeof(nxt_router_engine_conf_t));
977     if (nxt_slow_path(tmcf->engines == NULL)) {
978         goto temp_fail;
979     }
980 
981     nxt_queue_init(&creating_sockets);
982     nxt_queue_init(&pending_sockets);
983     nxt_queue_init(&updating_sockets);
984     nxt_queue_init(&keeping_sockets);
985     nxt_queue_init(&deleting_sockets);
986 
987 #if (NXT_TLS)
988     nxt_queue_init(&tmcf->tls);
989 #endif
990 
991     nxt_queue_init(&tmcf->apps);
992     nxt_queue_init(&tmcf->previous);
993 
994     return tmcf;
995 
996 temp_fail:
997 
998     nxt_mp_destroy(tmp);
999 
1000 fail:
1001 
1002     nxt_mp_destroy(mp);
1003 
1004     return NULL;
1005 }
1006 
1007 
1008 nxt_inline nxt_bool_t
1009 nxt_router_app_can_start(nxt_app_t *app)
1010 {
1011     return app->processes + app->pending_processes < app->max_processes
1012             && app->pending_processes < app->max_pending_processes;
1013 }
1014 
1015 
1016 nxt_inline nxt_bool_t
1017 nxt_router_app_need_start(nxt_app_t *app)
1018 {
1019     return (app->active_requests
1020               > app->port_hash_count + app->pending_processes)
1021            || (app->spare_processes
1022                 > app->idle_processes + app->pending_processes);
1023 }
1024 
1025 
1026 static void
1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1028 {
1029     nxt_int_t                    ret;
1030     nxt_app_t                    *app;
1031     nxt_router_t                 *router;
1032     nxt_runtime_t                *rt;
1033     nxt_queue_link_t             *qlk;
1034     nxt_socket_conf_t            *skcf;
1035     nxt_router_conf_t            *rtcf;
1036     nxt_router_temp_conf_t       *tmcf;
1037     const nxt_event_interface_t  *interface;
1038 #if (NXT_TLS)
1039     nxt_router_tlssock_t         *tls;
1040 #endif
1041 
1042     tmcf = obj;
1043 
1044     qlk = nxt_queue_first(&pending_sockets);
1045 
1046     if (qlk != nxt_queue_tail(&pending_sockets)) {
1047         nxt_queue_remove(qlk);
1048         nxt_queue_insert_tail(&creating_sockets, qlk);
1049 
1050         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1051 
1052         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1053 
1054         return;
1055     }
1056 
1057 #if (NXT_TLS)
1058     qlk = nxt_queue_last(&tmcf->tls);
1059 
1060     if (qlk != nxt_queue_head(&tmcf->tls)) {
1061         nxt_queue_remove(qlk);
1062 
1063         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1064 
1065         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1066                            nxt_router_tls_rpc_handler, tls);
1067         return;
1068     }
1069 #endif
1070 
1071     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1072 
1073         if (nxt_router_app_need_start(app)) {
1074             nxt_router_app_rpc_create(task, tmcf, app);
1075             return;
1076         }
1077 
1078     } nxt_queue_loop;
1079 
1080     rtcf = tmcf->router_conf;
1081 
1082     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1083         nxt_router_access_log_open(task, tmcf);
1084         return;
1085     }
1086 
1087     rt = task->thread->runtime;
1088 
1089     interface = nxt_service_get(rt->services, "engine", NULL);
1090 
1091     router = rtcf->router;
1092 
1093     ret = nxt_router_engines_create(task, router, tmcf, interface);
1094     if (nxt_slow_path(ret != NXT_OK)) {
1095         goto fail;
1096     }
1097 
1098     ret = nxt_router_threads_create(task, rt, tmcf);
1099     if (nxt_slow_path(ret != NXT_OK)) {
1100         goto fail;
1101     }
1102 
1103     nxt_router_apps_sort(task, router, tmcf);
1104 
1105     nxt_router_apps_hash_use(task, rtcf, 1);
1106 
1107     nxt_router_engines_post(router, tmcf);
1108 
1109     nxt_queue_add(&router->sockets, &updating_sockets);
1110     nxt_queue_add(&router->sockets, &creating_sockets);
1111 
1112     router->access_log = rtcf->access_log;
1113 
1114     nxt_router_conf_ready(task, tmcf);
1115 
1116     return;
1117 
1118 fail:
1119 
1120     nxt_router_conf_error(task, tmcf);
1121 
1122     return;
1123 }
1124 
1125 
1126 static void
1127 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1128 {
1129     nxt_joint_job_t  *job;
1130 
1131     job = obj;
1132 
1133     nxt_router_conf_ready(task, job->tmcf);
1134 }
1135 
1136 
1137 static void
1138 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1139 {
1140     uint32_t               count;
1141     nxt_router_conf_t      *rtcf;
1142     nxt_thread_spinlock_t  *lock;
1143 
1144     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1145 
1146     if (--tmcf->count > 0) {
1147         return;
1148     }
1149 
1150     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1151 
1152     rtcf = tmcf->router_conf;
1153 
1154     lock = &rtcf->router->lock;
1155 
1156     nxt_thread_spin_lock(lock);
1157 
1158     count = rtcf->count;
1159 
1160     nxt_thread_spin_unlock(lock);
1161 
1162     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1163 
1164     if (count == 0) {
1165         nxt_router_apps_hash_use(task, rtcf, -1);
1166 
1167         nxt_router_access_log_release(task, lock, rtcf->access_log);
1168 
1169         nxt_mp_destroy(rtcf->mem_pool);
1170     }
1171 
1172     nxt_mp_release(tmcf->mem_pool);
1173 }
1174 
1175 
1176 static void
1177 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1178 {
1179     nxt_app_t          *app;
1180     nxt_queue_t        new_socket_confs;
1181     nxt_socket_t       s;
1182     nxt_router_t       *router;
1183     nxt_queue_link_t   *qlk;
1184     nxt_socket_conf_t  *skcf;
1185     nxt_router_conf_t  *rtcf;
1186 
1187     nxt_alert(task, "failed to apply new conf");
1188 
1189     for (qlk = nxt_queue_first(&creating_sockets);
1190          qlk != nxt_queue_tail(&creating_sockets);
1191          qlk = nxt_queue_next(qlk))
1192     {
1193         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1194         s = skcf->listen->socket;
1195 
1196         if (s != -1) {
1197             nxt_socket_close(task, s);
1198         }
1199 
1200         nxt_free(skcf->listen);
1201     }
1202 
1203     nxt_queue_init(&new_socket_confs);
1204     nxt_queue_add(&new_socket_confs, &updating_sockets);
1205     nxt_queue_add(&new_socket_confs, &pending_sockets);
1206     nxt_queue_add(&new_socket_confs, &creating_sockets);
1207 
1208     rtcf = tmcf->router_conf;
1209 
1210     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1211 
1212         nxt_router_app_unlink(task, app);
1213 
1214     } nxt_queue_loop;
1215 
1216     router = rtcf->router;
1217 
1218     nxt_queue_add(&router->sockets, &keeping_sockets);
1219     nxt_queue_add(&router->sockets, &deleting_sockets);
1220 
1221     nxt_queue_add(&router->apps, &tmcf->previous);
1222 
1223     // TODO: new engines and threads
1224 
1225     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1226 
1227     nxt_mp_destroy(rtcf->mem_pool);
1228 
1229     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1230 
1231     nxt_mp_release(tmcf->mem_pool);
1232 }
1233 
1234 
1235 static void
1236 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1237     nxt_port_msg_type_t type)
1238 {
1239     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1240 
1241     nxt_port_use(task, tmcf->port, -1);
1242 
1243     tmcf->port = NULL;
1244 }
1245 
1246 
1247 static nxt_conf_map_t  nxt_router_conf[] = {
1248     {
1249         nxt_string("listeners_threads"),
1250         NXT_CONF_MAP_INT32,
1251         offsetof(nxt_router_conf_t, threads),
1252     },
1253 };
1254 
1255 
1256 static nxt_conf_map_t  nxt_router_app_conf[] = {
1257     {
1258         nxt_string("type"),
1259         NXT_CONF_MAP_STR,
1260         offsetof(nxt_router_app_conf_t, type),
1261     },
1262 
1263     {
1264         nxt_string("limits"),
1265         NXT_CONF_MAP_PTR,
1266         offsetof(nxt_router_app_conf_t, limits_value),
1267     },
1268 
1269     {
1270         nxt_string("processes"),
1271         NXT_CONF_MAP_INT32,
1272         offsetof(nxt_router_app_conf_t, processes),
1273     },
1274 
1275     {
1276         nxt_string("processes"),
1277         NXT_CONF_MAP_PTR,
1278         offsetof(nxt_router_app_conf_t, processes_value),
1279     },
1280 
1281     {
1282         nxt_string("targets"),
1283         NXT_CONF_MAP_PTR,
1284         offsetof(nxt_router_app_conf_t, targets_value),
1285     },
1286 };
1287 
1288 
1289 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1290     {
1291         nxt_string("timeout"),
1292         NXT_CONF_MAP_MSEC,
1293         offsetof(nxt_router_app_conf_t, timeout),
1294     },
1295 
1296     {
1297         nxt_string("requests"),
1298         NXT_CONF_MAP_INT32,
1299         offsetof(nxt_router_app_conf_t, requests),
1300     },
1301 };
1302 
1303 
1304 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1305     {
1306         nxt_string("spare"),
1307         NXT_CONF_MAP_INT32,
1308         offsetof(nxt_router_app_conf_t, spare_processes),
1309     },
1310 
1311     {
1312         nxt_string("max"),
1313         NXT_CONF_MAP_INT32,
1314         offsetof(nxt_router_app_conf_t, max_processes),
1315     },
1316 
1317     {
1318         nxt_string("idle_timeout"),
1319         NXT_CONF_MAP_MSEC,
1320         offsetof(nxt_router_app_conf_t, idle_timeout),
1321     },
1322 };
1323 
1324 
1325 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1326     {
1327         nxt_string("pass"),
1328         NXT_CONF_MAP_STR_COPY,
1329         offsetof(nxt_router_listener_conf_t, pass),
1330     },
1331 
1332     {
1333         nxt_string("application"),
1334         NXT_CONF_MAP_STR_COPY,
1335         offsetof(nxt_router_listener_conf_t, application),
1336     },
1337 };
1338 
1339 
1340 static nxt_conf_map_t  nxt_router_http_conf[] = {
1341     {
1342         nxt_string("header_buffer_size"),
1343         NXT_CONF_MAP_SIZE,
1344         offsetof(nxt_socket_conf_t, header_buffer_size),
1345     },
1346 
1347     {
1348         nxt_string("large_header_buffer_size"),
1349         NXT_CONF_MAP_SIZE,
1350         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1351     },
1352 
1353     {
1354         nxt_string("large_header_buffers"),
1355         NXT_CONF_MAP_SIZE,
1356         offsetof(nxt_socket_conf_t, large_header_buffers),
1357     },
1358 
1359     {
1360         nxt_string("body_buffer_size"),
1361         NXT_CONF_MAP_SIZE,
1362         offsetof(nxt_socket_conf_t, body_buffer_size),
1363     },
1364 
1365     {
1366         nxt_string("max_body_size"),
1367         NXT_CONF_MAP_SIZE,
1368         offsetof(nxt_socket_conf_t, max_body_size),
1369     },
1370 
1371     {
1372         nxt_string("idle_timeout"),
1373         NXT_CONF_MAP_MSEC,
1374         offsetof(nxt_socket_conf_t, idle_timeout),
1375     },
1376 
1377     {
1378         nxt_string("header_read_timeout"),
1379         NXT_CONF_MAP_MSEC,
1380         offsetof(nxt_socket_conf_t, header_read_timeout),
1381     },
1382 
1383     {
1384         nxt_string("body_read_timeout"),
1385         NXT_CONF_MAP_MSEC,
1386         offsetof(nxt_socket_conf_t, body_read_timeout),
1387     },
1388 
1389     {
1390         nxt_string("send_timeout"),
1391         NXT_CONF_MAP_MSEC,
1392         offsetof(nxt_socket_conf_t, send_timeout),
1393     },
1394 
1395     {
1396         nxt_string("body_temp_path"),
1397         NXT_CONF_MAP_STR,
1398         offsetof(nxt_socket_conf_t, body_temp_path),
1399     },
1400 
1401     {
1402         nxt_string("discard_unsafe_fields"),
1403         NXT_CONF_MAP_INT8,
1404         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1405     },
1406 };
1407 
1408 
1409 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1410     {
1411         nxt_string("max_frame_size"),
1412         NXT_CONF_MAP_SIZE,
1413         offsetof(nxt_websocket_conf_t, max_frame_size),
1414     },
1415 
1416     {
1417         nxt_string("read_timeout"),
1418         NXT_CONF_MAP_MSEC,
1419         offsetof(nxt_websocket_conf_t, read_timeout),
1420     },
1421 
1422     {
1423         nxt_string("keepalive_interval"),
1424         NXT_CONF_MAP_MSEC,
1425         offsetof(nxt_websocket_conf_t, keepalive_interval),
1426     },
1427 
1428 };
1429 
1430 
1431 static nxt_int_t
1432 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1433     u_char *start, u_char *end)
1434 {
1435     u_char                      *p;
1436     size_t                      size;
1437     nxt_mp_t                    *mp, *app_mp;
1438     uint32_t                    next, next_target;
1439     nxt_int_t                   ret;
1440     nxt_str_t                   name, path, target;
1441     nxt_app_t                   *app, *prev;
1442     nxt_str_t                   *t, *s, *targets;
1443     nxt_uint_t                  n, i;
1444     nxt_port_t                  *port;
1445     nxt_router_t                *router;
1446     nxt_app_joint_t             *app_joint;
1447 #if (NXT_TLS)
1448     nxt_tls_init_t              *tls_init;
1449     nxt_conf_value_t            *certificate;
1450 #endif
1451     nxt_conf_value_t            *conf, *http, *value, *websocket;
1452     nxt_conf_value_t            *applications, *application;
1453     nxt_conf_value_t            *listeners, *listener;
1454     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1455     nxt_socket_conf_t           *skcf;
1456     nxt_http_routes_t           *routes;
1457     nxt_event_engine_t          *engine;
1458     nxt_app_lang_module_t       *lang;
1459     nxt_router_app_conf_t       apcf;
1460     nxt_router_access_log_t     *access_log;
1461     nxt_router_listener_conf_t  lscf;
1462 
1463     static nxt_str_t  http_path = nxt_string("/settings/http");
1464     static nxt_str_t  applications_path = nxt_string("/applications");
1465     static nxt_str_t  listeners_path = nxt_string("/listeners");
1466     static nxt_str_t  routes_path = nxt_string("/routes");
1467     static nxt_str_t  access_log_path = nxt_string("/access_log");
1468 #if (NXT_TLS)
1469     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1470     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1471     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1472     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1473     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1474 #endif
1475     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1476     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1477     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1478 
1479     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1480     if (conf == NULL) {
1481         nxt_alert(task, "configuration parsing error");
1482         return NXT_ERROR;
1483     }
1484 
1485     mp = tmcf->router_conf->mem_pool;
1486 
1487     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1488                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1489     if (ret != NXT_OK) {
1490         nxt_alert(task, "root map error");
1491         return NXT_ERROR;
1492     }
1493 
1494     if (tmcf->router_conf->threads == 0) {
1495         tmcf->router_conf->threads = nxt_ncpu;
1496     }
1497 
1498     static_conf = nxt_conf_get_path(conf, &static_path);
1499 
1500     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1501     if (nxt_slow_path(ret != NXT_OK)) {
1502         return NXT_ERROR;
1503     }
1504 
1505     router = tmcf->router_conf->router;
1506 
1507     applications = nxt_conf_get_path(conf, &applications_path);
1508 
1509     if (applications != NULL) {
1510         next = 0;
1511 
1512         for ( ;; ) {
1513             application = nxt_conf_next_object_member(applications,
1514                                                       &name, &next);
1515             if (application == NULL) {
1516                 break;
1517             }
1518 
1519             nxt_debug(task, "application \"%V\"", &name);
1520 
1521             size = nxt_conf_json_length(application, NULL);
1522 
1523             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1524             if (nxt_slow_path(app_mp == NULL)) {
1525                 goto fail;
1526             }
1527 
1528             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1529             if (app == NULL) {
1530                 goto app_fail;
1531             }
1532 
1533             nxt_memzero(app, sizeof(nxt_app_t));
1534 
1535             app->mem_pool = app_mp;
1536 
1537             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1538             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1539                                                   + name.length);
1540 
1541             p = nxt_conf_json_print(app->conf.start, application, NULL);
1542             app->conf.length = p - app->conf.start;
1543 
1544             nxt_assert(app->conf.length <= size);
1545 
1546             nxt_debug(task, "application conf \"%V\"", &app->conf);
1547 
1548             prev = nxt_router_app_find(&router->apps, &name);
1549 
1550             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1551                 nxt_mp_destroy(app_mp);
1552 
1553                 nxt_queue_remove(&prev->link);
1554                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1555 
1556                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1557                 if (nxt_slow_path(ret != NXT_OK)) {
1558                     goto fail;
1559                 }
1560 
1561                 continue;
1562             }
1563 
1564             apcf.processes = 1;
1565             apcf.max_processes = 1;
1566             apcf.spare_processes = 0;
1567             apcf.timeout = 0;
1568             apcf.idle_timeout = 15000;
1569             apcf.requests = 0;
1570             apcf.limits_value = NULL;
1571             apcf.processes_value = NULL;
1572             apcf.targets_value = NULL;
1573 
1574             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1575             if (nxt_slow_path(app_joint == NULL)) {
1576                 goto app_fail;
1577             }
1578 
1579             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1580 
1581             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1582                                       nxt_nitems(nxt_router_app_conf), &apcf);
1583             if (ret != NXT_OK) {
1584                 nxt_alert(task, "application map error");
1585                 goto app_fail;
1586             }
1587 
1588             if (apcf.limits_value != NULL) {
1589 
1590                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1591                     nxt_alert(task, "application limits is not object");
1592                     goto app_fail;
1593                 }
1594 
1595                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1596                                         nxt_router_app_limits_conf,
1597                                         nxt_nitems(nxt_router_app_limits_conf),
1598                                         &apcf);
1599                 if (ret != NXT_OK) {
1600                     nxt_alert(task, "application limits map error");
1601                     goto app_fail;
1602                 }
1603             }
1604 
1605             if (apcf.processes_value != NULL
1606                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1607             {
1608                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1609                                      nxt_router_app_processes_conf,
1610                                      nxt_nitems(nxt_router_app_processes_conf),
1611                                      &apcf);
1612                 if (ret != NXT_OK) {
1613                     nxt_alert(task, "application processes map error");
1614                     goto app_fail;
1615                 }
1616 
1617             } else {
1618                 apcf.max_processes = apcf.processes;
1619                 apcf.spare_processes = apcf.processes;
1620             }
1621 
1622             if (apcf.targets_value != NULL) {
1623                 n = nxt_conf_object_members_count(apcf.targets_value);
1624 
1625                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1626                 if (nxt_slow_path(targets == NULL)) {
1627                     goto app_fail;
1628                 }
1629 
1630                 next_target = 0;
1631 
1632                 for (i = 0; i < n; i++) {
1633                     (void) nxt_conf_next_object_member(apcf.targets_value,
1634                                                        &target, &next_target);
1635 
1636                     s = nxt_str_dup(app_mp, &targets[i], &target);
1637                     if (nxt_slow_path(s == NULL)) {
1638                         goto app_fail;
1639                     }
1640                 }
1641 
1642             } else {
1643                 targets = NULL;
1644             }
1645 
1646             nxt_debug(task, "application type: %V", &apcf.type);
1647             nxt_debug(task, "application processes: %D", apcf.processes);
1648             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1649             nxt_debug(task, "application requests: %D", apcf.requests);
1650 
1651             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1652 
1653             if (lang == NULL) {
1654                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1655                 goto app_fail;
1656             }
1657 
1658             nxt_debug(task, "application language module: \"%s\"", lang->file);
1659 
1660             ret = nxt_thread_mutex_create(&app->mutex);
1661             if (ret != NXT_OK) {
1662                 goto app_fail;
1663             }
1664 
1665             nxt_queue_init(&app->ports);
1666             nxt_queue_init(&app->spare_ports);
1667             nxt_queue_init(&app->idle_ports);
1668             nxt_queue_init(&app->ack_waiting_req);
1669 
1670             app->name.length = name.length;
1671             nxt_memcpy(app->name.start, name.start, name.length);
1672 
1673             app->type = lang->type;
1674             app->max_processes = apcf.max_processes;
1675             app->spare_processes = apcf.spare_processes;
1676             app->max_pending_processes = apcf.spare_processes
1677                                          ? apcf.spare_processes : 1;
1678             app->timeout = apcf.timeout;
1679             app->idle_timeout = apcf.idle_timeout;
1680             app->max_requests = apcf.requests;
1681 
1682             app->targets = targets;
1683 
1684             engine = task->thread->engine;
1685 
1686             app->engine = engine;
1687 
1688             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1689             app->adjust_idle_work.task = &engine->task;
1690             app->adjust_idle_work.obj = app;
1691 
1692             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1693 
1694             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1695             if (nxt_slow_path(ret != NXT_OK)) {
1696                 goto app_fail;
1697             }
1698 
1699             nxt_router_app_use(task, app, 1);
1700 
1701             app->joint = app_joint;
1702 
1703             app_joint->use_count = 1;
1704             app_joint->app = app;
1705 
1706             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1707             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1708             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1709             app_joint->idle_timer.task = &engine->task;
1710             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1711 
1712             app_joint->free_app_work.handler = nxt_router_free_app;
1713             app_joint->free_app_work.task = &engine->task;
1714             app_joint->free_app_work.obj = app_joint;
1715 
1716             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1717                                 NXT_PROCESS_APP);
1718             if (nxt_slow_path(port == NULL)) {
1719                 return NXT_ERROR;
1720             }
1721 
1722             ret = nxt_port_socket_init(task, port, 0);
1723             if (nxt_slow_path(ret != NXT_OK)) {
1724                 nxt_port_use(task, port, -1);
1725                 return NXT_ERROR;
1726             }
1727 
1728             ret = nxt_router_app_queue_init(task, port);
1729             if (nxt_slow_path(ret != NXT_OK)) {
1730                 nxt_port_write_close(port);
1731                 nxt_port_read_close(port);
1732                 nxt_port_use(task, port, -1);
1733                 return NXT_ERROR;
1734             }
1735 
1736             nxt_port_write_enable(task, port);
1737             port->app = app;
1738 
1739             app->shared_port = port;
1740 
1741             nxt_thread_mutex_create(&app->outgoing.mutex);
1742         }
1743     }
1744 
1745     routes_conf = nxt_conf_get_path(conf, &routes_path);
1746     if (nxt_fast_path(routes_conf != NULL)) {
1747         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1748         if (nxt_slow_path(routes == NULL)) {
1749             return NXT_ERROR;
1750         }
1751         tmcf->router_conf->routes = routes;
1752     }
1753 
1754     ret = nxt_upstreams_create(task, tmcf, conf);
1755     if (nxt_slow_path(ret != NXT_OK)) {
1756         return ret;
1757     }
1758 
1759     http = nxt_conf_get_path(conf, &http_path);
1760 #if 0
1761     if (http == NULL) {
1762         nxt_alert(task, "no \"http\" block");
1763         return NXT_ERROR;
1764     }
1765 #endif
1766 
1767     websocket = nxt_conf_get_path(conf, &websocket_path);
1768 
1769     listeners = nxt_conf_get_path(conf, &listeners_path);
1770 
1771     if (listeners != NULL) {
1772         next = 0;
1773 
1774         for ( ;; ) {
1775             listener = nxt_conf_next_object_member(listeners, &name, &next);
1776             if (listener == NULL) {
1777                 break;
1778             }
1779 
1780             skcf = nxt_router_socket_conf(task, tmcf, &name);
1781             if (skcf == NULL) {
1782                 goto fail;
1783             }
1784 
1785             nxt_memzero(&lscf, sizeof(lscf));
1786 
1787             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1788                                       nxt_nitems(nxt_router_listener_conf),
1789                                       &lscf);
1790             if (ret != NXT_OK) {
1791                 nxt_alert(task, "listener map error");
1792                 goto fail;
1793             }
1794 
1795             nxt_debug(task, "application: %V", &lscf.application);
1796 
1797             // STUB, default values if http block is not defined.
1798             skcf->header_buffer_size = 2048;
1799             skcf->large_header_buffer_size = 8192;
1800             skcf->large_header_buffers = 4;
1801             skcf->discard_unsafe_fields = 1;
1802             skcf->body_buffer_size = 16 * 1024;
1803             skcf->max_body_size = 8 * 1024 * 1024;
1804             skcf->proxy_header_buffer_size = 64 * 1024;
1805             skcf->proxy_buffer_size = 4096;
1806             skcf->proxy_buffers = 256;
1807             skcf->idle_timeout = 180 * 1000;
1808             skcf->header_read_timeout = 30 * 1000;
1809             skcf->body_read_timeout = 30 * 1000;
1810             skcf->send_timeout = 30 * 1000;
1811             skcf->proxy_timeout = 60 * 1000;
1812             skcf->proxy_send_timeout = 30 * 1000;
1813             skcf->proxy_read_timeout = 30 * 1000;
1814 
1815             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1816             skcf->websocket_conf.read_timeout = 60 * 1000;
1817             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1818 
1819             nxt_str_null(&skcf->body_temp_path);
1820 
1821             if (http != NULL) {
1822                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1823                                           nxt_nitems(nxt_router_http_conf),
1824                                           skcf);
1825                 if (ret != NXT_OK) {
1826                     nxt_alert(task, "http map error");
1827                     goto fail;
1828                 }
1829             }
1830 
1831             if (websocket != NULL) {
1832                 ret = nxt_conf_map_object(mp, websocket,
1833                                           nxt_router_websocket_conf,
1834                                           nxt_nitems(nxt_router_websocket_conf),
1835                                           &skcf->websocket_conf);
1836                 if (ret != NXT_OK) {
1837                     nxt_alert(task, "websocket map error");
1838                     goto fail;
1839                 }
1840             }
1841 
1842             t = &skcf->body_temp_path;
1843 
1844             if (t->length == 0) {
1845                 t->start = (u_char *) task->thread->runtime->tmp;
1846                 t->length = nxt_strlen(t->start);
1847             }
1848 
1849             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1850             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1851                                                     client_ip_conf);
1852             if (nxt_slow_path(ret != NXT_OK)) {
1853                 return NXT_ERROR;
1854             }
1855 
1856 #if (NXT_TLS)
1857             certificate = nxt_conf_get_path(listener, &certificate_path);
1858 
1859             if (certificate != NULL) {
1860                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1861                 if (nxt_slow_path(tls_init == NULL)) {
1862                     return NXT_ERROR;
1863                 }
1864 
1865                 tls_init->cache_size = 0;
1866                 tls_init->timeout = 300;
1867 
1868                 value = nxt_conf_get_path(listener, &conf_cache_path);
1869                 if (value != NULL) {
1870                     tls_init->cache_size = nxt_conf_get_number(value);
1871                 }
1872 
1873                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1874                 if (value != NULL) {
1875                     tls_init->timeout = nxt_conf_get_number(value);
1876                 }
1877 
1878                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1879                                                         &conf_commands_path);
1880 
1881                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1882                                                            &conf_tickets);
1883 
1884                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1885                     n = nxt_conf_array_elements_count(certificate);
1886 
1887                     for (i = 0; i < n; i++) {
1888                         value = nxt_conf_get_array_element(certificate, i);
1889 
1890                         nxt_assert(value != NULL);
1891 
1892                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1893                                                          tls_init, i == 0);
1894                         if (nxt_slow_path(ret != NXT_OK)) {
1895                             goto fail;
1896                         }
1897                     }
1898 
1899                 } else {
1900                     /* NXT_CONF_STRING */
1901                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1902                                                      tls_init, 1);
1903                     if (nxt_slow_path(ret != NXT_OK)) {
1904                         goto fail;
1905                     }
1906                 }
1907             }
1908 #endif
1909 
1910             skcf->listen->handler = nxt_http_conn_init;
1911             skcf->router_conf = tmcf->router_conf;
1912             skcf->router_conf->count++;
1913 
1914             if (lscf.pass.length != 0) {
1915                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1916 
1917             /* COMPATIBILITY: listener application. */
1918             } else if (lscf.application.length > 0) {
1919                 skcf->action = nxt_http_pass_application(task,
1920                                                          tmcf->router_conf,
1921                                                          &lscf.application);
1922             }
1923 
1924             if (nxt_slow_path(skcf->action == NULL)) {
1925                 goto fail;
1926             }
1927         }
1928     }
1929 
1930     ret = nxt_http_routes_resolve(task, tmcf);
1931     if (nxt_slow_path(ret != NXT_OK)) {
1932         goto fail;
1933     }
1934 
1935     value = nxt_conf_get_path(conf, &access_log_path);
1936 
1937     if (value != NULL) {
1938         nxt_conf_get_string(value, &path);
1939 
1940         access_log = router->access_log;
1941 
1942         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1943             nxt_thread_spin_lock(&router->lock);
1944             access_log->count++;
1945             nxt_thread_spin_unlock(&router->lock);
1946 
1947         } else {
1948             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1949                                     + path.length);
1950             if (access_log == NULL) {
1951                 nxt_alert(task, "failed to allocate access log structure");
1952                 goto fail;
1953             }
1954 
1955             access_log->fd = -1;
1956             access_log->handler = &nxt_router_access_log_writer;
1957             access_log->count = 1;
1958 
1959             access_log->path.length = path.length;
1960             access_log->path.start = (u_char *) access_log
1961                                      + sizeof(nxt_router_access_log_t);
1962 
1963             nxt_memcpy(access_log->path.start, path.start, path.length);
1964         }
1965 
1966         tmcf->router_conf->access_log = access_log;
1967     }
1968 
1969     nxt_queue_add(&deleting_sockets, &router->sockets);
1970     nxt_queue_init(&router->sockets);
1971 
1972     return NXT_OK;
1973 
1974 app_fail:
1975 
1976     nxt_mp_destroy(app_mp);
1977 
1978 fail:
1979 
1980     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1981 
1982         nxt_queue_remove(&app->link);
1983         nxt_thread_mutex_destroy(&app->mutex);
1984         nxt_mp_destroy(app->mem_pool);
1985 
1986     } nxt_queue_loop;
1987 
1988     return NXT_ERROR;
1989 }
1990 
1991 
1992 #if (NXT_TLS)
1993 
1994 static nxt_int_t
1995 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1996     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1997     nxt_tls_init_t *tls_init, nxt_bool_t last)
1998 {
1999     nxt_router_tlssock_t  *tls;
2000 
2001     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2002     if (nxt_slow_path(tls == NULL)) {
2003         return NXT_ERROR;
2004     }
2005 
2006     tls->tls_init = tls_init;
2007     tls->socket_conf = skcf;
2008     tls->temp_conf = tmcf;
2009     tls->last = last;
2010     nxt_conf_get_string(value, &tls->name);
2011 
2012     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2013 
2014     return NXT_OK;
2015 }
2016 
2017 #endif
2018 
2019 
2020 static nxt_int_t
2021 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2022     nxt_conf_value_t *conf)
2023 {
2024     uint32_t          next, i;
2025     nxt_mp_t          *mp;
2026     nxt_str_t         *type, exten, str;
2027     nxt_int_t         ret;
2028     nxt_uint_t        exts;
2029     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2030 
2031     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2032 
2033     mp = rtcf->mem_pool;
2034 
2035     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2036     if (nxt_slow_path(ret != NXT_OK)) {
2037         return NXT_ERROR;
2038     }
2039 
2040     if (conf == NULL) {
2041         return NXT_OK;
2042     }
2043 
2044     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2045 
2046     if (mtypes_conf != NULL) {
2047         next = 0;
2048 
2049         for ( ;; ) {
2050             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2051 
2052             if (ext_conf == NULL) {
2053                 break;
2054             }
2055 
2056             type = nxt_str_dup(mp, NULL, &str);
2057             if (nxt_slow_path(type == NULL)) {
2058                 return NXT_ERROR;
2059             }
2060 
2061             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2062                 nxt_conf_get_string(ext_conf, &str);
2063 
2064                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2065                     return NXT_ERROR;
2066                 }
2067 
2068                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2069                                                       &exten, type);
2070                 if (nxt_slow_path(ret != NXT_OK)) {
2071                     return NXT_ERROR;
2072                 }
2073 
2074                 continue;
2075             }
2076 
2077             exts = nxt_conf_array_elements_count(ext_conf);
2078 
2079             for (i = 0; i < exts; i++) {
2080                 value = nxt_conf_get_array_element(ext_conf, i);
2081 
2082                 nxt_conf_get_string(value, &str);
2083 
2084                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2085                     return NXT_ERROR;
2086                 }
2087 
2088                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2089                                                       &exten, type);
2090                 if (nxt_slow_path(ret != NXT_OK)) {
2091                     return NXT_ERROR;
2092                 }
2093             }
2094         }
2095     }
2096 
2097     return NXT_OK;
2098 }
2099 
2100 
2101 static nxt_int_t
2102 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2103     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2104 {
2105     char                        c;
2106     size_t                      i;
2107     nxt_mp_t                    *mp;
2108     uint32_t                    hash;
2109     nxt_str_t                   header;
2110     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2111     nxt_http_client_ip_t        *client_ip;
2112     nxt_http_route_addr_rule_t  *source;
2113 
2114     static nxt_str_t  header_path = nxt_string("/header");
2115     static nxt_str_t  source_path = nxt_string("/source");
2116     static nxt_str_t  recursive_path = nxt_string("/recursive");
2117 
2118     if (conf == NULL) {
2119         skcf->client_ip = NULL;
2120 
2121         return NXT_OK;
2122     }
2123 
2124     mp = tmcf->router_conf->mem_pool;
2125 
2126     source_conf = nxt_conf_get_path(conf, &source_path);
2127     header_conf = nxt_conf_get_path(conf, &header_path);
2128     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2129 
2130     if (source_conf == NULL || header_conf == NULL) {
2131         return NXT_ERROR;
2132     }
2133 
2134     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2135     if (nxt_slow_path(client_ip == NULL)) {
2136         return NXT_ERROR;
2137     }
2138 
2139     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2140     if (nxt_slow_path(source == NULL)) {
2141         return NXT_ERROR;
2142     }
2143 
2144     client_ip->source = source;
2145 
2146     nxt_conf_get_string(header_conf, &header);
2147 
2148     if (recursive_conf != NULL) {
2149         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2150     }
2151 
2152     client_ip->header = nxt_str_dup(mp, NULL, &header);
2153     if (nxt_slow_path(client_ip->header == NULL)) {
2154         return NXT_ERROR;
2155     }
2156 
2157     hash = NXT_HTTP_FIELD_HASH_INIT;
2158 
2159     for (i = 0; i < client_ip->header->length; i++) {
2160         c = client_ip->header->start[i];
2161         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2162     }
2163 
2164     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2165 
2166     client_ip->header_hash = hash;
2167 
2168     skcf->client_ip = client_ip;
2169 
2170     return NXT_OK;
2171 }
2172 
2173 
2174 static nxt_app_t *
2175 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2176 {
2177     nxt_app_t  *app;
2178 
2179     nxt_queue_each(app, queue, nxt_app_t, link) {
2180 
2181         if (nxt_strstr_eq(name, &app->name)) {
2182             return app;
2183         }
2184 
2185     } nxt_queue_loop;
2186 
2187     return NULL;
2188 }
2189 
2190 
2191 static nxt_int_t
2192 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2193 {
2194     void       *mem;
2195     nxt_int_t  fd;
2196 
2197     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2198     if (nxt_slow_path(fd == -1)) {
2199         return NXT_ERROR;
2200     }
2201 
2202     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2203                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2204     if (nxt_slow_path(mem == MAP_FAILED)) {
2205         nxt_fd_close(fd);
2206 
2207         return NXT_ERROR;
2208     }
2209 
2210     nxt_app_queue_init(mem);
2211 
2212     port->queue_fd = fd;
2213     port->queue = mem;
2214 
2215     return NXT_OK;
2216 }
2217 
2218 
2219 static nxt_int_t
2220 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2221 {
2222     void       *mem;
2223     nxt_int_t  fd;
2224 
2225     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2226     if (nxt_slow_path(fd == -1)) {
2227         return NXT_ERROR;
2228     }
2229 
2230     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2231                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2232     if (nxt_slow_path(mem == MAP_FAILED)) {
2233         nxt_fd_close(fd);
2234 
2235         return NXT_ERROR;
2236     }
2237 
2238     nxt_port_queue_init(mem);
2239 
2240     port->queue_fd = fd;
2241     port->queue = mem;
2242 
2243     return NXT_OK;
2244 }
2245 
2246 
2247 static nxt_int_t
2248 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2249 {
2250     void  *mem;
2251 
2252     nxt_assert(fd != -1);
2253 
2254     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2255                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2256     if (nxt_slow_path(mem == MAP_FAILED)) {
2257 
2258         return NXT_ERROR;
2259     }
2260 
2261     port->queue = mem;
2262 
2263     return NXT_OK;
2264 }
2265 
2266 
/*
 * Level hash prototype used for rtcf->apps_hash: entries are matched with
 * nxt_router_apps_hash_test(), and nxt_mp_lvlhsh_alloc() and
 * nxt_mp_lvlhsh_free() are supplied as the remaining callbacks.
 */
2267 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2268     NXT_LVLHSH_DEFAULT,
2269     nxt_router_apps_hash_test,
2270     nxt_mp_lvlhsh_alloc,
2271     nxt_mp_lvlhsh_free,
2272 };
2273 
2274 
2275 static nxt_int_t
2276 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2277 {
2278     nxt_app_t  *app;
2279 
2280     app = data;
2281 
2282     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2283 }
2284 
2285 
2286 static nxt_int_t
2287 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2288 {
2289     nxt_lvlhsh_query_t  lhq;
2290 
2291     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2292     lhq.replace = 0;
2293     lhq.key = app->name;
2294     lhq.value = app;
2295     lhq.proto = &nxt_router_apps_hash_proto;
2296     lhq.pool = rtcf->mem_pool;
2297 
2298     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2299 
2300     case NXT_OK:
2301         return NXT_OK;
2302 
2303     case NXT_DECLINED:
2304         nxt_thread_log_alert("router app hash adding failed: "
2305                              "\"%V\" is already in hash", &lhq.key);
2306         /* Fall through. */
2307     default:
2308         return NXT_ERROR;
2309     }
2310 }
2311 
2312 
2313 static nxt_app_t *
2314 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2315 {
2316     nxt_lvlhsh_query_t  lhq;
2317 
2318     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2319     lhq.key = *name;
2320     lhq.proto = &nxt_router_apps_hash_proto;
2321 
2322     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2323         return NULL;
2324     }
2325 
2326     return lhq.value;
2327 }
2328 
2329 
2330 static void
2331 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2332 {
2333     nxt_app_t          *app;
2334     nxt_lvlhsh_each_t  lhe;
2335 
2336     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2337 
2338     for ( ;; ) {
2339         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2340 
2341         if (app == NULL) {
2342             break;
2343         }
2344 
2345         nxt_router_app_use(task, app, i);
2346     }
2347 }
2348 
2349 
/*
 * Application binding stored in action->u.conf by
 * nxt_router_application_init(): the resolved application and the index of
 * the selected entry in app->targets (0 when no target is requested).
 */
2350 typedef struct {
2351     nxt_app_t  *app;
2352     nxt_int_t  target;
2353 } nxt_http_app_conf_t;
2354 
2355 
2356 nxt_int_t
2357 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2358     nxt_str_t *target, nxt_http_action_t *action)
2359 {
2360     nxt_app_t            *app;
2361     nxt_str_t            *targets;
2362     nxt_uint_t           i;
2363     nxt_http_app_conf_t  *conf;
2364 
2365     app = nxt_router_apps_hash_get(rtcf, name);
2366     if (app == NULL) {
2367         return NXT_DECLINED;
2368     }
2369 
2370     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2371     if (nxt_slow_path(conf == NULL)) {
2372         return NXT_ERROR;
2373     }
2374 
2375     action->handler = nxt_http_application_handler;
2376     action->u.conf = conf;
2377 
2378     conf->app = app;
2379 
2380     if (target != NULL && target->length != 0) {
2381         targets = app->targets;
2382 
2383         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2384 
2385         conf->target = i;
2386 
2387     } else {
2388         conf->target = 0;
2389     }
2390 
2391     return NXT_OK;
2392 }
2393 
2394 
2395 static nxt_socket_conf_t *
2396 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2397     nxt_str_t *name)
2398 {
2399     size_t               size;
2400     nxt_int_t            ret;
2401     nxt_bool_t           wildcard;
2402     nxt_sockaddr_t       *sa;
2403     nxt_socket_conf_t    *skcf;
2404     nxt_listen_socket_t  *ls;
2405 
2406     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2407     if (nxt_slow_path(sa == NULL)) {
2408         nxt_alert(task, "invalid listener \"%V\"", name);
2409         return NULL;
2410     }
2411 
2412     sa->type = SOCK_STREAM;
2413 
2414     nxt_debug(task, "router listener: \"%*s\"",
2415               (size_t) sa->length, nxt_sockaddr_start(sa));
2416 
2417     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2418     if (nxt_slow_path(skcf == NULL)) {
2419         return NULL;
2420     }
2421 
2422     size = nxt_sockaddr_size(sa);
2423 
2424     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2425 
2426     if (ret != NXT_OK) {
2427 
2428         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2429         if (nxt_slow_path(ls == NULL)) {
2430             return NULL;
2431         }
2432 
2433         skcf->listen = ls;
2434 
2435         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2436         nxt_memcpy(ls->sockaddr, sa, size);
2437 
2438         nxt_listen_socket_remote_size(ls);
2439 
2440         ls->socket = -1;
2441         ls->backlog = NXT_LISTEN_BACKLOG;
2442         ls->flags = NXT_NONBLOCK;
2443         ls->read_after_accept = 1;
2444     }
2445 
2446     switch (sa->u.sockaddr.sa_family) {
2447 #if (NXT_HAVE_UNIX_DOMAIN)
2448     case AF_UNIX:
2449         wildcard = 0;
2450         break;
2451 #endif
2452 #if (NXT_INET6)
2453     case AF_INET6:
2454         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2455         break;
2456 #endif
2457     case AF_INET:
2458     default:
2459         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2460         break;
2461     }
2462 
2463     if (!wildcard) {
2464         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2465         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2466             return NULL;
2467         }
2468 
2469         nxt_memcpy(skcf->sockaddr, sa, size);
2470     }
2471 
2472     return skcf;
2473 }
2474 
2475 
2476 static nxt_int_t
2477 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2478     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2479 {
2480     nxt_router_t       *router;
2481     nxt_queue_link_t   *qlk;
2482     nxt_socket_conf_t  *skcf;
2483 
2484     router = tmcf->router_conf->router;
2485 
2486     for (qlk = nxt_queue_first(&router->sockets);
2487          qlk != nxt_queue_tail(&router->sockets);
2488          qlk = nxt_queue_next(qlk))
2489     {
2490         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2491 
2492         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2493             nskcf->listen = skcf->listen;
2494 
2495             nxt_queue_remove(qlk);
2496             nxt_queue_insert_tail(&keeping_sockets, qlk);
2497 
2498             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2499 
2500             return NXT_OK;
2501         }
2502     }
2503 
2504     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2505 
2506     return NXT_DECLINED;
2507 }
2508 
2509 
2510 static void
2511 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2512     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2513 {
2514     size_t            size;
2515     uint32_t          stream;
2516     nxt_int_t         ret;
2517     nxt_buf_t         *b;
2518     nxt_port_t        *main_port, *router_port;
2519     nxt_runtime_t     *rt;
2520     nxt_socket_rpc_t  *rpc;
2521 
2522     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2523     if (rpc == NULL) {
2524         goto fail;
2525     }
2526 
2527     rpc->socket_conf = skcf;
2528     rpc->temp_conf = tmcf;
2529 
2530     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2531 
2532     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2533     if (b == NULL) {
2534         goto fail;
2535     }
2536 
2537     b->completion_handler = nxt_buf_dummy_completion;
2538 
2539     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2540 
2541     rt = task->thread->runtime;
2542     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2543     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2544 
2545     stream = nxt_port_rpc_register_handler(task, router_port,
2546                                            nxt_router_listen_socket_ready,
2547                                            nxt_router_listen_socket_error,
2548                                            main_port->pid, rpc);
2549     if (nxt_slow_path(stream == 0)) {
2550         goto fail;
2551     }
2552 
2553     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2554                                 stream, router_port->id, b);
2555 
2556     if (nxt_slow_path(ret != NXT_OK)) {
2557         nxt_port_rpc_cancel(task, router_port, stream);
2558         goto fail;
2559     }
2560 
2561     return;
2562 
2563 fail:
2564 
2565     nxt_router_conf_error(task, tmcf);
2566 }
2567 
2568 
2569 static void
2570 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2571     void *data)
2572 {
2573     nxt_int_t         ret;
2574     nxt_socket_t      s;
2575     nxt_socket_rpc_t  *rpc;
2576 
2577     rpc = data;
2578 
2579     s = msg->fd[0];
2580 
2581     ret = nxt_socket_nonblocking(task, s);
2582     if (nxt_slow_path(ret != NXT_OK)) {
2583         goto fail;
2584     }
2585 
2586     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2587 
2588     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2589     if (nxt_slow_path(ret != NXT_OK)) {
2590         goto fail;
2591     }
2592 
2593     rpc->socket_conf->listen->socket = s;
2594 
2595     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2596                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2597 
2598     return;
2599 
2600 fail:
2601 
2602     nxt_socket_close(task, s);
2603 
2604     nxt_router_conf_error(task, rpc->temp_conf);
2605 }
2606 
2607 
/*
 * RPC error handler for a listen socket request: "data" is the
 * nxt_socket_rpc_t passed at registration.  The only active code aborts the
 * pending configuration through nxt_router_conf_error().
 */
2608 static void
2609 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2610     void *data)
2611 {
2612     nxt_socket_rpc_t        *rpc;
2613     nxt_router_temp_conf_t  *tmcf;
2614 
2615     rpc = data;
2616     tmcf = rpc->temp_conf;
2617 
/*
 * Disabled code: it would take the first byte of the message as an index
 * into socket_errors[] (without a range check), treat the rest as message
 * text, and emit the formatted result with nxt_debug().
 */
2618 #if 0
2619     u_char                  *p;
2620     size_t                  size;
2621     uint8_t                 error;
2622     nxt_buf_t               *in, *out;
2623     nxt_sockaddr_t          *sa;
2624 
2625     static nxt_str_t  socket_errors[] = {
2626         nxt_string("ListenerSystem"),
2627         nxt_string("ListenerNoIPv6"),
2628         nxt_string("ListenerPort"),
2629         nxt_string("ListenerInUse"),
2630         nxt_string("ListenerNoAddress"),
2631         nxt_string("ListenerNoAccess"),
2632         nxt_string("ListenerPath"),
2633     };
2634 
2635     sa = rpc->socket_conf->listen->sockaddr;
2636 
2637     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2638 
2639     if (nxt_slow_path(in == NULL)) {
2640         return;
2641     }
2642 
2643     p = in->mem.pos;
2644 
2645     error = *p++;
2646 
2647     size = nxt_length("listen socket error: ")
2648            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2649            + sa->length + socket_errors[error].length + (in->mem.free - p);
2650 
2651     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2652     if (nxt_slow_path(out == NULL)) {
2653         return;
2654     }
2655 
2656     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2657                         "listen socket error: "
2658                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2659                         (size_t) sa->length, nxt_sockaddr_start(sa),
2660                         &socket_errors[error], in->mem.free - p, p);
2661 
2662     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2663 #endif
2664 
2665     nxt_router_conf_error(task, tmcf);
2666 }
2667 
2668 
2669 #if (NXT_TLS)
2670 
2671 static void
2672 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2673     void *data)
2674 {
2675     nxt_mp_t                *mp;
2676     nxt_int_t               ret;
2677     nxt_tls_conf_t          *tlscf;
2678     nxt_router_tlssock_t    *tls;
2679     nxt_tls_bundle_conf_t   *bundle;
2680     nxt_router_temp_conf_t  *tmcf;
2681 
2682     nxt_debug(task, "tls rpc handler");
2683 
2684     tls = data;
2685     tmcf = tls->temp_conf;
2686 
2687     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2688         goto fail;
2689     }
2690 
2691     mp = tmcf->router_conf->mem_pool;
2692 
2693     if (tls->socket_conf->tls == NULL){
2694         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2695         if (nxt_slow_path(tlscf == NULL)) {
2696             goto fail;
2697         }
2698 
2699         tlscf->no_wait_shutdown = 1;
2700         tls->socket_conf->tls = tlscf;
2701 
2702     } else {
2703         tlscf = tls->socket_conf->tls;
2704     }
2705 
2706     tls->tls_init->conf = tlscf;
2707 
2708     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2709     if (nxt_slow_path(bundle == NULL)) {
2710         goto fail;
2711     }
2712 
2713     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2714         goto fail;
2715     }
2716 
2717     bundle->chain_file = msg->fd[0];
2718     bundle->next = tlscf->bundle;
2719     tlscf->bundle = bundle;
2720 
2721     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2722                                                   tls->last);
2723     if (nxt_slow_path(ret != NXT_OK)) {
2724         goto fail;
2725     }
2726 
2727     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2728                        nxt_router_conf_apply, task, tmcf, NULL);
2729     return;
2730 
2731 fail:
2732 
2733     nxt_router_conf_error(task, tmcf);
2734 }
2735 
2736 #endif
2737 
2738 
2739 static void
2740 nxt_router_app_rpc_create(nxt_task_t *task,
2741     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2742 {
2743     size_t         size;
2744     uint32_t       stream;
2745     nxt_int_t      ret;
2746     nxt_buf_t      *b;
2747     nxt_port_t     *main_port, *router_port;
2748     nxt_runtime_t  *rt;
2749     nxt_app_rpc_t  *rpc;
2750 
2751     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2752     if (rpc == NULL) {
2753         goto fail;
2754     }
2755 
2756     rpc->app = app;
2757     rpc->temp_conf = tmcf;
2758 
2759     nxt_debug(task, "app '%V' prefork", &app->name);
2760 
2761     size = app->name.length + 1 + app->conf.length;
2762 
2763     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2764     if (nxt_slow_path(b == NULL)) {
2765         goto fail;
2766     }
2767 
2768     b->completion_handler = nxt_buf_dummy_completion;
2769 
2770     nxt_buf_cpystr(b, &app->name);
2771     *b->mem.free++ = '\0';
2772     nxt_buf_cpystr(b, &app->conf);
2773 
2774     rt = task->thread->runtime;
2775     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2776     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2777 
2778     stream = nxt_port_rpc_register_handler(task, router_port,
2779                                            nxt_router_app_prefork_ready,
2780                                            nxt_router_app_prefork_error,
2781                                            -1, rpc);
2782     if (nxt_slow_path(stream == 0)) {
2783         goto fail;
2784     }
2785 
2786     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2787                                 -1, stream, router_port->id, b);
2788 
2789     if (nxt_slow_path(ret != NXT_OK)) {
2790         nxt_port_rpc_cancel(task, router_port, stream);
2791         goto fail;
2792     }
2793 
2794     app->pending_processes++;
2795 
2796     return;
2797 
2798 fail:
2799 
2800     nxt_router_conf_error(task, tmcf);
2801 }
2802 
2803 
2804 static void
2805 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2806     void *data)
2807 {
2808     nxt_app_t           *app;
2809     nxt_port_t          *port;
2810     nxt_app_rpc_t       *rpc;
2811     nxt_event_engine_t  *engine;
2812 
2813     rpc = data;
2814     app = rpc->app;
2815 
2816     port = msg->u.new_port;
2817 
2818     nxt_assert(port != NULL);
2819     nxt_assert(port->type == NXT_PROCESS_APP);
2820     nxt_assert(port->id == 0);
2821 
2822     port->app = app;
2823     port->main_app_port = port;
2824 
2825     app->pending_processes--;
2826     app->processes++;
2827     app->idle_processes++;
2828 
2829     engine = task->thread->engine;
2830 
2831     nxt_queue_insert_tail(&app->ports, &port->app_link);
2832     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2833 
2834     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2835               &app->name, port->pid, port->id);
2836 
2837     nxt_port_hash_add(&app->port_hash, port);
2838     app->port_hash_count++;
2839 
2840     port->idle_start = 0;
2841 
2842     nxt_port_inc_use(port);
2843 
2844     nxt_router_app_shared_port_send(task, port);
2845 
2846     nxt_work_queue_add(&engine->fast_work_queue,
2847                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2848 }
2849 
2850 
2851 static void
2852 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2853     void *data)
2854 {
2855     nxt_app_t               *app;
2856     nxt_app_rpc_t           *rpc;
2857     nxt_router_temp_conf_t  *tmcf;
2858 
2859     rpc = data;
2860     app = rpc->app;
2861     tmcf = rpc->temp_conf;
2862 
2863     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2864             &app->name);
2865 
2866     app->pending_processes--;
2867 
2868     nxt_router_conf_error(task, tmcf);
2869 }
2870 
2871 
2872 static nxt_int_t
2873 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2874     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2875 {
2876     nxt_int_t                 ret;
2877     nxt_uint_t                n, threads;
2878     nxt_queue_link_t          *qlk;
2879     nxt_router_engine_conf_t  *recf;
2880 
2881     threads = tmcf->router_conf->threads;
2882 
2883     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2884                                      sizeof(nxt_router_engine_conf_t));
2885     if (nxt_slow_path(tmcf->engines == NULL)) {
2886         return NXT_ERROR;
2887     }
2888 
2889     n = 0;
2890 
2891     for (qlk = nxt_queue_first(&router->engines);
2892          qlk != nxt_queue_tail(&router->engines);
2893          qlk = nxt_queue_next(qlk))
2894     {
2895         recf = nxt_array_zero_add(tmcf->engines);
2896         if (nxt_slow_path(recf == NULL)) {
2897             return NXT_ERROR;
2898         }
2899 
2900         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2901 
2902         if (n < threads) {
2903             recf->action = NXT_ROUTER_ENGINE_KEEP;
2904             ret = nxt_router_engine_conf_update(tmcf, recf);
2905 
2906         } else {
2907             recf->action = NXT_ROUTER_ENGINE_DELETE;
2908             ret = nxt_router_engine_conf_delete(tmcf, recf);
2909         }
2910 
2911         if (nxt_slow_path(ret != NXT_OK)) {
2912             return ret;
2913         }
2914 
2915         n++;
2916     }
2917 
2918     tmcf->new_threads = n;
2919 
2920     while (n < threads) {
2921         recf = nxt_array_zero_add(tmcf->engines);
2922         if (nxt_slow_path(recf == NULL)) {
2923             return NXT_ERROR;
2924         }
2925 
2926         recf->action = NXT_ROUTER_ENGINE_ADD;
2927 
2928         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2929         if (nxt_slow_path(recf->engine == NULL)) {
2930             return NXT_ERROR;
2931         }
2932 
2933         ret = nxt_router_engine_conf_create(tmcf, recf);
2934         if (nxt_slow_path(ret != NXT_OK)) {
2935             return ret;
2936         }
2937 
2938         n++;
2939     }
2940 
2941     return NXT_OK;
2942 }
2943 
2944 
2945 static nxt_int_t
2946 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2947     nxt_router_engine_conf_t *recf)
2948 {
2949     nxt_int_t  ret;
2950 
2951     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2952                                           nxt_router_listen_socket_create);
2953     if (nxt_slow_path(ret != NXT_OK)) {
2954         return ret;
2955     }
2956 
2957     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2958                                           nxt_router_listen_socket_create);
2959     if (nxt_slow_path(ret != NXT_OK)) {
2960         return ret;
2961     }
2962 
2963     return ret;
2964 }
2965 
2966 
2967 static nxt_int_t
2968 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2969     nxt_router_engine_conf_t *recf)
2970 {
2971     nxt_int_t  ret;
2972 
2973     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2974                                           nxt_router_listen_socket_create);
2975     if (nxt_slow_path(ret != NXT_OK)) {
2976         return ret;
2977     }
2978 
2979     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2980                                           nxt_router_listen_socket_update);
2981     if (nxt_slow_path(ret != NXT_OK)) {
2982         return ret;
2983     }
2984 
2985     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2986     if (nxt_slow_path(ret != NXT_OK)) {
2987         return ret;
2988     }
2989 
2990     return ret;
2991 }
2992 
2993 
2994 static nxt_int_t
2995 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2996     nxt_router_engine_conf_t *recf)
2997 {
2998     nxt_int_t  ret;
2999 
3000     ret = nxt_router_engine_quit(tmcf, recf);
3001     if (nxt_slow_path(ret != NXT_OK)) {
3002         return ret;
3003     }
3004 
3005     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3006     if (nxt_slow_path(ret != NXT_OK)) {
3007         return ret;
3008     }
3009 
3010     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3011 }
3012 
3013 
3014 static nxt_int_t
3015 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3016     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3017     nxt_work_handler_t handler)
3018 {
3019     nxt_int_t                ret;
3020     nxt_joint_job_t          *job;
3021     nxt_queue_link_t         *qlk;
3022     nxt_socket_conf_t        *skcf;
3023     nxt_socket_conf_joint_t  *joint;
3024 
3025     for (qlk = nxt_queue_first(sockets);
3026          qlk != nxt_queue_tail(sockets);
3027          qlk = nxt_queue_next(qlk))
3028     {
3029         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3030         if (nxt_slow_path(job == NULL)) {
3031             return NXT_ERROR;
3032         }
3033 
3034         job->work.next = recf->jobs;
3035         recf->jobs = &job->work;
3036 
3037         job->task = tmcf->engine->task;
3038         job->work.handler = handler;
3039         job->work.task = &job->task;
3040         job->work.obj = job;
3041         job->tmcf = tmcf;
3042 
3043         tmcf->count++;
3044 
3045         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3046                              sizeof(nxt_socket_conf_joint_t));
3047         if (nxt_slow_path(joint == NULL)) {
3048             return NXT_ERROR;
3049         }
3050 
3051         job->work.data = joint;
3052 
3053         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3054         if (nxt_slow_path(ret != NXT_OK)) {
3055             return ret;
3056         }
3057 
3058         joint->count = 1;
3059 
3060         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3061         skcf->count++;
3062         joint->socket_conf = skcf;
3063 
3064         joint->engine = recf->engine;
3065     }
3066 
3067     return NXT_OK;
3068 }
3069 
3070 
3071 static nxt_int_t
3072 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3073     nxt_router_engine_conf_t *recf)
3074 {
3075     nxt_joint_job_t  *job;
3076 
3077     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3078     if (nxt_slow_path(job == NULL)) {
3079         return NXT_ERROR;
3080     }
3081 
3082     job->work.next = recf->jobs;
3083     recf->jobs = &job->work;
3084 
3085     job->task = tmcf->engine->task;
3086     job->work.handler = nxt_router_worker_thread_quit;
3087     job->work.task = &job->task;
3088     job->work.obj = NULL;
3089     job->work.data = NULL;
3090     job->tmcf = NULL;
3091 
3092     return NXT_OK;
3093 }
3094 
3095 
3096 static nxt_int_t
3097 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3098     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3099 {
3100     nxt_joint_job_t   *job;
3101     nxt_queue_link_t  *qlk;
3102 
3103     for (qlk = nxt_queue_first(sockets);
3104          qlk != nxt_queue_tail(sockets);
3105          qlk = nxt_queue_next(qlk))
3106     {
3107         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3108         if (nxt_slow_path(job == NULL)) {
3109             return NXT_ERROR;
3110         }
3111 
3112         job->work.next = recf->jobs;
3113         recf->jobs = &job->work;
3114 
3115         job->task = tmcf->engine->task;
3116         job->work.handler = nxt_router_listen_socket_delete;
3117         job->work.task = &job->task;
3118         job->work.obj = job;
3119         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3120         job->tmcf = tmcf;
3121 
3122         tmcf->count++;
3123     }
3124 
3125     return NXT_OK;
3126 }
3127 
3128 
3129 static nxt_int_t
3130 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3131     nxt_router_temp_conf_t *tmcf)
3132 {
3133     nxt_int_t                 ret;
3134     nxt_uint_t                i, threads;
3135     nxt_router_engine_conf_t  *recf;
3136 
3137     recf = tmcf->engines->elts;
3138     threads = tmcf->router_conf->threads;
3139 
3140     for (i = tmcf->new_threads; i < threads; i++) {
3141         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3142         if (nxt_slow_path(ret != NXT_OK)) {
3143             return ret;
3144         }
3145     }
3146 
3147     return NXT_OK;
3148 }
3149 
3150 
3151 static nxt_int_t
3152 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3153     nxt_event_engine_t *engine)
3154 {
3155     nxt_int_t            ret;
3156     nxt_thread_link_t    *link;
3157     nxt_thread_handle_t  handle;
3158 
3159     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3160 
3161     if (nxt_slow_path(link ==