xref: /unit/src/nxt_router.c (revision 1885:09b857a2cca9)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 typedef struct {
22     nxt_str_t         type;
23     uint32_t          processes;
24     uint32_t          max_processes;
25     uint32_t          spare_processes;
26     nxt_msec_t        timeout;
27     nxt_msec_t        idle_timeout;
28     uint32_t          requests;
29     nxt_conf_value_t  *limits_value;
30     nxt_conf_value_t  *processes_value;
31     nxt_conf_value_t  *targets_value;
32 } nxt_router_app_conf_t;
33 
34 
35 typedef struct {
36     nxt_str_t         pass;
37     nxt_str_t         application;
38 } nxt_router_listener_conf_t;
39 
40 
41 #if (NXT_TLS)
42 
43 typedef struct {
44     nxt_str_t               name;
45     nxt_socket_conf_t       *socket_conf;
46     nxt_router_temp_conf_t  *temp_conf;
47     nxt_conf_value_t        *conf_cmds;
48     nxt_bool_t              last;
49 
50     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
51 } nxt_router_tlssock_t;
52 
53 #endif
54 
55 
56 typedef struct {
57     nxt_str_t               *name;
58     nxt_socket_conf_t       *socket_conf;
59     nxt_router_temp_conf_t  *temp_conf;
60     nxt_bool_t              last;
61 } nxt_socket_rpc_t;
62 
63 
64 typedef struct {
65     nxt_app_t               *app;
66     nxt_router_temp_conf_t  *temp_conf;
67 } nxt_app_rpc_t;
68 
69 
70 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
71     nxt_mp_t *mp);
72 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
73 static void nxt_router_greet_controller(nxt_task_t *task,
74     nxt_port_t *controller_port);
75 
76 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
77 
78 static void nxt_router_new_port_handler(nxt_task_t *task,
79     nxt_port_recv_msg_t *msg);
80 static void nxt_router_conf_data_handler(nxt_task_t *task,
81     nxt_port_recv_msg_t *msg);
82 static void nxt_router_remove_pid_handler(nxt_task_t *task,
83     nxt_port_recv_msg_t *msg);
84 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
85     nxt_port_recv_msg_t *msg);
86 
87 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
88 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
89 static void nxt_router_conf_ready(nxt_task_t *task,
90     nxt_router_temp_conf_t *tmcf);
91 static void nxt_router_conf_error(nxt_task_t *task,
92     nxt_router_temp_conf_t *tmcf);
93 static void nxt_router_conf_send(nxt_task_t *task,
94     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
95 
96 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
97     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
98 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
99     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
100 
101 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
102 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
103 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
104     nxt_app_t *app);
105 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
106     nxt_str_t *name);
107 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
108     int i);
109 
110 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
111     nxt_port_t *port);
112 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
113     nxt_port_t *port);
114 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
115     nxt_port_t *port, nxt_fd_t fd);
116 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
117     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
118 static void nxt_router_listen_socket_ready(nxt_task_t *task,
119     nxt_port_recv_msg_t *msg, void *data);
120 static void nxt_router_listen_socket_error(nxt_task_t *task,
121     nxt_port_recv_msg_t *msg, void *data);
122 #if (NXT_TLS)
123 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
124     nxt_port_recv_msg_t *msg, void *data);
125 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
126     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
127     nxt_conf_value_t * conf_cmds);
128 #endif
129 static void nxt_router_app_rpc_create(nxt_task_t *task,
130     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
131 static void nxt_router_app_prefork_ready(nxt_task_t *task,
132     nxt_port_recv_msg_t *msg, void *data);
133 static void nxt_router_app_prefork_error(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
136     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
137 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
138     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
139 
140 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
141     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
142     const nxt_event_interface_t *interface);
143 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
144     nxt_router_engine_conf_t *recf);
145 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
146     nxt_router_engine_conf_t *recf);
147 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
148     nxt_router_engine_conf_t *recf);
149 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
150     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
151     nxt_work_handler_t handler);
152 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
153     nxt_router_engine_conf_t *recf);
154 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
155     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
156 
157 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
158     nxt_router_temp_conf_t *tmcf);
159 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
160     nxt_event_engine_t *engine);
161 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
162     nxt_router_temp_conf_t *tmcf);
163 
164 static void nxt_router_engines_post(nxt_router_t *router,
165     nxt_router_temp_conf_t *tmcf);
166 static void nxt_router_engine_post(nxt_event_engine_t *engine,
167     nxt_work_t *jobs);
168 
169 static void nxt_router_thread_start(void *data);
170 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
171     void *data);
172 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
173     void *data);
174 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
175     void *data);
176 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
177     void *data);
178 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
179     void *data);
180 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
181     void *data);
182 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
183     void *data);
184 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
185     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
186 static void nxt_router_listen_socket_release(nxt_task_t *task,
187     nxt_socket_conf_t *skcf);
188 
189 static void nxt_router_access_log_writer(nxt_task_t *task,
190     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
191 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
192     struct tm *tm, size_t size, const char *format);
193 static void nxt_router_access_log_open(nxt_task_t *task,
194     nxt_router_temp_conf_t *tmcf);
195 static void nxt_router_access_log_ready(nxt_task_t *task,
196     nxt_port_recv_msg_t *msg, void *data);
197 static void nxt_router_access_log_error(nxt_task_t *task,
198     nxt_port_recv_msg_t *msg, void *data);
199 static void nxt_router_access_log_release(nxt_task_t *task,
200     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
201 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
202     void *data);
203 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
204     nxt_port_recv_msg_t *msg, void *data);
205 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
206     nxt_port_recv_msg_t *msg, void *data);
207 
208 static void nxt_router_app_port_ready(nxt_task_t *task,
209     nxt_port_recv_msg_t *msg, void *data);
210 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
211     nxt_port_t *app_port);
212 static void nxt_router_app_port_error(nxt_task_t *task,
213     nxt_port_recv_msg_t *msg, void *data);
214 
215 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
216 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
217 
218 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
219     nxt_apr_action_t action);
220 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
221     nxt_request_rpc_data_t *req_rpc_data);
222 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
223     void *data);
224 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
225     void *data);
226 
227 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
228     void *data);
229 static void nxt_router_app_prepare_request(nxt_task_t *task,
230     nxt_request_rpc_data_t *req_rpc_data);
231 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
232     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
233 
234 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
235 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
236     void *data);
237 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
238     void *data);
239 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
240     void *data);
241 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
242 
243 static const nxt_http_request_state_t  nxt_http_request_send_state;
244 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
245 
246 static void nxt_router_app_joint_use(nxt_task_t *task,
247     nxt_app_joint_t *app_joint, int i);
248 
249 static void nxt_router_http_request_release_post(nxt_task_t *task,
250     nxt_http_request_t *r);
251 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
254 static void nxt_router_get_port_handler(nxt_task_t *task,
255     nxt_port_recv_msg_t *msg);
256 static void nxt_router_get_mmap_handler(nxt_task_t *task,
257     nxt_port_recv_msg_t *msg);
258 
259 extern const nxt_http_request_state_t  nxt_http_websocket;
260 
261 static nxt_router_t  *nxt_router;
262 
263 static const nxt_str_t http_prefix = nxt_string("HTTP_");
264 static const nxt_str_t empty_prefix = nxt_string("");
265 
266 static const nxt_str_t  *nxt_app_msg_prefix[] = {
267     &empty_prefix,
268     &empty_prefix,
269     &http_prefix,
270     &http_prefix,
271     &http_prefix,
272     &empty_prefix,
273 };
274 
275 
276 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
277     .quit         = nxt_signal_quit_handler,
278     .new_port     = nxt_router_new_port_handler,
279     .get_port     = nxt_router_get_port_handler,
280     .change_file  = nxt_port_change_log_file_handler,
281     .mmap         = nxt_port_mmap_handler,
282     .get_mmap     = nxt_router_get_mmap_handler,
283     .data         = nxt_router_conf_data_handler,
284     .remove_pid   = nxt_router_remove_pid_handler,
285     .access_log   = nxt_router_access_log_reopen_handler,
286     .rpc_ready    = nxt_port_rpc_handler,
287     .rpc_error    = nxt_port_rpc_handler,
288     .oosm         = nxt_router_oosm_handler,
289 };
290 
291 
292 const nxt_process_init_t  nxt_router_process = {
293     .name           = "router",
294     .type           = NXT_PROCESS_ROUTER,
295     .prefork        = nxt_router_prefork,
296     .restart        = 1,
297     .setup          = nxt_process_core_setup,
298     .start          = nxt_router_start,
299     .port_handlers  = &nxt_router_process_port_handlers,
300     .signals        = nxt_process_signals,
301 };
302 
303 
304 /* Queues of nxt_socket_conf_t */
305 nxt_queue_t  creating_sockets;
306 nxt_queue_t  pending_sockets;
307 nxt_queue_t  updating_sockets;
308 nxt_queue_t  keeping_sockets;
309 nxt_queue_t  deleting_sockets;
310 
311 
312 static nxt_int_t
313 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
314 {
315     nxt_runtime_stop_app_processes(task, task->thread->runtime);
316 
317     return NXT_OK;
318 }
319 
320 
321 static nxt_int_t
322 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
323 {
324     nxt_int_t      ret;
325     nxt_port_t     *controller_port;
326     nxt_router_t   *router;
327     nxt_runtime_t  *rt;
328 
329     rt = task->thread->runtime;
330 
331     nxt_log(task, NXT_LOG_INFO, "router started");
332 
333 #if (NXT_TLS)
334     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
335     if (nxt_slow_path(rt->tls == NULL)) {
336         return NXT_ERROR;
337     }
338 
339     ret = rt->tls->library_init(task);
340     if (nxt_slow_path(ret != NXT_OK)) {
341         return ret;
342     }
343 #endif
344 
345     ret = nxt_http_init(task);
346     if (nxt_slow_path(ret != NXT_OK)) {
347         return ret;
348     }
349 
350     router = nxt_zalloc(sizeof(nxt_router_t));
351     if (nxt_slow_path(router == NULL)) {
352         return NXT_ERROR;
353     }
354 
355     nxt_queue_init(&router->engines);
356     nxt_queue_init(&router->sockets);
357     nxt_queue_init(&router->apps);
358 
359     nxt_router = router;
360 
361     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
362     if (controller_port != NULL) {
363         nxt_router_greet_controller(task, controller_port);
364     }
365 
366     return NXT_OK;
367 }
368 
369 
370 static void
371 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
372 {
373     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
374                           -1, 0, 0, NULL);
375 }
376 
377 
378 static void
379 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
380     void *data)
381 {
382     size_t         size;
383     uint32_t       stream;
384     nxt_mp_t       *mp;
385     nxt_int_t      ret;
386     nxt_app_t      *app;
387     nxt_buf_t      *b;
388     nxt_port_t     *main_port;
389     nxt_runtime_t  *rt;
390 
391     app = data;
392 
393     rt = task->thread->runtime;
394     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
395 
396     nxt_debug(task, "app '%V' %p start process", &app->name, app);
397 
398     size = app->name.length + 1 + app->conf.length;
399 
400     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
401 
402     if (nxt_slow_path(b == NULL)) {
403         goto failed;
404     }
405 
406     nxt_buf_cpystr(b, &app->name);
407     *b->mem.free++ = '\0';
408     nxt_buf_cpystr(b, &app->conf);
409 
410     nxt_router_app_joint_use(task, app->joint, 1);
411 
412     stream = nxt_port_rpc_register_handler(task, port,
413                                            nxt_router_app_port_ready,
414                                            nxt_router_app_port_error,
415                                            -1, app->joint);
416 
417     if (nxt_slow_path(stream == 0)) {
418         nxt_router_app_joint_use(task, app->joint, -1);
419 
420         goto failed;
421     }
422 
423     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
424                                 -1, stream, port->id, b);
425 
426     if (nxt_slow_path(ret != NXT_OK)) {
427         nxt_port_rpc_cancel(task, port, stream);
428 
429         nxt_router_app_joint_use(task, app->joint, -1);
430 
431         goto failed;
432     }
433 
434     nxt_router_app_use(task, app, -1);
435 
436     return;
437 
438 failed:
439 
440     if (b != NULL) {
441         mp = b->data;
442         nxt_mp_free(mp, b);
443         nxt_mp_release(mp);
444     }
445 
446     nxt_thread_mutex_lock(&app->mutex);
447 
448     app->pending_processes--;
449 
450     nxt_thread_mutex_unlock(&app->mutex);
451 
452     nxt_router_app_use(task, app, -1);
453 }
454 
455 
456 static void
457 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
458 {
459     app_joint->use_count += i;
460 
461     if (app_joint->use_count == 0) {
462         nxt_assert(app_joint->app == NULL);
463 
464         nxt_free(app_joint);
465     }
466 }
467 
468 
469 static nxt_int_t
470 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
471 {
472     nxt_int_t      res;
473     nxt_port_t     *router_port;
474     nxt_runtime_t  *rt;
475 
476     nxt_debug(task, "app '%V' start process", &app->name);
477 
478     rt = task->thread->runtime;
479     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
480 
481     nxt_router_app_use(task, app, 1);
482 
483     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
484                         app);
485 
486     if (res == NXT_OK) {
487         return res;
488     }
489 
490     nxt_thread_mutex_lock(&app->mutex);
491 
492     app->pending_processes--;
493 
494     nxt_thread_mutex_unlock(&app->mutex);
495 
496     nxt_router_app_use(task, app, -1);
497 
498     return NXT_ERROR;
499 }
500 
501 
502 nxt_inline nxt_bool_t
503 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
504 {
505     nxt_buf_t       *b, *next;
506     nxt_bool_t      cancelled;
507     nxt_msg_info_t  *msg_info;
508 
509     msg_info = &req_rpc_data->msg_info;
510 
511     if (msg_info->buf == NULL) {
512         return 0;
513     }
514 
515     cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
516                                      msg_info->tracking_cookie,
517                                      req_rpc_data->stream);
518 
519     if (cancelled) {
520         nxt_debug(task, "stream #%uD: cancelled by router",
521                   req_rpc_data->stream);
522     }
523 
524     for (b = msg_info->buf; b != NULL; b = next) {
525         next = b->next;
526         b->next = NULL;
527 
528         if (b->is_port_mmap_sent) {
529             b->is_port_mmap_sent = cancelled == 0;
530         }
531 
532         b->completion_handler(task, b, b->parent);
533     }
534 
535     msg_info->buf = NULL;
536 
537     return cancelled;
538 }
539 
540 
541 nxt_inline nxt_bool_t
542 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
543 {
544     if (lnk->next != NULL) {
545         nxt_queue_remove(lnk);
546 
547         lnk->next = NULL;
548 
549         return 1;
550     }
551 
552     return 0;
553 }
554 
555 
556 nxt_inline void
557 nxt_request_rpc_data_unlink(nxt_task_t *task,
558     nxt_request_rpc_data_t *req_rpc_data)
559 {
560     nxt_app_t           *app;
561     nxt_bool_t          unlinked;
562     nxt_http_request_t  *r;
563 
564     nxt_router_msg_cancel(task, req_rpc_data);
565 
566     if (req_rpc_data->app_port != NULL) {
567         nxt_router_app_port_release(task, req_rpc_data->app_port,
568                                     req_rpc_data->apr_action);
569 
570         req_rpc_data->app_port = NULL;
571     }
572 
573     app = req_rpc_data->app;
574     r = req_rpc_data->request;
575 
576     if (r != NULL) {
577         r->timer_data = NULL;
578 
579         nxt_router_http_request_release_post(task, r);
580 
581         r->req_rpc_data = NULL;
582         req_rpc_data->request = NULL;
583 
584         if (app != NULL) {
585             unlinked = 0;
586 
587             nxt_thread_mutex_lock(&app->mutex);
588 
589             if (r->app_link.next != NULL) {
590                 nxt_queue_remove(&r->app_link);
591                 r->app_link.next = NULL;
592 
593                 unlinked = 1;
594             }
595 
596             nxt_thread_mutex_unlock(&app->mutex);
597 
598             if (unlinked) {
599                 nxt_mp_release(r->mem_pool);
600             }
601         }
602     }
603 
604     if (app != NULL) {
605         nxt_router_app_use(task, app, -1);
606 
607         req_rpc_data->app = NULL;
608     }
609 
610     if (req_rpc_data->msg_info.body_fd != -1) {
611         nxt_fd_close(req_rpc_data->msg_info.body_fd);
612 
613         req_rpc_data->msg_info.body_fd = -1;
614     }
615 
616     if (req_rpc_data->rpc_cancel) {
617         req_rpc_data->rpc_cancel = 0;
618 
619         nxt_port_rpc_cancel(task, task->thread->engine->port,
620                             req_rpc_data->stream);
621     }
622 }
623 
624 
625 static void
626 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
627 {
628     nxt_int_t      res;
629     nxt_app_t      *app;
630     nxt_port_t     *port, *main_app_port;
631     nxt_runtime_t  *rt;
632 
633     nxt_port_new_port_handler(task, msg);
634 
635     port = msg->u.new_port;
636 
637     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
638         nxt_router_greet_controller(task, msg->u.new_port);
639     }
640 
641     if (port == NULL || port->type != NXT_PROCESS_APP) {
642 
643         if (msg->port_msg.stream == 0) {
644             return;
645         }
646 
647         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
648 
649     } else {
650         if (msg->fd[1] != -1) {
651             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
652             if (nxt_slow_path(res != NXT_OK)) {
653                 return;
654             }
655 
656             nxt_fd_close(msg->fd[1]);
657             msg->fd[1] = -1;
658         }
659     }
660 
661     if (msg->port_msg.stream != 0) {
662         nxt_port_rpc_handler(task, msg);
663         return;
664     }
665 
666     /*
667      * Port with "id == 0" is application 'main' port and it always
668      * should come with non-zero stream.
669      */
670     nxt_assert(port->id != 0);
671 
672     /* Find 'main' app port and get app reference. */
673     rt = task->thread->runtime;
674 
675     /*
676      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
677      * sent to main port (with id == 0) and processed in main thread.
678      */
679     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
680     nxt_assert(main_app_port != NULL);
681 
682     app = main_app_port->app;
683     nxt_assert(app != NULL);
684 
685     nxt_thread_mutex_lock(&app->mutex);
686 
687     /* TODO here should be find-and-add code because there can be
688        port waiters in port_hash */
689     nxt_port_hash_add(&app->port_hash, port);
690     app->port_hash_count++;
691 
692     nxt_thread_mutex_unlock(&app->mutex);
693 
694     port->app = app;
695     port->main_app_port = main_app_port;
696 
697     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
698 }
699 
700 
701 static void
702 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
703 {
704     void                    *p;
705     size_t                  size;
706     nxt_int_t               ret;
707     nxt_port_t              *port;
708     nxt_router_temp_conf_t  *tmcf;
709 
710     port = nxt_runtime_port_find(task->thread->runtime,
711                                  msg->port_msg.pid,
712                                  msg->port_msg.reply_port);
713     if (nxt_slow_path(port == NULL)) {
714         nxt_alert(task, "conf_data_handler: reply port not found");
715         return;
716     }
717 
718     p = MAP_FAILED;
719 
720     /*
721      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
722      * initialized in 'cleanup' section.
723      */
724     size = 0;
725 
726     tmcf = nxt_router_temp_conf(task);
727     if (nxt_slow_path(tmcf == NULL)) {
728         goto fail;
729     }
730 
731     if (nxt_slow_path(msg->fd[0] == -1)) {
732         nxt_alert(task, "conf_data_handler: invalid shm fd");
733         goto fail;
734     }
735 
736     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
737         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
738                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
739         goto fail;
740     }
741 
742     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
743 
744     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
745 
746     nxt_fd_close(msg->fd[0]);
747     msg->fd[0] = -1;
748 
749     if (nxt_slow_path(p == MAP_FAILED)) {
750         goto fail;
751     }
752 
753     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
754 
755     tmcf->router_conf->router = nxt_router;
756     tmcf->stream = msg->port_msg.stream;
757     tmcf->port = port;
758 
759     nxt_port_use(task, tmcf->port, 1);
760 
761     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
762 
763     if (nxt_fast_path(ret == NXT_OK)) {
764         nxt_router_conf_apply(task, tmcf, NULL);
765 
766     } else {
767         nxt_router_conf_error(task, tmcf);
768     }
769 
770     goto cleanup;
771 
772 fail:
773 
774     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
775                           msg->port_msg.stream, 0, NULL);
776 
777     if (tmcf != NULL) {
778         nxt_mp_release(tmcf->mem_pool);
779     }
780 
781 cleanup:
782 
783     if (p != MAP_FAILED) {
784         nxt_mem_munmap(p, size);
785     }
786 
787     if (msg->fd[0] != -1) {
788         nxt_fd_close(msg->fd[0]);
789         msg->fd[0] = -1;
790     }
791 }
792 
793 
794 static void
795 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
796     void *data)
797 {
798     union {
799         nxt_pid_t  removed_pid;
800         void       *data;
801     } u;
802 
803     u.data = data;
804 
805     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
806 }
807 
808 
809 static void
810 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
811 {
812     nxt_event_engine_t  *engine;
813 
814     nxt_port_remove_pid_handler(task, msg);
815 
816     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
817     {
818         if (nxt_fast_path(engine->port != NULL)) {
819             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
820                           msg->u.data);
821         }
822     }
823     nxt_queue_loop;
824 
825     if (msg->port_msg.stream == 0) {
826         return;
827     }
828 
829     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
830 
831     nxt_port_rpc_handler(task, msg);
832 }
833 
834 
835 static nxt_router_temp_conf_t *
836 nxt_router_temp_conf(nxt_task_t *task)
837 {
838     nxt_mp_t                *mp, *tmp;
839     nxt_router_conf_t       *rtcf;
840     nxt_router_temp_conf_t  *tmcf;
841 
842     mp = nxt_mp_create(1024, 128, 256, 32);
843     if (nxt_slow_path(mp == NULL)) {
844         return NULL;
845     }
846 
847     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
848     if (nxt_slow_path(rtcf == NULL)) {
849         goto fail;
850     }
851 
852     rtcf->mem_pool = mp;
853 
854     tmp = nxt_mp_create(1024, 128, 256, 32);
855     if (nxt_slow_path(tmp == NULL)) {
856         goto fail;
857     }
858 
859     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
860     if (nxt_slow_path(tmcf == NULL)) {
861         goto temp_fail;
862     }
863 
864     tmcf->mem_pool = tmp;
865     tmcf->router_conf = rtcf;
866     tmcf->count = 1;
867     tmcf->engine = task->thread->engine;
868 
869     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
870                                      sizeof(nxt_router_engine_conf_t));
871     if (nxt_slow_path(tmcf->engines == NULL)) {
872         goto temp_fail;
873     }
874 
875     nxt_queue_init(&creating_sockets);
876     nxt_queue_init(&pending_sockets);
877     nxt_queue_init(&updating_sockets);
878     nxt_queue_init(&keeping_sockets);
879     nxt_queue_init(&deleting_sockets);
880 
881 #if (NXT_TLS)
882     nxt_queue_init(&tmcf->tls);
883 #endif
884 
885     nxt_queue_init(&tmcf->apps);
886     nxt_queue_init(&tmcf->previous);
887 
888     return tmcf;
889 
890 temp_fail:
891 
892     nxt_mp_destroy(tmp);
893 
894 fail:
895 
896     nxt_mp_destroy(mp);
897 
898     return NULL;
899 }
900 
901 
902 nxt_inline nxt_bool_t
903 nxt_router_app_can_start(nxt_app_t *app)
904 {
905     return app->processes + app->pending_processes < app->max_processes
906             && app->pending_processes < app->max_pending_processes;
907 }
908 
909 
910 nxt_inline nxt_bool_t
911 nxt_router_app_need_start(nxt_app_t *app)
912 {
913     return (app->active_requests
914               > app->port_hash_count + app->pending_processes)
915            || (app->spare_processes
916                 > app->idle_processes + app->pending_processes);
917 }
918 
919 
920 static void
921 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
922 {
923     nxt_int_t                    ret;
924     nxt_app_t                    *app;
925     nxt_router_t                 *router;
926     nxt_runtime_t                *rt;
927     nxt_queue_link_t             *qlk;
928     nxt_socket_conf_t            *skcf;
929     nxt_router_conf_t            *rtcf;
930     nxt_router_temp_conf_t       *tmcf;
931     const nxt_event_interface_t  *interface;
932 #if (NXT_TLS)
933     nxt_router_tlssock_t         *tls;
934 #endif
935 
936     tmcf = obj;
937 
938     qlk = nxt_queue_first(&pending_sockets);
939 
940     if (qlk != nxt_queue_tail(&pending_sockets)) {
941         nxt_queue_remove(qlk);
942         nxt_queue_insert_tail(&creating_sockets, qlk);
943 
944         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
945 
946         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
947 
948         return;
949     }
950 
951 #if (NXT_TLS)
952     qlk = nxt_queue_last(&tmcf->tls);
953 
954     if (qlk != nxt_queue_head(&tmcf->tls)) {
955         nxt_queue_remove(qlk);
956 
957         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
958 
959         tls->last = nxt_queue_is_empty(&tmcf->tls);
960 
961         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
962                            nxt_router_tls_rpc_handler, tls);
963         return;
964     }
965 #endif
966 
967     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
968 
969         if (nxt_router_app_need_start(app)) {
970             nxt_router_app_rpc_create(task, tmcf, app);
971             return;
972         }
973 
974     } nxt_queue_loop;
975 
976     rtcf = tmcf->router_conf;
977 
978     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
979         nxt_router_access_log_open(task, tmcf);
980         return;
981     }
982 
983     rt = task->thread->runtime;
984 
985     interface = nxt_service_get(rt->services, "engine", NULL);
986 
987     router = rtcf->router;
988 
989     ret = nxt_router_engines_create(task, router, tmcf, interface);
990     if (nxt_slow_path(ret != NXT_OK)) {
991         goto fail;
992     }
993 
994     ret = nxt_router_threads_create(task, rt, tmcf);
995     if (nxt_slow_path(ret != NXT_OK)) {
996         goto fail;
997     }
998 
999     nxt_router_apps_sort(task, router, tmcf);
1000 
1001     nxt_router_apps_hash_use(task, rtcf, 1);
1002 
1003     nxt_router_engines_post(router, tmcf);
1004 
1005     nxt_queue_add(&router->sockets, &updating_sockets);
1006     nxt_queue_add(&router->sockets, &creating_sockets);
1007 
1008     router->access_log = rtcf->access_log;
1009 
1010     nxt_router_conf_ready(task, tmcf);
1011 
1012     return;
1013 
1014 fail:
1015 
1016     nxt_router_conf_error(task, tmcf);
1017 
1018     return;
1019 }
1020 
1021 
1022 static void
1023 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1024 {
1025     nxt_joint_job_t  *job;
1026 
1027     job = obj;
1028 
1029     nxt_router_conf_ready(task, job->tmcf);
1030 }
1031 
1032 
1033 static void
1034 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1035 {
1036     uint32_t               count;
1037     nxt_router_conf_t      *rtcf;
1038     nxt_thread_spinlock_t  *lock;
1039 
1040     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1041 
1042     if (--tmcf->count > 0) {
1043         return;
1044     }
1045 
1046     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1047 
1048     rtcf = tmcf->router_conf;
1049 
1050     lock = &rtcf->router->lock;
1051 
1052     nxt_thread_spin_lock(lock);
1053 
1054     count = rtcf->count;
1055 
1056     nxt_thread_spin_unlock(lock);
1057 
1058     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1059 
1060     if (count == 0) {
1061         nxt_router_apps_hash_use(task, rtcf, -1);
1062 
1063         nxt_router_access_log_release(task, lock, rtcf->access_log);
1064 
1065         nxt_mp_destroy(rtcf->mem_pool);
1066     }
1067 
1068     nxt_mp_release(tmcf->mem_pool);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1074 {
1075     nxt_app_t          *app;
1076     nxt_queue_t        new_socket_confs;
1077     nxt_socket_t       s;
1078     nxt_router_t       *router;
1079     nxt_queue_link_t   *qlk;
1080     nxt_socket_conf_t  *skcf;
1081     nxt_router_conf_t  *rtcf;
1082 
1083     nxt_alert(task, "failed to apply new conf");
1084 
1085     for (qlk = nxt_queue_first(&creating_sockets);
1086          qlk != nxt_queue_tail(&creating_sockets);
1087          qlk = nxt_queue_next(qlk))
1088     {
1089         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1090         s = skcf->listen->socket;
1091 
1092         if (s != -1) {
1093             nxt_socket_close(task, s);
1094         }
1095 
1096         nxt_free(skcf->listen);
1097     }
1098 
1099     nxt_queue_init(&new_socket_confs);
1100     nxt_queue_add(&new_socket_confs, &updating_sockets);
1101     nxt_queue_add(&new_socket_confs, &pending_sockets);
1102     nxt_queue_add(&new_socket_confs, &creating_sockets);
1103 
1104     rtcf = tmcf->router_conf;
1105 
1106     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1107 
1108         nxt_router_app_unlink(task, app);
1109 
1110     } nxt_queue_loop;
1111 
1112     router = rtcf->router;
1113 
1114     nxt_queue_add(&router->sockets, &keeping_sockets);
1115     nxt_queue_add(&router->sockets, &deleting_sockets);
1116 
1117     nxt_queue_add(&router->apps, &tmcf->previous);
1118 
1119     // TODO: new engines and threads
1120 
1121     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1122 
1123     nxt_mp_destroy(rtcf->mem_pool);
1124 
1125     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1126 
1127     nxt_mp_release(tmcf->mem_pool);
1128 }
1129 
1130 
1131 static void
1132 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1133     nxt_port_msg_type_t type)
1134 {
1135     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1136 
1137     nxt_port_use(task, tmcf->port, -1);
1138 
1139     tmcf->port = NULL;
1140 }
1141 
1142 
1143 static nxt_conf_map_t  nxt_router_conf[] = {
1144     {
1145         nxt_string("listeners_threads"),
1146         NXT_CONF_MAP_INT32,
1147         offsetof(nxt_router_conf_t, threads),
1148     },
1149 };
1150 
1151 
1152 static nxt_conf_map_t  nxt_router_app_conf[] = {
1153     {
1154         nxt_string("type"),
1155         NXT_CONF_MAP_STR,
1156         offsetof(nxt_router_app_conf_t, type),
1157     },
1158 
1159     {
1160         nxt_string("limits"),
1161         NXT_CONF_MAP_PTR,
1162         offsetof(nxt_router_app_conf_t, limits_value),
1163     },
1164 
1165     {
1166         nxt_string("processes"),
1167         NXT_CONF_MAP_INT32,
1168         offsetof(nxt_router_app_conf_t, processes),
1169     },
1170 
1171     {
1172         nxt_string("processes"),
1173         NXT_CONF_MAP_PTR,
1174         offsetof(nxt_router_app_conf_t, processes_value),
1175     },
1176 
1177     {
1178         nxt_string("targets"),
1179         NXT_CONF_MAP_PTR,
1180         offsetof(nxt_router_app_conf_t, targets_value),
1181     },
1182 };
1183 
1184 
1185 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1186     {
1187         nxt_string("timeout"),
1188         NXT_CONF_MAP_MSEC,
1189         offsetof(nxt_router_app_conf_t, timeout),
1190     },
1191 
1192     {
1193         nxt_string("requests"),
1194         NXT_CONF_MAP_INT32,
1195         offsetof(nxt_router_app_conf_t, requests),
1196     },
1197 };
1198 
1199 
1200 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1201     {
1202         nxt_string("spare"),
1203         NXT_CONF_MAP_INT32,
1204         offsetof(nxt_router_app_conf_t, spare_processes),
1205     },
1206 
1207     {
1208         nxt_string("max"),
1209         NXT_CONF_MAP_INT32,
1210         offsetof(nxt_router_app_conf_t, max_processes),
1211     },
1212 
1213     {
1214         nxt_string("idle_timeout"),
1215         NXT_CONF_MAP_MSEC,
1216         offsetof(nxt_router_app_conf_t, idle_timeout),
1217     },
1218 };
1219 
1220 
1221 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1222     {
1223         nxt_string("pass"),
1224         NXT_CONF_MAP_STR_COPY,
1225         offsetof(nxt_router_listener_conf_t, pass),
1226     },
1227 
1228     {
1229         nxt_string("application"),
1230         NXT_CONF_MAP_STR_COPY,
1231         offsetof(nxt_router_listener_conf_t, application),
1232     },
1233 };
1234 
1235 
1236 static nxt_conf_map_t  nxt_router_http_conf[] = {
1237     {
1238         nxt_string("header_buffer_size"),
1239         NXT_CONF_MAP_SIZE,
1240         offsetof(nxt_socket_conf_t, header_buffer_size),
1241     },
1242 
1243     {
1244         nxt_string("large_header_buffer_size"),
1245         NXT_CONF_MAP_SIZE,
1246         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1247     },
1248 
1249     {
1250         nxt_string("large_header_buffers"),
1251         NXT_CONF_MAP_SIZE,
1252         offsetof(nxt_socket_conf_t, large_header_buffers),
1253     },
1254 
1255     {
1256         nxt_string("body_buffer_size"),
1257         NXT_CONF_MAP_SIZE,
1258         offsetof(nxt_socket_conf_t, body_buffer_size),
1259     },
1260 
1261     {
1262         nxt_string("max_body_size"),
1263         NXT_CONF_MAP_SIZE,
1264         offsetof(nxt_socket_conf_t, max_body_size),
1265     },
1266 
1267     {
1268         nxt_string("idle_timeout"),
1269         NXT_CONF_MAP_MSEC,
1270         offsetof(nxt_socket_conf_t, idle_timeout),
1271     },
1272 
1273     {
1274         nxt_string("header_read_timeout"),
1275         NXT_CONF_MAP_MSEC,
1276         offsetof(nxt_socket_conf_t, header_read_timeout),
1277     },
1278 
1279     {
1280         nxt_string("body_read_timeout"),
1281         NXT_CONF_MAP_MSEC,
1282         offsetof(nxt_socket_conf_t, body_read_timeout),
1283     },
1284 
1285     {
1286         nxt_string("send_timeout"),
1287         NXT_CONF_MAP_MSEC,
1288         offsetof(nxt_socket_conf_t, send_timeout),
1289     },
1290 
1291     {
1292         nxt_string("body_temp_path"),
1293         NXT_CONF_MAP_STR,
1294         offsetof(nxt_socket_conf_t, body_temp_path),
1295     },
1296 
1297     {
1298         nxt_string("discard_unsafe_fields"),
1299         NXT_CONF_MAP_INT8,
1300         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1301     },
1302 };
1303 
1304 
1305 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1306     {
1307         nxt_string("max_frame_size"),
1308         NXT_CONF_MAP_SIZE,
1309         offsetof(nxt_websocket_conf_t, max_frame_size),
1310     },
1311 
1312     {
1313         nxt_string("read_timeout"),
1314         NXT_CONF_MAP_MSEC,
1315         offsetof(nxt_websocket_conf_t, read_timeout),
1316     },
1317 
1318     {
1319         nxt_string("keepalive_interval"),
1320         NXT_CONF_MAP_MSEC,
1321         offsetof(nxt_websocket_conf_t, keepalive_interval),
1322     },
1323 
1324 };
1325 
1326 
1327 static nxt_int_t
1328 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1329     u_char *start, u_char *end)
1330 {
1331     u_char                      *p;
1332     size_t                      size;
1333     nxt_mp_t                    *mp, *app_mp;
1334     uint32_t                    next, next_target;
1335     nxt_int_t                   ret;
1336     nxt_str_t                   name, path, target;
1337     nxt_app_t                   *app, *prev;
1338     nxt_str_t                   *t, *s, *targets;
1339     nxt_uint_t                  n, i;
1340     nxt_port_t                  *port;
1341     nxt_router_t                *router;
1342     nxt_app_joint_t             *app_joint;
1343 #if (NXT_TLS)
1344     nxt_conf_value_t            *certificate, *conf_cmds;
1345 #endif
1346     nxt_conf_value_t            *conf, *http, *value, *websocket;
1347     nxt_conf_value_t            *applications, *application;
1348     nxt_conf_value_t            *listeners, *listener;
1349     nxt_conf_value_t            *routes_conf, *static_conf;
1350     nxt_socket_conf_t           *skcf;
1351     nxt_http_routes_t           *routes;
1352     nxt_event_engine_t          *engine;
1353     nxt_app_lang_module_t       *lang;
1354     nxt_router_app_conf_t       apcf;
1355     nxt_router_access_log_t     *access_log;
1356     nxt_router_listener_conf_t  lscf;
1357 
1358     static nxt_str_t  http_path = nxt_string("/settings/http");
1359     static nxt_str_t  applications_path = nxt_string("/applications");
1360     static nxt_str_t  listeners_path = nxt_string("/listeners");
1361     static nxt_str_t  routes_path = nxt_string("/routes");
1362     static nxt_str_t  access_log_path = nxt_string("/access_log");
1363 #if (NXT_TLS)
1364     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1365     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1366 #endif
1367     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1368     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1369 
1370     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1371     if (conf == NULL) {
1372         nxt_alert(task, "configuration parsing error");
1373         return NXT_ERROR;
1374     }
1375 
1376     mp = tmcf->router_conf->mem_pool;
1377 
1378     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1379                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1380     if (ret != NXT_OK) {
1381         nxt_alert(task, "root map error");
1382         return NXT_ERROR;
1383     }
1384 
1385     if (tmcf->router_conf->threads == 0) {
1386         tmcf->router_conf->threads = nxt_ncpu;
1387     }
1388 
1389     static_conf = nxt_conf_get_path(conf, &static_path);
1390 
1391     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1392     if (nxt_slow_path(ret != NXT_OK)) {
1393         return NXT_ERROR;
1394     }
1395 
1396     router = tmcf->router_conf->router;
1397 
1398     applications = nxt_conf_get_path(conf, &applications_path);
1399 
1400     if (applications != NULL) {
1401         next = 0;
1402 
1403         for ( ;; ) {
1404             application = nxt_conf_next_object_member(applications,
1405                                                       &name, &next);
1406             if (application == NULL) {
1407                 break;
1408             }
1409 
1410             nxt_debug(task, "application \"%V\"", &name);
1411 
1412             size = nxt_conf_json_length(application, NULL);
1413 
1414             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1415             if (nxt_slow_path(app_mp == NULL)) {
1416                 goto fail;
1417             }
1418 
1419             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1420             if (app == NULL) {
1421                 goto app_fail;
1422             }
1423 
1424             nxt_memzero(app, sizeof(nxt_app_t));
1425 
1426             app->mem_pool = app_mp;
1427 
1428             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1429             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1430                                                   + name.length);
1431 
1432             p = nxt_conf_json_print(app->conf.start, application, NULL);
1433             app->conf.length = p - app->conf.start;
1434 
1435             nxt_assert(app->conf.length <= size);
1436 
1437             nxt_debug(task, "application conf \"%V\"", &app->conf);
1438 
1439             prev = nxt_router_app_find(&router->apps, &name);
1440 
1441             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1442                 nxt_mp_destroy(app_mp);
1443 
1444                 nxt_queue_remove(&prev->link);
1445                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1446 
1447                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1448                 if (nxt_slow_path(ret != NXT_OK)) {
1449                     goto fail;
1450                 }
1451 
1452                 continue;
1453             }
1454 
1455             apcf.processes = 1;
1456             apcf.max_processes = 1;
1457             apcf.spare_processes = 0;
1458             apcf.timeout = 0;
1459             apcf.idle_timeout = 15000;
1460             apcf.requests = 0;
1461             apcf.limits_value = NULL;
1462             apcf.processes_value = NULL;
1463             apcf.targets_value = NULL;
1464 
1465             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1466             if (nxt_slow_path(app_joint == NULL)) {
1467                 goto app_fail;
1468             }
1469 
1470             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1471 
1472             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1473                                       nxt_nitems(nxt_router_app_conf), &apcf);
1474             if (ret != NXT_OK) {
1475                 nxt_alert(task, "application map error");
1476                 goto app_fail;
1477             }
1478 
1479             if (apcf.limits_value != NULL) {
1480 
1481                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1482                     nxt_alert(task, "application limits is not object");
1483                     goto app_fail;
1484                 }
1485 
1486                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1487                                         nxt_router_app_limits_conf,
1488                                         nxt_nitems(nxt_router_app_limits_conf),
1489                                         &apcf);
1490                 if (ret != NXT_OK) {
1491                     nxt_alert(task, "application limits map error");
1492                     goto app_fail;
1493                 }
1494             }
1495 
1496             if (apcf.processes_value != NULL
1497                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1498             {
1499                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1500                                      nxt_router_app_processes_conf,
1501                                      nxt_nitems(nxt_router_app_processes_conf),
1502                                      &apcf);
1503                 if (ret != NXT_OK) {
1504                     nxt_alert(task, "application processes map error");
1505                     goto app_fail;
1506                 }
1507 
1508             } else {
1509                 apcf.max_processes = apcf.processes;
1510                 apcf.spare_processes = apcf.processes;
1511             }
1512 
1513             if (apcf.targets_value != NULL) {
1514                 n = nxt_conf_object_members_count(apcf.targets_value);
1515 
1516                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1517                 if (nxt_slow_path(targets == NULL)) {
1518                     goto app_fail;
1519                 }
1520 
1521                 next_target = 0;
1522 
1523                 for (i = 0; i < n; i++) {
1524                     (void) nxt_conf_next_object_member(apcf.targets_value,
1525                                                        &target, &next_target);
1526 
1527                     s = nxt_str_dup(app_mp, &targets[i], &target);
1528                     if (nxt_slow_path(s == NULL)) {
1529                         goto app_fail;
1530                     }
1531                 }
1532 
1533             } else {
1534                 targets = NULL;
1535             }
1536 
1537             nxt_debug(task, "application type: %V", &apcf.type);
1538             nxt_debug(task, "application processes: %D", apcf.processes);
1539             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1540             nxt_debug(task, "application requests: %D", apcf.requests);
1541 
1542             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1543 
1544             if (lang == NULL) {
1545                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1546                 goto app_fail;
1547             }
1548 
1549             nxt_debug(task, "application language module: \"%s\"", lang->file);
1550 
1551             ret = nxt_thread_mutex_create(&app->mutex);
1552             if (ret != NXT_OK) {
1553                 goto app_fail;
1554             }
1555 
1556             nxt_queue_init(&app->ports);
1557             nxt_queue_init(&app->spare_ports);
1558             nxt_queue_init(&app->idle_ports);
1559             nxt_queue_init(&app->ack_waiting_req);
1560 
1561             app->name.length = name.length;
1562             nxt_memcpy(app->name.start, name.start, name.length);
1563 
1564             app->type = lang->type;
1565             app->max_processes = apcf.max_processes;
1566             app->spare_processes = apcf.spare_processes;
1567             app->max_pending_processes = apcf.spare_processes
1568                                          ? apcf.spare_processes : 1;
1569             app->timeout = apcf.timeout;
1570             app->idle_timeout = apcf.idle_timeout;
1571             app->max_requests = apcf.requests;
1572 
1573             app->targets = targets;
1574 
1575             engine = task->thread->engine;
1576 
1577             app->engine = engine;
1578 
1579             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1580             app->adjust_idle_work.task = &engine->task;
1581             app->adjust_idle_work.obj = app;
1582 
1583             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1584 
1585             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1586             if (nxt_slow_path(ret != NXT_OK)) {
1587                 goto app_fail;
1588             }
1589 
1590             nxt_router_app_use(task, app, 1);
1591 
1592             app->joint = app_joint;
1593 
1594             app_joint->use_count = 1;
1595             app_joint->app = app;
1596 
1597             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1598             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1599             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1600             app_joint->idle_timer.task = &engine->task;
1601             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1602 
1603             app_joint->free_app_work.handler = nxt_router_free_app;
1604             app_joint->free_app_work.task = &engine->task;
1605             app_joint->free_app_work.obj = app_joint;
1606 
1607             port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1608                                 NXT_PROCESS_APP);
1609             if (nxt_slow_path(port == NULL)) {
1610                 return NXT_ERROR;
1611             }
1612 
1613             ret = nxt_port_socket_init(task, port, 0);
1614             if (nxt_slow_path(ret != NXT_OK)) {
1615                 nxt_port_use(task, port, -1);
1616                 return NXT_ERROR;
1617             }
1618 
1619             ret = nxt_router_app_queue_init(task, port);
1620             if (nxt_slow_path(ret != NXT_OK)) {
1621                 nxt_port_write_close(port);
1622                 nxt_port_read_close(port);
1623                 nxt_port_use(task, port, -1);
1624                 return NXT_ERROR;
1625             }
1626 
1627             nxt_port_write_enable(task, port);
1628             port->app = app;
1629 
1630             app->shared_port = port;
1631 
1632             nxt_thread_mutex_create(&app->outgoing.mutex);
1633         }
1634     }
1635 
1636     routes_conf = nxt_conf_get_path(conf, &routes_path);
1637     if (nxt_fast_path(routes_conf != NULL)) {
1638         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1639         if (nxt_slow_path(routes == NULL)) {
1640             return NXT_ERROR;
1641         }
1642         tmcf->router_conf->routes = routes;
1643     }
1644 
1645     ret = nxt_upstreams_create(task, tmcf, conf);
1646     if (nxt_slow_path(ret != NXT_OK)) {
1647         return ret;
1648     }
1649 
1650     http = nxt_conf_get_path(conf, &http_path);
1651 #if 0
1652     if (http == NULL) {
1653         nxt_alert(task, "no \"http\" block");
1654         return NXT_ERROR;
1655     }
1656 #endif
1657 
1658     websocket = nxt_conf_get_path(conf, &websocket_path);
1659 
1660     listeners = nxt_conf_get_path(conf, &listeners_path);
1661 
1662     if (listeners != NULL) {
1663         next = 0;
1664 
1665         for ( ;; ) {
1666             listener = nxt_conf_next_object_member(listeners, &name, &next);
1667             if (listener == NULL) {
1668                 break;
1669             }
1670 
1671             skcf = nxt_router_socket_conf(task, tmcf, &name);
1672             if (skcf == NULL) {
1673                 goto fail;
1674             }
1675 
1676             nxt_memzero(&lscf, sizeof(lscf));
1677 
1678             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1679                                       nxt_nitems(nxt_router_listener_conf),
1680                                       &lscf);
1681             if (ret != NXT_OK) {
1682                 nxt_alert(task, "listener map error");
1683                 goto fail;
1684             }
1685 
1686             nxt_debug(task, "application: %V", &lscf.application);
1687 
1688             // STUB, default values if http block is not defined.
1689             skcf->header_buffer_size = 2048;
1690             skcf->large_header_buffer_size = 8192;
1691             skcf->large_header_buffers = 4;
1692             skcf->discard_unsafe_fields = 1;
1693             skcf->body_buffer_size = 16 * 1024;
1694             skcf->max_body_size = 8 * 1024 * 1024;
1695             skcf->proxy_header_buffer_size = 64 * 1024;
1696             skcf->proxy_buffer_size = 4096;
1697             skcf->proxy_buffers = 256;
1698             skcf->idle_timeout = 180 * 1000;
1699             skcf->header_read_timeout = 30 * 1000;
1700             skcf->body_read_timeout = 30 * 1000;
1701             skcf->send_timeout = 30 * 1000;
1702             skcf->proxy_timeout = 60 * 1000;
1703             skcf->proxy_send_timeout = 30 * 1000;
1704             skcf->proxy_read_timeout = 30 * 1000;
1705 
1706             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1707             skcf->websocket_conf.read_timeout = 60 * 1000;
1708             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1709 
1710             nxt_str_null(&skcf->body_temp_path);
1711 
1712             if (http != NULL) {
1713                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1714                                           nxt_nitems(nxt_router_http_conf),
1715                                           skcf);
1716                 if (ret != NXT_OK) {
1717                     nxt_alert(task, "http map error");
1718                     goto fail;
1719                 }
1720             }
1721 
1722             if (websocket != NULL) {
1723                 ret = nxt_conf_map_object(mp, websocket,
1724                                           nxt_router_websocket_conf,
1725                                           nxt_nitems(nxt_router_websocket_conf),
1726                                           &skcf->websocket_conf);
1727                 if (ret != NXT_OK) {
1728                     nxt_alert(task, "websocket map error");
1729                     goto fail;
1730                 }
1731             }
1732 
1733             t = &skcf->body_temp_path;
1734 
1735             if (t->length == 0) {
1736                 t->start = (u_char *) task->thread->runtime->tmp;
1737                 t->length = nxt_strlen(t->start);
1738             }
1739 
1740 #if (NXT_TLS)
1741             certificate = nxt_conf_get_path(listener, &certificate_path);
1742 
1743             if (certificate != NULL) {
1744                 conf_cmds = nxt_conf_get_path(listener, &conf_commands_path);
1745 
1746                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1747                     n = nxt_conf_array_elements_count(certificate);
1748 
1749                     for (i = 0; i < n; i++) {
1750                         value = nxt_conf_get_array_element(certificate, i);
1751 
1752                         nxt_assert(value != NULL);
1753 
1754                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1755                                                          conf_cmds);
1756                         if (nxt_slow_path(ret != NXT_OK)) {
1757                             goto fail;
1758                         }
1759                     }
1760 
1761                 } else {
1762                     /* NXT_CONF_STRING */
1763                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1764                                                      conf_cmds);
1765                     if (nxt_slow_path(ret != NXT_OK)) {
1766                         goto fail;
1767                     }
1768                 }
1769             }
1770 #endif
1771 
1772             skcf->listen->handler = nxt_http_conn_init;
1773             skcf->router_conf = tmcf->router_conf;
1774             skcf->router_conf->count++;
1775 
1776             if (lscf.pass.length != 0) {
1777                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1778 
1779             /* COMPATIBILITY: listener application. */
1780             } else if (lscf.application.length > 0) {
1781                 skcf->action = nxt_http_pass_application(task,
1782                                                          tmcf->router_conf,
1783                                                          &lscf.application);
1784             }
1785 
1786             if (nxt_slow_path(skcf->action == NULL)) {
1787                 goto fail;
1788             }
1789         }
1790     }
1791 
1792     ret = nxt_http_routes_resolve(task, tmcf);
1793     if (nxt_slow_path(ret != NXT_OK)) {
1794         goto fail;
1795     }
1796 
1797     value = nxt_conf_get_path(conf, &access_log_path);
1798 
1799     if (value != NULL) {
1800         nxt_conf_get_string(value, &path);
1801 
1802         access_log = router->access_log;
1803 
1804         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1805             nxt_thread_spin_lock(&router->lock);
1806             access_log->count++;
1807             nxt_thread_spin_unlock(&router->lock);
1808 
1809         } else {
1810             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1811                                     + path.length);
1812             if (access_log == NULL) {
1813                 nxt_alert(task, "failed to allocate access log structure");
1814                 goto fail;
1815             }
1816 
1817             access_log->fd = -1;
1818             access_log->handler = &nxt_router_access_log_writer;
1819             access_log->count = 1;
1820 
1821             access_log->path.length = path.length;
1822             access_log->path.start = (u_char *) access_log
1823                                      + sizeof(nxt_router_access_log_t);
1824 
1825             nxt_memcpy(access_log->path.start, path.start, path.length);
1826         }
1827 
1828         tmcf->router_conf->access_log = access_log;
1829     }
1830 
1831     nxt_queue_add(&deleting_sockets, &router->sockets);
1832     nxt_queue_init(&router->sockets);
1833 
1834     return NXT_OK;
1835 
1836 app_fail:
1837 
1838     nxt_mp_destroy(app_mp);
1839 
1840 fail:
1841 
1842     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1843 
1844         nxt_queue_remove(&app->link);
1845         nxt_thread_mutex_destroy(&app->mutex);
1846         nxt_mp_destroy(app->mem_pool);
1847 
1848     } nxt_queue_loop;
1849 
1850     return NXT_ERROR;
1851 }
1852 
1853 
1854 #if (NXT_TLS)
1855 
1856 static nxt_int_t
1857 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1858     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1859     nxt_conf_value_t *conf_cmds)
1860 {
1861     nxt_router_tlssock_t  *tls;
1862 
1863     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1864     if (nxt_slow_path(tls == NULL)) {
1865         return NXT_ERROR;
1866     }
1867 
1868     tls->socket_conf = skcf;
1869     tls->conf_cmds = conf_cmds;
1870     tls->temp_conf = tmcf;
1871     nxt_conf_get_string(value, &tls->name);
1872 
1873     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1874 
1875     return NXT_OK;
1876 }
1877 
1878 #endif
1879 
1880 
1881 static nxt_int_t
1882 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
1883     nxt_conf_value_t *conf)
1884 {
1885     uint32_t          next, i;
1886     nxt_mp_t          *mp;
1887     nxt_str_t         *type, extension, str;
1888     nxt_int_t         ret;
1889     nxt_uint_t        exts;
1890     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
1891 
1892     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
1893 
1894     mp = rtcf->mem_pool;
1895 
1896     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
1897     if (nxt_slow_path(ret != NXT_OK)) {
1898         return NXT_ERROR;
1899     }
1900 
1901     if (conf == NULL) {
1902         return NXT_OK;
1903     }
1904 
1905     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
1906 
1907     if (mtypes_conf != NULL) {
1908         next = 0;
1909 
1910         for ( ;; ) {
1911             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
1912 
1913             if (ext_conf == NULL) {
1914                 break;
1915             }
1916 
1917             type = nxt_str_dup(mp, NULL, &str);
1918             if (nxt_slow_path(type == NULL)) {
1919                 return NXT_ERROR;
1920             }
1921 
1922             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
1923                 nxt_conf_get_string(ext_conf, &str);
1924 
1925                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1926                     return NXT_ERROR;
1927                 }
1928 
1929                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1930                                                       &extension, type);
1931                 if (nxt_slow_path(ret != NXT_OK)) {
1932                     return NXT_ERROR;
1933                 }
1934 
1935                 continue;
1936             }
1937 
1938             exts = nxt_conf_array_elements_count(ext_conf);
1939 
1940             for (i = 0; i < exts; i++) {
1941                 value = nxt_conf_get_array_element(ext_conf, i);
1942 
1943                 nxt_conf_get_string(value, &str);
1944 
1945                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1946                     return NXT_ERROR;
1947                 }
1948 
1949                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1950                                                       &extension, type);
1951                 if (nxt_slow_path(ret != NXT_OK)) {
1952                     return NXT_ERROR;
1953                 }
1954             }
1955         }
1956     }
1957 
1958     return NXT_OK;
1959 }
1960 
1961 
1962 static nxt_app_t *
1963 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1964 {
1965     nxt_app_t  *app;
1966 
1967     nxt_queue_each(app, queue, nxt_app_t, link) {
1968 
1969         if (nxt_strstr_eq(name, &app->name)) {
1970             return app;
1971         }
1972 
1973     } nxt_queue_loop;
1974 
1975     return NULL;
1976 }
1977 
1978 
1979 static nxt_int_t
1980 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
1981 {
1982     void       *mem;
1983     nxt_int_t  fd;
1984 
1985     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
1986     if (nxt_slow_path(fd == -1)) {
1987         return NXT_ERROR;
1988     }
1989 
1990     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
1991                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1992     if (nxt_slow_path(mem == MAP_FAILED)) {
1993         nxt_fd_close(fd);
1994 
1995         return NXT_ERROR;
1996     }
1997 
1998     nxt_app_queue_init(mem);
1999 
2000     port->queue_fd = fd;
2001     port->queue = mem;
2002 
2003     return NXT_OK;
2004 }
2005 
2006 
2007 static nxt_int_t
2008 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2009 {
2010     void       *mem;
2011     nxt_int_t  fd;
2012 
2013     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2014     if (nxt_slow_path(fd == -1)) {
2015         return NXT_ERROR;
2016     }
2017 
2018     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2019                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2020     if (nxt_slow_path(mem == MAP_FAILED)) {
2021         nxt_fd_close(fd);
2022 
2023         return NXT_ERROR;
2024     }
2025 
2026     nxt_port_queue_init(mem);
2027 
2028     port->queue_fd = fd;
2029     port->queue = mem;
2030 
2031     return NXT_OK;
2032 }
2033 
2034 
2035 static nxt_int_t
2036 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2037 {
2038     void  *mem;
2039 
2040     nxt_assert(fd != -1);
2041 
2042     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2043                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2044     if (nxt_slow_path(mem == MAP_FAILED)) {
2045 
2046         return NXT_ERROR;
2047     }
2048 
2049     port->queue = mem;
2050 
2051     return NXT_OK;
2052 }
2053 
2054 
2055 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2056     NXT_LVLHSH_DEFAULT,
2057     nxt_router_apps_hash_test,
2058     nxt_mp_lvlhsh_alloc,
2059     nxt_mp_lvlhsh_free,
2060 };
2061 
2062 
2063 static nxt_int_t
2064 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2065 {
2066     nxt_app_t  *app;
2067 
2068     app = data;
2069 
2070     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2071 }
2072 
2073 
2074 static nxt_int_t
2075 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2076 {
2077     nxt_lvlhsh_query_t  lhq;
2078 
2079     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2080     lhq.replace = 0;
2081     lhq.key = app->name;
2082     lhq.value = app;
2083     lhq.proto = &nxt_router_apps_hash_proto;
2084     lhq.pool = rtcf->mem_pool;
2085 
2086     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2087 
2088     case NXT_OK:
2089         return NXT_OK;
2090 
2091     case NXT_DECLINED:
2092         nxt_thread_log_alert("router app hash adding failed: "
2093                              "\"%V\" is already in hash", &lhq.key);
2094         /* Fall through. */
2095     default:
2096         return NXT_ERROR;
2097     }
2098 }
2099 
2100 
2101 static nxt_app_t *
2102 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2103 {
2104     nxt_lvlhsh_query_t  lhq;
2105 
2106     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2107     lhq.key = *name;
2108     lhq.proto = &nxt_router_apps_hash_proto;
2109 
2110     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2111         return NULL;
2112     }
2113 
2114     return lhq.value;
2115 }
2116 
2117 
2118 static void
2119 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2120 {
2121     nxt_app_t          *app;
2122     nxt_lvlhsh_each_t  lhe;
2123 
2124     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2125 
2126     for ( ;; ) {
2127         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2128 
2129         if (app == NULL) {
2130             break;
2131         }
2132 
2133         nxt_router_app_use(task, app, i);
2134     }
2135 }
2136 
2137 
2138 
2139 nxt_int_t
2140 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
2141     nxt_http_action_t *action)
2142 {
2143     nxt_app_t  *app;
2144 
2145     app = nxt_router_apps_hash_get(rtcf, name);
2146 
2147     if (app == NULL) {
2148         return NXT_DECLINED;
2149     }
2150 
2151     action->u.app.application = app;
2152     action->handler = nxt_http_application_handler;
2153 
2154     return NXT_OK;
2155 }
2156 
2157 
2158 static nxt_socket_conf_t *
2159 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2160     nxt_str_t *name)
2161 {
2162     size_t               size;
2163     nxt_int_t            ret;
2164     nxt_bool_t           wildcard;
2165     nxt_sockaddr_t       *sa;
2166     nxt_socket_conf_t    *skcf;
2167     nxt_listen_socket_t  *ls;
2168 
2169     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2170     if (nxt_slow_path(sa == NULL)) {
2171         nxt_alert(task, "invalid listener \"%V\"", name);
2172         return NULL;
2173     }
2174 
2175     sa->type = SOCK_STREAM;
2176 
2177     nxt_debug(task, "router listener: \"%*s\"",
2178               (size_t) sa->length, nxt_sockaddr_start(sa));
2179 
2180     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2181     if (nxt_slow_path(skcf == NULL)) {
2182         return NULL;
2183     }
2184 
2185     size = nxt_sockaddr_size(sa);
2186 
2187     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2188 
2189     if (ret != NXT_OK) {
2190 
2191         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2192         if (nxt_slow_path(ls == NULL)) {
2193             return NULL;
2194         }
2195 
2196         skcf->listen = ls;
2197 
2198         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2199         nxt_memcpy(ls->sockaddr, sa, size);
2200 
2201         nxt_listen_socket_remote_size(ls);
2202 
2203         ls->socket = -1;
2204         ls->backlog = NXT_LISTEN_BACKLOG;
2205         ls->flags = NXT_NONBLOCK;
2206         ls->read_after_accept = 1;
2207     }
2208 
2209     switch (sa->u.sockaddr.sa_family) {
2210 #if (NXT_HAVE_UNIX_DOMAIN)
2211     case AF_UNIX:
2212         wildcard = 0;
2213         break;
2214 #endif
2215 #if (NXT_INET6)
2216     case AF_INET6:
2217         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2218         break;
2219 #endif
2220     case AF_INET:
2221     default:
2222         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2223         break;
2224     }
2225 
2226     if (!wildcard) {
2227         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2228         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2229             return NULL;
2230         }
2231 
2232         nxt_memcpy(skcf->sockaddr, sa, size);
2233     }
2234 
2235     return skcf;
2236 }
2237 
2238 
2239 static nxt_int_t
2240 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2241     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2242 {
2243     nxt_router_t       *router;
2244     nxt_queue_link_t   *qlk;
2245     nxt_socket_conf_t  *skcf;
2246 
2247     router = tmcf->router_conf->router;
2248 
2249     for (qlk = nxt_queue_first(&router->sockets);
2250          qlk != nxt_queue_tail(&router->sockets);
2251          qlk = nxt_queue_next(qlk))
2252     {
2253         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2254 
2255         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2256             nskcf->listen = skcf->listen;
2257 
2258             nxt_queue_remove(qlk);
2259             nxt_queue_insert_tail(&keeping_sockets, qlk);
2260 
2261             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2262 
2263             return NXT_OK;
2264         }
2265     }
2266 
2267     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2268 
2269     return NXT_DECLINED;
2270 }
2271 
2272 
2273 static void
2274 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2275     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2276 {
2277     size_t            size;
2278     uint32_t          stream;
2279     nxt_int_t         ret;
2280     nxt_buf_t         *b;
2281     nxt_port_t        *main_port, *router_port;
2282     nxt_runtime_t     *rt;
2283     nxt_socket_rpc_t  *rpc;
2284 
2285     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2286     if (rpc == NULL) {
2287         goto fail;
2288     }
2289 
2290     rpc->socket_conf = skcf;
2291     rpc->temp_conf = tmcf;
2292 
2293     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2294 
2295     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2296     if (b == NULL) {
2297         goto fail;
2298     }
2299 
2300     b->completion_handler = nxt_router_dummy_buf_completion;
2301 
2302     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2303 
2304     rt = task->thread->runtime;
2305     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2306     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2307 
2308     stream = nxt_port_rpc_register_handler(task, router_port,
2309                                            nxt_router_listen_socket_ready,
2310                                            nxt_router_listen_socket_error,
2311                                            main_port->pid, rpc);
2312     if (nxt_slow_path(stream == 0)) {
2313         goto fail;
2314     }
2315 
2316     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2317                                 stream, router_port->id, b);
2318 
2319     if (nxt_slow_path(ret != NXT_OK)) {
2320         nxt_port_rpc_cancel(task, router_port, stream);
2321         goto fail;
2322     }
2323 
2324     return;
2325 
2326 fail:
2327 
2328     nxt_router_conf_error(task, tmcf);
2329 }
2330 
2331 
2332 static void
2333 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2334     void *data)
2335 {
2336     nxt_int_t         ret;
2337     nxt_socket_t      s;
2338     nxt_socket_rpc_t  *rpc;
2339 
2340     rpc = data;
2341 
2342     s = msg->fd[0];
2343 
2344     ret = nxt_socket_nonblocking(task, s);
2345     if (nxt_slow_path(ret != NXT_OK)) {
2346         goto fail;
2347     }
2348 
2349     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2350 
2351     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2352     if (nxt_slow_path(ret != NXT_OK)) {
2353         goto fail;
2354     }
2355 
2356     rpc->socket_conf->listen->socket = s;
2357 
2358     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2359                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2360 
2361     return;
2362 
2363 fail:
2364 
2365     nxt_socket_close(task, s);
2366 
2367     nxt_router_conf_error(task, rpc->temp_conf);
2368 }
2369 
2370 
2371 static void
2372 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2373     void *data)
2374 {
2375     nxt_socket_rpc_t        *rpc;
2376     nxt_router_temp_conf_t  *tmcf;
2377 
2378     rpc = data;
2379     tmcf = rpc->temp_conf;
2380 
2381 #if 0
2382     u_char                  *p;
2383     size_t                  size;
2384     uint8_t                 error;
2385     nxt_buf_t               *in, *out;
2386     nxt_sockaddr_t          *sa;
2387 
2388     static nxt_str_t  socket_errors[] = {
2389         nxt_string("ListenerSystem"),
2390         nxt_string("ListenerNoIPv6"),
2391         nxt_string("ListenerPort"),
2392         nxt_string("ListenerInUse"),
2393         nxt_string("ListenerNoAddress"),
2394         nxt_string("ListenerNoAccess"),
2395         nxt_string("ListenerPath"),
2396     };
2397 
2398     sa = rpc->socket_conf->listen->sockaddr;
2399 
2400     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2401 
2402     if (nxt_slow_path(in == NULL)) {
2403         return;
2404     }
2405 
2406     p = in->mem.pos;
2407 
2408     error = *p++;
2409 
2410     size = nxt_length("listen socket error: ")
2411            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2412            + sa->length + socket_errors[error].length + (in->mem.free - p);
2413 
2414     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2415     if (nxt_slow_path(out == NULL)) {
2416         return;
2417     }
2418 
2419     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2420                         "listen socket error: "
2421                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2422                         (size_t) sa->length, nxt_sockaddr_start(sa),
2423                         &socket_errors[error], in->mem.free - p, p);
2424 
2425     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2426 #endif
2427 
2428     nxt_router_conf_error(task, tmcf);
2429 }
2430 
2431 
2432 #if (NXT_TLS)
2433 
2434 static void
2435 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2436     void *data)
2437 {
2438     nxt_mp_t                *mp;
2439     nxt_int_t               ret;
2440     nxt_tls_conf_t          *tlscf;
2441     nxt_router_tlssock_t    *tls;
2442     nxt_tls_bundle_conf_t   *bundle;
2443     nxt_router_temp_conf_t  *tmcf;
2444 
2445     nxt_debug(task, "tls rpc handler");
2446 
2447     tls = data;
2448     tmcf = tls->temp_conf;
2449 
2450     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2451         goto fail;
2452     }
2453 
2454     mp = tmcf->router_conf->mem_pool;
2455 
2456     if (tls->socket_conf->tls == NULL){
2457         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2458         if (nxt_slow_path(tlscf == NULL)) {
2459             goto fail;
2460         }
2461 
2462         tlscf->no_wait_shutdown = 1;
2463         tls->socket_conf->tls = tlscf;
2464 
2465     } else {
2466         tlscf = tls->socket_conf->tls;
2467     }
2468 
2469     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2470     if (nxt_slow_path(bundle == NULL)) {
2471         goto fail;
2472     }
2473 
2474     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2475         goto fail;
2476     }
2477 
2478     bundle->chain_file = msg->fd[0];
2479     bundle->next = tlscf->bundle;
2480     tlscf->bundle = bundle;
2481 
2482     ret = task->thread->runtime->tls->server_init(task, tlscf, mp,
2483                                                   tls->conf_cmds, tls->last);
2484     if (nxt_slow_path(ret != NXT_OK)) {
2485         goto fail;
2486     }
2487 
2488     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2489                        nxt_router_conf_apply, task, tmcf, NULL);
2490     return;
2491 
2492 fail:
2493 
2494     nxt_router_conf_error(task, tmcf);
2495 }
2496 
2497 #endif
2498 
2499 
2500 static void
2501 nxt_router_app_rpc_create(nxt_task_t *task,
2502     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2503 {
2504     size_t         size;
2505     uint32_t       stream;
2506     nxt_int_t      ret;
2507     nxt_buf_t      *b;
2508     nxt_port_t     *main_port, *router_port;
2509     nxt_runtime_t  *rt;
2510     nxt_app_rpc_t  *rpc;
2511 
2512     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2513     if (rpc == NULL) {
2514         goto fail;
2515     }
2516 
2517     rpc->app = app;
2518     rpc->temp_conf = tmcf;
2519 
2520     nxt_debug(task, "app '%V' prefork", &app->name);
2521 
2522     size = app->name.length + 1 + app->conf.length;
2523 
2524     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2525     if (nxt_slow_path(b == NULL)) {
2526         goto fail;
2527     }
2528 
2529     b->completion_handler = nxt_router_dummy_buf_completion;
2530 
2531     nxt_buf_cpystr(b, &app->name);
2532     *b->mem.free++ = '\0';
2533     nxt_buf_cpystr(b, &app->conf);
2534 
2535     rt = task->thread->runtime;
2536     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2537     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2538 
2539     stream = nxt_port_rpc_register_handler(task, router_port,
2540                                            nxt_router_app_prefork_ready,
2541                                            nxt_router_app_prefork_error,
2542                                            -1, rpc);
2543     if (nxt_slow_path(stream == 0)) {
2544         goto fail;
2545     }
2546 
2547     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2548                                 -1, stream, router_port->id, b);
2549 
2550     if (nxt_slow_path(ret != NXT_OK)) {
2551         nxt_port_rpc_cancel(task, router_port, stream);
2552         goto fail;
2553     }
2554 
2555     app->pending_processes++;
2556 
2557     return;
2558 
2559 fail:
2560 
2561     nxt_router_conf_error(task, tmcf);
2562 }
2563 
2564 
2565 static void
2566 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2567     void *data)
2568 {
2569     nxt_app_t           *app;
2570     nxt_port_t          *port;
2571     nxt_app_rpc_t       *rpc;
2572     nxt_event_engine_t  *engine;
2573 
2574     rpc = data;
2575     app = rpc->app;
2576 
2577     port = msg->u.new_port;
2578 
2579     nxt_assert(port != NULL);
2580     nxt_assert(port->type == NXT_PROCESS_APP);
2581     nxt_assert(port->id == 0);
2582 
2583     port->app = app;
2584     port->main_app_port = port;
2585 
2586     app->pending_processes--;
2587     app->processes++;
2588     app->idle_processes++;
2589 
2590     engine = task->thread->engine;
2591 
2592     nxt_queue_insert_tail(&app->ports, &port->app_link);
2593     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2594 
2595     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2596               &app->name, port->pid, port->id);
2597 
2598     nxt_port_hash_add(&app->port_hash, port);
2599     app->port_hash_count++;
2600 
2601     port->idle_start = 0;
2602 
2603     nxt_port_inc_use(port);
2604 
2605     nxt_router_app_shared_port_send(task, port);
2606 
2607     nxt_work_queue_add(&engine->fast_work_queue,
2608                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2609 }
2610 
2611 
2612 static void
2613 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2614     void *data)
2615 {
2616     nxt_app_t               *app;
2617     nxt_app_rpc_t           *rpc;
2618     nxt_router_temp_conf_t  *tmcf;
2619 
2620     rpc = data;
2621     app = rpc->app;
2622     tmcf = rpc->temp_conf;
2623 
2624     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2625             &app->name);
2626 
2627     app->pending_processes--;
2628 
2629     nxt_router_conf_error(task, tmcf);
2630 }
2631 
2632 
2633 static nxt_int_t
2634 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2635     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2636 {
2637     nxt_int_t                 ret;
2638     nxt_uint_t                n, threads;
2639     nxt_queue_link_t          *qlk;
2640     nxt_router_engine_conf_t  *recf;
2641 
2642     threads = tmcf->router_conf->threads;
2643 
2644     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2645                                      sizeof(nxt_router_engine_conf_t));
2646     if (nxt_slow_path(tmcf->engines == NULL)) {
2647         return NXT_ERROR;
2648     }
2649 
2650     n = 0;
2651 
2652     for (qlk = nxt_queue_first(&router->engines);
2653          qlk != nxt_queue_tail(&router->engines);
2654          qlk = nxt_queue_next(qlk))
2655     {
2656         recf = nxt_array_zero_add(tmcf->engines);
2657         if (nxt_slow_path(recf == NULL)) {
2658             return NXT_ERROR;
2659         }
2660 
2661         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2662 
2663         if (n < threads) {
2664             recf->action = NXT_ROUTER_ENGINE_KEEP;
2665             ret = nxt_router_engine_conf_update(tmcf, recf);
2666 
2667         } else {
2668             recf->action = NXT_ROUTER_ENGINE_DELETE;
2669             ret = nxt_router_engine_conf_delete(tmcf, recf);
2670         }
2671 
2672         if (nxt_slow_path(ret != NXT_OK)) {
2673             return ret;
2674         }
2675 
2676         n++;
2677     }
2678 
2679     tmcf->new_threads = n;
2680 
2681     while (n < threads) {
2682         recf = nxt_array_zero_add(tmcf->engines);
2683         if (nxt_slow_path(recf == NULL)) {
2684             return NXT_ERROR;
2685         }
2686 
2687         recf->action = NXT_ROUTER_ENGINE_ADD;
2688 
2689         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2690         if (nxt_slow_path(recf->engine == NULL)) {
2691             return NXT_ERROR;
2692         }
2693 
2694         ret = nxt_router_engine_conf_create(tmcf, recf);
2695         if (nxt_slow_path(ret != NXT_OK)) {
2696             return ret;
2697         }
2698 
2699         n++;
2700     }
2701 
2702     return NXT_OK;
2703 }
2704 
2705 
2706 static nxt_int_t
2707 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2708     nxt_router_engine_conf_t *recf)
2709 {
2710     nxt_int_t  ret;
2711 
2712     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2713                                           nxt_router_listen_socket_create);
2714     if (nxt_slow_path(ret != NXT_OK)) {
2715         return ret;
2716     }
2717 
2718     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2719                                           nxt_router_listen_socket_create);
2720     if (nxt_slow_path(ret != NXT_OK)) {
2721         return ret;
2722     }
2723 
2724     return ret;
2725 }
2726 
2727 
2728 static nxt_int_t
2729 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2730     nxt_router_engine_conf_t *recf)
2731 {
2732     nxt_int_t  ret;
2733 
2734     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2735                                           nxt_router_listen_socket_create);
2736     if (nxt_slow_path(ret != NXT_OK)) {
2737         return ret;
2738     }
2739 
2740     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2741                                           nxt_router_listen_socket_update);
2742     if (nxt_slow_path(ret != NXT_OK)) {
2743         return ret;
2744     }
2745 
2746     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2747     if (nxt_slow_path(ret != NXT_OK)) {
2748         return ret;
2749     }
2750 
2751     return ret;
2752 }
2753 
2754 
2755 static nxt_int_t
2756 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2757     nxt_router_engine_conf_t *recf)
2758 {
2759     nxt_int_t  ret;
2760 
2761     ret = nxt_router_engine_quit(tmcf, recf);
2762     if (nxt_slow_path(ret != NXT_OK)) {
2763         return ret;
2764     }
2765 
2766     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2767     if (nxt_slow_path(ret != NXT_OK)) {
2768         return ret;
2769     }
2770 
2771     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2772 }
2773 
2774 
2775 static nxt_int_t
2776 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2777     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2778     nxt_work_handler_t handler)
2779 {
2780     nxt_int_t                ret;
2781     nxt_joint_job_t          *job;
2782     nxt_queue_link_t         *qlk;
2783     nxt_socket_conf_t        *skcf;
2784     nxt_socket_conf_joint_t  *joint;
2785 
2786     for (qlk = nxt_queue_first(sockets);
2787          qlk != nxt_queue_tail(sockets);
2788          qlk = nxt_queue_next(qlk))
2789     {
2790         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2791         if (nxt_slow_path(job == NULL)) {
2792             return NXT_ERROR;
2793         }
2794 
2795         job->work.next = recf->jobs;
2796         recf->jobs = &job->work;
2797 
2798         job->task = tmcf->engine->task;
2799         job->work.handler = handler;
2800         job->work.task = &job->task;
2801         job->work.obj = job;
2802         job->tmcf = tmcf;
2803 
2804         tmcf->count++;
2805 
2806         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2807                              sizeof(nxt_socket_conf_joint_t));
2808         if (nxt_slow_path(joint == NULL)) {
2809             return NXT_ERROR;
2810         }
2811 
2812         job->work.data = joint;
2813 
2814         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
2815         if (nxt_slow_path(ret != NXT_OK)) {
2816             return ret;
2817         }
2818 
2819         joint->count = 1;
2820 
2821         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2822         skcf->count++;
2823         joint->socket_conf = skcf;
2824 
2825         joint->engine = recf->engine;
2826     }
2827 
2828     return NXT_OK;
2829 }
2830 
2831 
2832 static nxt_int_t
2833 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2834     nxt_router_engine_conf_t *recf)
2835 {
2836     nxt_joint_job_t  *job;
2837 
2838     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2839     if (nxt_slow_path(job == NULL)) {
2840         return NXT_ERROR;
2841     }
2842 
2843     job->work.next = recf->jobs;
2844     recf->jobs = &job->work;
2845 
2846     job->task = tmcf->engine->task;
2847     job->work.handler = nxt_router_worker_thread_quit;
2848     job->work.task = &job->task;
2849     job->work.obj = NULL;
2850     job->work.data = NULL;
2851     job->tmcf = NULL;
2852 
2853     return NXT_OK;
2854 }
2855 
2856 
2857 static nxt_int_t
2858 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2859     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2860 {
2861     nxt_joint_job_t   *job;
2862     nxt_queue_link_t  *qlk;
2863 
2864     for (qlk = nxt_queue_first(sockets);
2865          qlk != nxt_queue_tail(sockets);
2866          qlk = nxt_queue_next(qlk))
2867     {
2868         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2869         if (nxt_slow_path(job == NULL)) {
2870             return NXT_ERROR;
2871         }
2872 
2873         job->work.next = recf->jobs;
2874         recf->jobs = &job->work;
2875 
2876         job->task = tmcf->engine->task;
2877         job->work.handler = nxt_router_listen_socket_delete;
2878         job->work.task = &job->task;
2879         job->work.obj = job;
2880         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2881         job->tmcf = tmcf;
2882 
2883         tmcf->count++;
2884     }
2885 
2886     return NXT_OK;
2887 }
2888 
2889 
2890 static nxt_int_t
2891 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2892     nxt_router_temp_conf_t *tmcf)
2893 {
2894     nxt_int_t                 ret;
2895     nxt_uint_t                i, threads;
2896     nxt_router_engine_conf_t  *recf;
2897 
2898     recf = tmcf->engines->elts;
2899     threads = tmcf->router_conf->threads;
2900 
2901     for (i = tmcf->new_threads; i < threads; i++) {
2902         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2903         if (nxt_slow_path(ret != NXT_OK)) {
2904             return ret;
2905         }
2906     }
2907 
2908     return NXT_OK;
2909 }
2910 
2911 
2912 static nxt_int_t
2913 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2914     nxt_event_engine_t *engine)
2915 {
2916     nxt_int_t            ret;
2917     nxt_thread_link_t    *link;
2918     nxt_thread_handle_t  handle;
2919 
2920     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2921 
2922     if (nxt_slow_path(link == NULL)) {
2923         return NXT_ERROR;
2924     }
2925 
2926     link->start = nxt_router_thread_start;
2927     link->engine = engine;
2928     link->work.handler = nxt_router_thread_exit_handler;
2929     link->work.task = task;
2930     link->work.data = link;
2931 
2932     nxt_queue_insert_tail(&rt->engines, &engine->link);
2933 
2934     ret = nxt_thread_create(&handle, link);
2935 
2936     if (nxt_slow_path(ret != NXT_OK)) {
2937         nxt_queue_remove(&engine->link);
2938     }
2939 
2940     return ret;
2941 }
2942 
2943 
2944 static void
2945 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2946     nxt_router_temp_conf_t *tmcf)
2947 {
2948     nxt_app_t  *app;
2949 
2950     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2951 
2952         nxt_router_app_unlink(task, app);
2953 
2954     } nxt_queue_loop;
2955 
2956     nxt_queue_add(&router->apps, &tmcf->previous);
2957     nxt_queue_add(&router->apps, &tmcf->apps);
2958 }
2959 
2960 
2961 static void
2962 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2963 {
2964     nxt_uint_t                n;
2965     nxt_event_engine_t        *engine;
2966     nxt_router_engine_conf_t  *recf;
2967 
2968     recf = tmcf->engines->elts;
2969 
2970     for (n = tmcf->engines->nelts; n != 0; n--) {
2971         engine = recf->engine;
2972 
2973         switch (recf->action) {
2974 
2975         case NXT_ROUTER_ENGINE_KEEP:
2976             break;
2977 
2978         case NXT_ROUTER_ENGINE_ADD:
2979             nxt_queue_insert_tail(&router->engines, &engine->link0);
2980             break;
2981 
2982         case NXT_ROUTER_ENGINE_DELETE:
2983             nxt_queue_remove(&engine->link0);
2984             break;
2985         }
2986 
2987         nxt_router_engine_post(engine, recf->jobs);
2988 
2989         recf++;
2990     }
2991 }
2992 
2993 
2994 static void
2995 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2996 {
2997     nxt_work_t  *work, *next;
2998 
2999     for (work = jobs; work != NULL; work = next) {
3000         next = work->next;
3001         work->next = NULL;
3002 
3003         nxt_event_engine_post(engine, work);
3004     }
3005 }
3006 
3007 
3008 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3009     .rpc_error       = nxt_port_rpc_handler,
3010     .mmap            = nxt_port_mmap_handler,
3011     .data            = nxt_port_rpc_handler,
3012     .oosm            = nxt_router_oosm_handler,
3013     .req_headers_ack = nxt_port_rpc_handler,
3014 };
3015 
3016 
3017 static void
3018 nxt_router_thread_start(void *data)
3019 {
3020     nxt_int_t           ret;
3021     nxt_port_t          *port;
3022     nxt_task_t          *task;
3023     nxt_work_t          *work;
3024     nxt_thread_t        *thread;
3025     nxt_thread_link_t   *link;
3026     nxt_event_engine_t  *engine;
3027 
3028     link = data;
3029     engine = link->engine;
3030     task = &engine->task;
3031 
3032     thread = nxt_thread();
3033 
3034     nxt_event_engine_thread_adopt(engine);
3035 
3036     /* STUB */
3037     thread->runtime = engine->task.thread->runtime;
3038 
3039     engine->task.thread = thread;
3040     engine->task.log = thread->log;
3041     thread->engine = engine;
3042     thread->task = &engine->task;
3043 #if 0
3044     thread->fiber = &engine->fibers->fiber;
3045 #endif
3046 
3047     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3048     if (nxt_slow_path(engine->mem_pool == NULL)) {
3049         return;
3050     }
3051 
3052     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3053                         NXT_PROCESS_ROUTER);
3054     if (nxt_slow_path(port == NULL)) {
3055         return;
3056     }
3057 
3058     ret = nxt_port_socket_init(task, port, 0);
3059     if (nxt_slow_path(ret != NXT_OK)) {
3060         nxt_port_use(task, port, -1);
3061         return;
3062     }
3063 
3064     ret = nxt_router_port_queue_init(task, port);
3065     if (nxt_slow_path(ret != NXT_OK)) {
3066         nxt_port_use(task, port, -1);
3067         return;
3068     }
3069 
3070     engine->port = port;
3071 
3072     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3073 
3074     work = nxt_zalloc(sizeof(nxt_work_t));
3075     if (nxt_slow_path(work == NULL)) {
3076         return;
3077     }
3078 
3079     work->handler = nxt_router_rt_add_port;
3080     work->task = link->work.task;
3081     work->obj = work;
3082     work->data = port;
3083 
3084     nxt_event_engine_post(link->work.task->thread->engine, work);
3085 
3086     nxt_event_engine_start(engine);
3087 }
3088 
3089 
3090 static void
3091 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3092 {
3093     nxt_int_t      res;
3094     nxt_port_t     *port;
3095     nxt_runtime_t  *rt;
3096 
3097     rt = task->thread->runtime;
3098     port = data;
3099 
3100     nxt_free(obj);
3101 
3102     res = nxt_port_hash_add(&rt->ports, port);
3103 
3104     if (nxt_fast_path(res == NXT_OK)) {
3105         nxt_port_use(task, port, 1);
3106     }
3107 }
3108 
3109 
3110 static void
3111 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3112 {
3113     nxt_joint_job_t          *job;
3114     nxt_socket_conf_t        *skcf;
3115     nxt_listen_event_t       *lev;
3116     nxt_listen_socket_t      *ls;
3117     nxt_thread_spinlock_t    *lock;
3118     nxt_socket_conf_joint_t  *joint;
3119 
3120     job = obj;
3121     joint = data;
3122 
3123     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3124 
3125     skcf = joint->socket_conf;
3126     ls = skcf->listen;
3127 
3128     lev = nxt_listen_event(task, ls);
3129     if (nxt_slow_path(lev == NULL)) {
3130         nxt_router_listen_socket_release(task, skcf);
3131         return;
3132     }
3133 
3134     lev->socket.data = joint;
3135 
3136     lock = &skcf->router_conf->router->lock;
3137 
3138     nxt_thread_spin_lock(lock);
3139     ls->count++;
3140     nxt_thread_spin_unlock(lock);
3141 
3142     job->work.next = NULL;
3143     job->work.handler = nxt_router_conf_wait;
3144 
3145     nxt_event_engine_post(job->tmcf->engine, &job->work);
3146 }
3147 
3148 
3149 nxt_inline nxt_listen_event_t *
3150 nxt_router_listen_event(nxt_queue_t *listen_connections,
3151     nxt_socket_conf_t *skcf)
3152 {
3153     nxt_socket_t        fd;
3154     nxt_queue_link_t    *qlk;
3155     nxt_listen_event_t  *lev;
3156 
3157     fd = skcf->listen->socket;
3158 
3159     for (qlk = nxt_queue_first(listen_connections);
3160          qlk != nxt_queue_tail(listen_connections);
3161          qlk = nxt_queue_next(qlk))
3162     {
3163         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3164 
3165         if (fd == lev->socket.fd) {
3166             return lev;
3167         }
3168     }
3169 
3170     return NULL;
3171 }
3172 
3173 
3174 static void
3175 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3176 {
3177     nxt_joint_job_t          *job;
3178     nxt_event_engine_t       *engine;
3179     nxt_listen_event_t       *lev;
3180     nxt_socket_conf_joint_t  *joint, *old;
3181 
3182     job = obj;
3183     joint = data;
3184 
3185     engine = task->thread->