xref: /unit/src/nxt_router.c (revision 2033:87f86cfc2002)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
227     nxt_port_t *app_port);
228 static void nxt_router_app_port_error(nxt_task_t *task,
229     nxt_port_recv_msg_t *msg, void *data);
230 
231 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
232 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
233 
234 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
235     nxt_port_t *port, nxt_apr_action_t action);
236 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
237     nxt_request_rpc_data_t *req_rpc_data);
238 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
239     void *data);
240 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
241     void *data);
242 
243 static void nxt_router_app_prepare_request(nxt_task_t *task,
244     nxt_request_rpc_data_t *req_rpc_data);
245 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
246     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
247 
248 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
249 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
254     void *data);
255 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
256 
257 static const nxt_http_request_state_t  nxt_http_request_send_state;
258 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
259 
260 static void nxt_router_app_joint_use(nxt_task_t *task,
261     nxt_app_joint_t *app_joint, int i);
262 
263 static void nxt_router_http_request_release_post(nxt_task_t *task,
264     nxt_http_request_t *r);
265 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
266     void *data);
267 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_port_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 static void nxt_router_get_mmap_handler(nxt_task_t *task,
271     nxt_port_recv_msg_t *msg);
272 
273 extern const nxt_http_request_state_t  nxt_http_websocket;
274 
275 static nxt_router_t  *nxt_router;
276 
277 static const nxt_str_t http_prefix = nxt_string("HTTP_");
278 static const nxt_str_t empty_prefix = nxt_string("");
279 
280 static const nxt_str_t  *nxt_app_msg_prefix[] = {
281     &empty_prefix,
282     &empty_prefix,
283     &http_prefix,
284     &http_prefix,
285     &http_prefix,
286     &empty_prefix,
287 };
288 
289 
290 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
291     .quit         = nxt_signal_quit_handler,
292     .new_port     = nxt_router_new_port_handler,
293     .get_port     = nxt_router_get_port_handler,
294     .change_file  = nxt_port_change_log_file_handler,
295     .mmap         = nxt_port_mmap_handler,
296     .get_mmap     = nxt_router_get_mmap_handler,
297     .data         = nxt_router_conf_data_handler,
298     .app_restart  = nxt_router_app_restart_handler,
299     .remove_pid   = nxt_router_remove_pid_handler,
300     .access_log   = nxt_router_access_log_reopen_handler,
301     .rpc_ready    = nxt_port_rpc_handler,
302     .rpc_error    = nxt_port_rpc_handler,
303     .oosm         = nxt_router_oosm_handler,
304 };
305 
306 
307 const nxt_process_init_t  nxt_router_process = {
308     .name           = "router",
309     .type           = NXT_PROCESS_ROUTER,
310     .prefork        = nxt_router_prefork,
311     .restart        = 1,
312     .setup          = nxt_process_core_setup,
313     .start          = nxt_router_start,
314     .port_handlers  = &nxt_router_process_port_handlers,
315     .signals        = nxt_process_signals,
316 };
317 
318 
319 /* Queues of nxt_socket_conf_t */
320 nxt_queue_t  creating_sockets;
321 nxt_queue_t  pending_sockets;
322 nxt_queue_t  updating_sockets;
323 nxt_queue_t  keeping_sockets;
324 nxt_queue_t  deleting_sockets;
325 
326 
327 static nxt_int_t
328 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
329 {
330     nxt_runtime_stop_app_processes(task, task->thread->runtime);
331 
332     return NXT_OK;
333 }
334 
335 
336 static nxt_int_t
337 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
338 {
339     nxt_int_t      ret;
340     nxt_port_t     *controller_port;
341     nxt_router_t   *router;
342     nxt_runtime_t  *rt;
343 
344     rt = task->thread->runtime;
345 
346     nxt_log(task, NXT_LOG_INFO, "router started");
347 
348 #if (NXT_TLS)
349     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
350     if (nxt_slow_path(rt->tls == NULL)) {
351         return NXT_ERROR;
352     }
353 
354     ret = rt->tls->library_init(task);
355     if (nxt_slow_path(ret != NXT_OK)) {
356         return ret;
357     }
358 #endif
359 
360     ret = nxt_http_init(task);
361     if (nxt_slow_path(ret != NXT_OK)) {
362         return ret;
363     }
364 
365     router = nxt_zalloc(sizeof(nxt_router_t));
366     if (nxt_slow_path(router == NULL)) {
367         return NXT_ERROR;
368     }
369 
370     nxt_queue_init(&router->engines);
371     nxt_queue_init(&router->sockets);
372     nxt_queue_init(&router->apps);
373 
374     nxt_router = router;
375 
376     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
377     if (controller_port != NULL) {
378         nxt_router_greet_controller(task, controller_port);
379     }
380 
381     return NXT_OK;
382 }
383 
384 
385 static void
386 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
387 {
388     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
389                           -1, 0, 0, NULL);
390 }
391 
392 
393 static void
394 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
395     void *data)
396 {
397     size_t               size;
398     uint32_t             stream;
399     nxt_int_t            ret;
400     nxt_app_t            *app;
401     nxt_buf_t            *b;
402     nxt_port_t           *dport;
403     nxt_runtime_t        *rt;
404     nxt_app_joint_rpc_t  *app_joint_rpc;
405 
406     app = data;
407 
408     nxt_thread_mutex_lock(&app->mutex);
409 
410     dport = app->proto_port;
411 
412     nxt_thread_mutex_unlock(&app->mutex);
413 
414     if (dport != NULL) {
415         nxt_debug(task, "app '%V' %p start process", &app->name, app);
416 
417         b = NULL;
418 
419     } else {
420         if (app->proto_port_requests > 0) {
421             nxt_debug(task, "app '%V' %p wait for prototype process",
422                       &app->name, app);
423 
424             app->proto_port_requests++;
425 
426             goto skip;
427         }
428 
429         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
430 
431         rt = task->thread->runtime;
432         dport = rt->port_by_type[NXT_PROCESS_MAIN];
433 
434         size = app->name.length + 1 + app->conf.length;
435 
436         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
437         if (nxt_slow_path(b == NULL)) {
438             goto failed;
439         }
440 
441         nxt_buf_cpystr(b, &app->name);
442         *b->mem.free++ = '\0';
443         nxt_buf_cpystr(b, &app->conf);
444     }
445 
446     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
447                                                      nxt_router_app_port_ready,
448                                                      nxt_router_app_port_error,
449                                                    sizeof(nxt_app_joint_rpc_t));
450     if (nxt_slow_path(app_joint_rpc == NULL)) {
451         goto failed;
452     }
453 
454     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
455 
456     ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
457                                 -1, stream, port->id, b);
458     if (nxt_slow_path(ret != NXT_OK)) {
459         nxt_port_rpc_cancel(task, port, stream);
460 
461         goto failed;
462     }
463 
464     app_joint_rpc->app_joint = app->joint;
465     app_joint_rpc->generation = app->generation;
466     app_joint_rpc->proto = (b != NULL);
467 
468     if (b != NULL) {
469         app->proto_port_requests++;
470 
471         b = NULL;
472     }
473 
474     nxt_router_app_joint_use(task, app->joint, 1);
475 
476 failed:
477 
478     if (b != NULL) {
479         nxt_mp_free(b->data, b);
480     }
481 
482 skip:
483 
484     nxt_router_app_use(task, app, -1);
485 }
486 
487 
488 static void
489 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
490 {
491     app_joint->use_count += i;
492 
493     if (app_joint->use_count == 0) {
494         nxt_assert(app_joint->app == NULL);
495 
496         nxt_free(app_joint);
497     }
498 }
499 
500 
501 static nxt_int_t
502 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
503 {
504     nxt_int_t      res;
505     nxt_port_t     *router_port;
506     nxt_runtime_t  *rt;
507 
508     nxt_debug(task, "app '%V' start process", &app->name);
509 
510     rt = task->thread->runtime;
511     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
512 
513     nxt_router_app_use(task, app, 1);
514 
515     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
516                         app);
517 
518     if (res == NXT_OK) {
519         return res;
520     }
521 
522     nxt_thread_mutex_lock(&app->mutex);
523 
524     app->pending_processes--;
525 
526     nxt_thread_mutex_unlock(&app->mutex);
527 
528     nxt_router_app_use(task, app, -1);
529 
530     return NXT_ERROR;
531 }
532 
533 
534 nxt_inline nxt_bool_t
535 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
536 {
537     nxt_buf_t       *b, *next;
538     nxt_bool_t      cancelled;
539     nxt_port_t      *app_port;
540     nxt_msg_info_t  *msg_info;
541 
542     msg_info = &req_rpc_data->msg_info;
543 
544     if (msg_info->buf == NULL) {
545         return 0;
546     }
547 
548     app_port = req_rpc_data->app_port;
549 
550     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
551         cancelled = nxt_app_queue_cancel(app_port->queue,
552                                          msg_info->tracking_cookie,
553                                          req_rpc_data->stream);
554 
555         if (cancelled) {
556             nxt_debug(task, "stream #%uD: cancelled by router",
557                       req_rpc_data->stream);
558         }
559 
560     } else {
561         cancelled = 0;
562     }
563 
564     for (b = msg_info->buf; b != NULL; b = next) {
565         next = b->next;
566         b->next = NULL;
567 
568         if (b->is_port_mmap_sent) {
569             b->is_port_mmap_sent = cancelled == 0;
570         }
571 
572         b->completion_handler(task, b, b->parent);
573     }
574 
575     msg_info->buf = NULL;
576 
577     return cancelled;
578 }
579 
580 
581 nxt_inline nxt_bool_t
582 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
583 {
584     if (lnk->next != NULL) {
585         nxt_queue_remove(lnk);
586 
587         lnk->next = NULL;
588 
589         return 1;
590     }
591 
592     return 0;
593 }
594 
595 
596 nxt_inline void
597 nxt_request_rpc_data_unlink(nxt_task_t *task,
598     nxt_request_rpc_data_t *req_rpc_data)
599 {
600     nxt_app_t           *app;
601     nxt_bool_t          unlinked;
602     nxt_http_request_t  *r;
603 
604     nxt_router_msg_cancel(task, req_rpc_data);
605 
606     app = req_rpc_data->app;
607 
608     if (req_rpc_data->app_port != NULL) {
609         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
610                                     req_rpc_data->apr_action);
611 
612         req_rpc_data->app_port = NULL;
613     }
614 
615     r = req_rpc_data->request;
616 
617     if (r != NULL) {
618         r->timer_data = NULL;
619 
620         nxt_router_http_request_release_post(task, r);
621 
622         r->req_rpc_data = NULL;
623         req_rpc_data->request = NULL;
624 
625         if (app != NULL) {
626             unlinked = 0;
627 
628             nxt_thread_mutex_lock(&app->mutex);
629 
630             if (r->app_link.next != NULL) {
631                 nxt_queue_remove(&r->app_link);
632                 r->app_link.next = NULL;
633 
634                 unlinked = 1;
635             }
636 
637             nxt_thread_mutex_unlock(&app->mutex);
638 
639             if (unlinked) {
640                 nxt_mp_release(r->mem_pool);
641             }
642         }
643     }
644 
645     if (app != NULL) {
646         nxt_router_app_use(task, app, -1);
647 
648         req_rpc_data->app = NULL;
649     }
650 
651     if (req_rpc_data->msg_info.body_fd != -1) {
652         nxt_fd_close(req_rpc_data->msg_info.body_fd);
653 
654         req_rpc_data->msg_info.body_fd = -1;
655     }
656 
657     if (req_rpc_data->rpc_cancel) {
658         req_rpc_data->rpc_cancel = 0;
659 
660         nxt_port_rpc_cancel(task, task->thread->engine->port,
661                             req_rpc_data->stream);
662     }
663 }
664 
665 
666 static void
667 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
668 {
669     nxt_int_t      res;
670     nxt_app_t      *app;
671     nxt_port_t     *port, *main_app_port;
672     nxt_runtime_t  *rt;
673 
674     nxt_port_new_port_handler(task, msg);
675 
676     port = msg->u.new_port;
677 
678     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
679         nxt_router_greet_controller(task, msg->u.new_port);
680     }
681 
682     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
683         nxt_port_rpc_handler(task, msg);
684 
685         return;
686     }
687 
688     if (port == NULL || port->type != NXT_PROCESS_APP) {
689 
690         if (msg->port_msg.stream == 0) {
691             return;
692         }
693 
694         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
695 
696     } else {
697         if (msg->fd[1] != -1) {
698             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
699             if (nxt_slow_path(res != NXT_OK)) {
700                 return;
701             }
702 
703             nxt_fd_close(msg->fd[1]);
704             msg->fd[1] = -1;
705         }
706     }
707 
708     if (msg->port_msg.stream != 0) {
709         nxt_port_rpc_handler(task, msg);
710         return;
711     }
712 
713     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
714 
715     /*
716      * Port with "id == 0" is application 'main' port and it always
717      * should come with non-zero stream.
718      */
719     nxt_assert(port->id != 0);
720 
721     /* Find 'main' app port and get app reference. */
722     rt = task->thread->runtime;
723 
724     /*
725      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
726      * sent to main port (with id == 0) and processed in main thread.
727      */
728     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
729     nxt_assert(main_app_port != NULL);
730 
731     app = main_app_port->app;
732 
733     if (nxt_fast_path(app != NULL)) {
734         nxt_thread_mutex_lock(&app->mutex);
735 
736         /* TODO here should be find-and-add code because there can be
737            port waiters in port_hash */
738         nxt_port_hash_add(&app->port_hash, port);
739         app->port_hash_count++;
740 
741         nxt_thread_mutex_unlock(&app->mutex);
742 
743         port->app = app;
744     }
745 
746     port->main_app_port = main_app_port;
747 
748     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
749 }
750 
751 
752 static void
753 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
754 {
755     void                    *p;
756     size_t                  size;
757     nxt_int_t               ret;
758     nxt_port_t              *port;
759     nxt_router_temp_conf_t  *tmcf;
760 
761     port = nxt_runtime_port_find(task->thread->runtime,
762                                  msg->port_msg.pid,
763                                  msg->port_msg.reply_port);
764     if (nxt_slow_path(port == NULL)) {
765         nxt_alert(task, "conf_data_handler: reply port not found");
766         return;
767     }
768 
769     p = MAP_FAILED;
770 
771     /*
772      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
773      * initialized in 'cleanup' section.
774      */
775     size = 0;
776 
777     tmcf = nxt_router_temp_conf(task);
778     if (nxt_slow_path(tmcf == NULL)) {
779         goto fail;
780     }
781 
782     if (nxt_slow_path(msg->fd[0] == -1)) {
783         nxt_alert(task, "conf_data_handler: invalid shm fd");
784         goto fail;
785     }
786 
787     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
788         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
789                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
790         goto fail;
791     }
792 
793     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
794 
795     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
796 
797     nxt_fd_close(msg->fd[0]);
798     msg->fd[0] = -1;
799 
800     if (nxt_slow_path(p == MAP_FAILED)) {
801         goto fail;
802     }
803 
804     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
805 
806     tmcf->router_conf->router = nxt_router;
807     tmcf->stream = msg->port_msg.stream;
808     tmcf->port = port;
809 
810     nxt_port_use(task, tmcf->port, 1);
811 
812     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
813 
814     if (nxt_fast_path(ret == NXT_OK)) {
815         nxt_router_conf_apply(task, tmcf, NULL);
816 
817     } else {
818         nxt_router_conf_error(task, tmcf);
819     }
820 
821     goto cleanup;
822 
823 fail:
824 
825     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
826                           msg->port_msg.stream, 0, NULL);
827 
828     if (tmcf != NULL) {
829         nxt_mp_release(tmcf->mem_pool);
830     }
831 
832 cleanup:
833 
834     if (p != MAP_FAILED) {
835         nxt_mem_munmap(p, size);
836     }
837 
838     if (msg->fd[0] != -1) {
839         nxt_fd_close(msg->fd[0]);
840         msg->fd[0] = -1;
841     }
842 }
843 
844 
845 static void
846 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
847 {
848     nxt_app_t            *app;
849     nxt_int_t            ret;
850     nxt_str_t            app_name;
851     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
852     nxt_port_t           *proto_port;
853     nxt_port_msg_type_t  reply;
854 
855     reply_port = nxt_runtime_port_find(task->thread->runtime,
856                                        msg->port_msg.pid,
857                                        msg->port_msg.reply_port);
858     if (nxt_slow_path(reply_port == NULL)) {
859         nxt_alert(task, "app_restart_handler: reply port not found");
860         return;
861     }
862 
863     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
864     app_name.start = msg->buf->mem.pos;
865 
866     nxt_debug(task, "app_restart_handler: %V", &app_name);
867 
868     app = nxt_router_app_find(&nxt_router->apps, &app_name);
869 
870     if (nxt_fast_path(app != NULL)) {
871         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
872                                    NXT_PROCESS_APP);
873         if (nxt_slow_path(shared_port == NULL)) {
874             goto fail;
875         }
876 
877         ret = nxt_port_socket_init(task, shared_port, 0);
878         if (nxt_slow_path(ret != NXT_OK)) {
879             nxt_port_use(task, shared_port, -1);
880             goto fail;
881         }
882 
883         ret = nxt_router_app_queue_init(task, shared_port);
884         if (nxt_slow_path(ret != NXT_OK)) {
885             nxt_port_write_close(shared_port);
886             nxt_port_read_close(shared_port);
887             nxt_port_use(task, shared_port, -1);
888             goto fail;
889         }
890 
891         nxt_port_write_enable(task, shared_port);
892 
893         nxt_thread_mutex_lock(&app->mutex);
894 
895         proto_port = app->proto_port;
896 
897         if (proto_port != NULL) {
898             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
899                       proto_port->pid);
900 
901             app->proto_port = NULL;
902             proto_port->app = NULL;
903         }
904 
905         app->generation++;
906 
907         shared_port->app = app;
908 
909         old_shared_port = app->shared_port;
910         old_shared_port->app = NULL;
911 
912         app->shared_port = shared_port;
913 
914         nxt_thread_mutex_unlock(&app->mutex);
915 
916         nxt_port_close(task, old_shared_port);
917         nxt_port_use(task, old_shared_port, -1);
918 
919         if (proto_port != NULL) {
920             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
921                                          -1, 0, 0, NULL);
922 
923             nxt_port_close(task, proto_port);
924 
925             nxt_port_use(task, proto_port, -1);
926         }
927 
928         reply = NXT_PORT_MSG_RPC_READY_LAST;
929 
930     } else {
931 
932 fail:
933 
934         reply = NXT_PORT_MSG_RPC_ERROR;
935     }
936 
937     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
938                           0, NULL);
939 }
940 
941 
942 static void
943 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
944     void *data)
945 {
946     union {
947         nxt_pid_t  removed_pid;
948         void       *data;
949     } u;
950 
951     u.data = data;
952 
953     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
954 }
955 
956 
957 static void
958 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
959 {
960     nxt_event_engine_t  *engine;
961 
962     nxt_port_remove_pid_handler(task, msg);
963 
964     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
965     {
966         if (nxt_fast_path(engine->port != NULL)) {
967             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
968                           msg->u.data);
969         }
970     }
971     nxt_queue_loop;
972 
973     if (msg->port_msg.stream == 0) {
974         return;
975     }
976 
977     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
978 
979     nxt_port_rpc_handler(task, msg);
980 }
981 
982 
983 static nxt_router_temp_conf_t *
984 nxt_router_temp_conf(nxt_task_t *task)
985 {
986     nxt_mp_t                *mp, *tmp;
987     nxt_router_conf_t       *rtcf;
988     nxt_router_temp_conf_t  *tmcf;
989 
990     mp = nxt_mp_create(1024, 128, 256, 32);
991     if (nxt_slow_path(mp == NULL)) {
992         return NULL;
993     }
994 
995     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
996     if (nxt_slow_path(rtcf == NULL)) {
997         goto fail;
998     }
999 
1000     rtcf->mem_pool = mp;
1001 
1002     tmp = nxt_mp_create(1024, 128, 256, 32);
1003     if (nxt_slow_path(tmp == NULL)) {
1004         goto fail;
1005     }
1006 
1007     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1008     if (nxt_slow_path(tmcf == NULL)) {
1009         goto temp_fail;
1010     }
1011 
1012     tmcf->mem_pool = tmp;
1013     tmcf->router_conf = rtcf;
1014     tmcf->count = 1;
1015     tmcf->engine = task->thread->engine;
1016 
1017     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1018                                      sizeof(nxt_router_engine_conf_t));
1019     if (nxt_slow_path(tmcf->engines == NULL)) {
1020         goto temp_fail;
1021     }
1022 
1023     nxt_queue_init(&creating_sockets);
1024     nxt_queue_init(&pending_sockets);
1025     nxt_queue_init(&updating_sockets);
1026     nxt_queue_init(&keeping_sockets);
1027     nxt_queue_init(&deleting_sockets);
1028 
1029 #if (NXT_TLS)
1030     nxt_queue_init(&tmcf->tls);
1031 #endif
1032 
1033     nxt_queue_init(&tmcf->apps);
1034     nxt_queue_init(&tmcf->previous);
1035 
1036     return tmcf;
1037 
1038 temp_fail:
1039 
1040     nxt_mp_destroy(tmp);
1041 
1042 fail:
1043 
1044     nxt_mp_destroy(mp);
1045 
1046     return NULL;
1047 }
1048 
1049 
1050 nxt_inline nxt_bool_t
1051 nxt_router_app_can_start(nxt_app_t *app)
1052 {
1053     return app->processes + app->pending_processes < app->max_processes
1054             && app->pending_processes < app->max_pending_processes;
1055 }
1056 
1057 
1058 nxt_inline nxt_bool_t
1059 nxt_router_app_need_start(nxt_app_t *app)
1060 {
1061     return (app->active_requests
1062               > app->port_hash_count + app->pending_processes)
1063            || (app->spare_processes
1064                 > app->idle_processes + app->pending_processes);
1065 }
1066 
1067 
1068 static void
1069 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1070 {
1071     nxt_int_t                    ret;
1072     nxt_app_t                    *app;
1073     nxt_router_t                 *router;
1074     nxt_runtime_t                *rt;
1075     nxt_queue_link_t             *qlk;
1076     nxt_socket_conf_t            *skcf;
1077     nxt_router_conf_t            *rtcf;
1078     nxt_router_temp_conf_t       *tmcf;
1079     const nxt_event_interface_t  *interface;
1080 #if (NXT_TLS)
1081     nxt_router_tlssock_t         *tls;
1082 #endif
1083 
1084     tmcf = obj;
1085 
1086     qlk = nxt_queue_first(&pending_sockets);
1087 
1088     if (qlk != nxt_queue_tail(&pending_sockets)) {
1089         nxt_queue_remove(qlk);
1090         nxt_queue_insert_tail(&creating_sockets, qlk);
1091 
1092         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1093 
1094         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1095 
1096         return;
1097     }
1098 
1099 #if (NXT_TLS)
1100     qlk = nxt_queue_last(&tmcf->tls);
1101 
1102     if (qlk != nxt_queue_head(&tmcf->tls)) {
1103         nxt_queue_remove(qlk);
1104 
1105         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1106 
1107         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1108                            nxt_router_tls_rpc_handler, tls);
1109         return;
1110     }
1111 #endif
1112 
1113     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1114 
1115         if (nxt_router_app_need_start(app)) {
1116             nxt_router_app_rpc_create(task, tmcf, app);
1117             return;
1118         }
1119 
1120     } nxt_queue_loop;
1121 
1122     rtcf = tmcf->router_conf;
1123 
1124     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1125         nxt_router_access_log_open(task, tmcf);
1126         return;
1127     }
1128 
1129     rt = task->thread->runtime;
1130 
1131     interface = nxt_service_get(rt->services, "engine", NULL);
1132 
1133     router = rtcf->router;
1134 
1135     ret = nxt_router_engines_create(task, router, tmcf, interface);
1136     if (nxt_slow_path(ret != NXT_OK)) {
1137         goto fail;
1138     }
1139 
1140     ret = nxt_router_threads_create(task, rt, tmcf);
1141     if (nxt_slow_path(ret != NXT_OK)) {
1142         goto fail;
1143     }
1144 
1145     nxt_router_apps_sort(task, router, tmcf);
1146 
1147     nxt_router_apps_hash_use(task, rtcf, 1);
1148 
1149     nxt_router_engines_post(router, tmcf);
1150 
1151     nxt_queue_add(&router->sockets, &updating_sockets);
1152     nxt_queue_add(&router->sockets, &creating_sockets);
1153 
1154     if (router->access_log != rtcf->access_log) {
1155         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1156 
1157         nxt_router_access_log_release(task, &router->lock, router->access_log);
1158 
1159         router->access_log = rtcf->access_log;
1160     }
1161 
1162     nxt_router_conf_ready(task, tmcf);
1163 
1164     return;
1165 
1166 fail:
1167 
1168     nxt_router_conf_error(task, tmcf);
1169 
1170     return;
1171 }
1172 
1173 
1174 static void
1175 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1176 {
1177     nxt_joint_job_t  *job;
1178 
1179     job = obj;
1180 
1181     nxt_router_conf_ready(task, job->tmcf);
1182 }
1183 
1184 
1185 static void
1186 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1187 {
1188     uint32_t               count;
1189     nxt_router_conf_t      *rtcf;
1190     nxt_thread_spinlock_t  *lock;
1191 
1192     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1193 
1194     if (--tmcf->count > 0) {
1195         return;
1196     }
1197 
1198     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1199 
1200     rtcf = tmcf->router_conf;
1201 
1202     lock = &rtcf->router->lock;
1203 
1204     nxt_thread_spin_lock(lock);
1205 
1206     count = rtcf->count;
1207 
1208     nxt_thread_spin_unlock(lock);
1209 
1210     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1211 
1212     if (count == 0) {
1213         nxt_router_apps_hash_use(task, rtcf, -1);
1214 
1215         nxt_router_access_log_release(task, lock, rtcf->access_log);
1216 
1217         nxt_mp_destroy(rtcf->mem_pool);
1218     }
1219 
1220     nxt_mp_release(tmcf->mem_pool);
1221 }
1222 
1223 
1224 static void
1225 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1226 {
1227     nxt_app_t          *app;
1228     nxt_queue_t        new_socket_confs;
1229     nxt_socket_t       s;
1230     nxt_router_t       *router;
1231     nxt_queue_link_t   *qlk;
1232     nxt_socket_conf_t  *skcf;
1233     nxt_router_conf_t  *rtcf;
1234 
1235     nxt_alert(task, "failed to apply new conf");
1236 
1237     for (qlk = nxt_queue_first(&creating_sockets);
1238          qlk != nxt_queue_tail(&creating_sockets);
1239          qlk = nxt_queue_next(qlk))
1240     {
1241         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1242         s = skcf->listen->socket;
1243 
1244         if (s != -1) {
1245             nxt_socket_close(task, s);
1246         }
1247 
1248         nxt_free(skcf->listen);
1249     }
1250 
1251     nxt_queue_init(&new_socket_confs);
1252     nxt_queue_add(&new_socket_confs, &updating_sockets);
1253     nxt_queue_add(&new_socket_confs, &pending_sockets);
1254     nxt_queue_add(&new_socket_confs, &creating_sockets);
1255 
1256     rtcf = tmcf->router_conf;
1257 
1258     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1259 
1260         nxt_router_app_unlink(task, app);
1261 
1262     } nxt_queue_loop;
1263 
1264     router = rtcf->router;
1265 
1266     nxt_queue_add(&router->sockets, &keeping_sockets);
1267     nxt_queue_add(&router->sockets, &deleting_sockets);
1268 
1269     nxt_queue_add(&router->apps, &tmcf->previous);
1270 
1271     // TODO: new engines and threads
1272 
1273     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1274 
1275     nxt_mp_destroy(rtcf->mem_pool);
1276 
1277     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1278 
1279     nxt_mp_release(tmcf->mem_pool);
1280 }
1281 
1282 
1283 static void
1284 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1285     nxt_port_msg_type_t type)
1286 {
1287     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1288 
1289     nxt_port_use(task, tmcf->port, -1);
1290 
1291     tmcf->port = NULL;
1292 }
1293 
1294 
1295 static nxt_conf_map_t  nxt_router_conf[] = {
1296     {
1297         nxt_string("listeners_threads"),
1298         NXT_CONF_MAP_INT32,
1299         offsetof(nxt_router_conf_t, threads),
1300     },
1301 };
1302 
1303 
1304 static nxt_conf_map_t  nxt_router_app_conf[] = {
1305     {
1306         nxt_string("type"),
1307         NXT_CONF_MAP_STR,
1308         offsetof(nxt_router_app_conf_t, type),
1309     },
1310 
1311     {
1312         nxt_string("limits"),
1313         NXT_CONF_MAP_PTR,
1314         offsetof(nxt_router_app_conf_t, limits_value),
1315     },
1316 
1317     {
1318         nxt_string("processes"),
1319         NXT_CONF_MAP_INT32,
1320         offsetof(nxt_router_app_conf_t, processes),
1321     },
1322 
1323     {
1324         nxt_string("processes"),
1325         NXT_CONF_MAP_PTR,
1326         offsetof(nxt_router_app_conf_t, processes_value),
1327     },
1328 
1329     {
1330         nxt_string("targets"),
1331         NXT_CONF_MAP_PTR,
1332         offsetof(nxt_router_app_conf_t, targets_value),
1333     },
1334 };
1335 
1336 
1337 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1338     {
1339         nxt_string("timeout"),
1340         NXT_CONF_MAP_MSEC,
1341         offsetof(nxt_router_app_conf_t, timeout),
1342     },
1343 };
1344 
1345 
1346 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1347     {
1348         nxt_string("spare"),
1349         NXT_CONF_MAP_INT32,
1350         offsetof(nxt_router_app_conf_t, spare_processes),
1351     },
1352 
1353     {
1354         nxt_string("max"),
1355         NXT_CONF_MAP_INT32,
1356         offsetof(nxt_router_app_conf_t, max_processes),
1357     },
1358 
1359     {
1360         nxt_string("idle_timeout"),
1361         NXT_CONF_MAP_MSEC,
1362         offsetof(nxt_router_app_conf_t, idle_timeout),
1363     },
1364 };
1365 
1366 
1367 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1368     {
1369         nxt_string("pass"),
1370         NXT_CONF_MAP_STR_COPY,
1371         offsetof(nxt_router_listener_conf_t, pass),
1372     },
1373 
1374     {
1375         nxt_string("application"),
1376         NXT_CONF_MAP_STR_COPY,
1377         offsetof(nxt_router_listener_conf_t, application),
1378     },
1379 };
1380 
1381 
1382 static nxt_conf_map_t  nxt_router_http_conf[] = {
1383     {
1384         nxt_string("header_buffer_size"),
1385         NXT_CONF_MAP_SIZE,
1386         offsetof(nxt_socket_conf_t, header_buffer_size),
1387     },
1388 
1389     {
1390         nxt_string("large_header_buffer_size"),
1391         NXT_CONF_MAP_SIZE,
1392         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1393     },
1394 
1395     {
1396         nxt_string("large_header_buffers"),
1397         NXT_CONF_MAP_SIZE,
1398         offsetof(nxt_socket_conf_t, large_header_buffers),
1399     },
1400 
1401     {
1402         nxt_string("body_buffer_size"),
1403         NXT_CONF_MAP_SIZE,
1404         offsetof(nxt_socket_conf_t, body_buffer_size),
1405     },
1406 
1407     {
1408         nxt_string("max_body_size"),
1409         NXT_CONF_MAP_SIZE,
1410         offsetof(nxt_socket_conf_t, max_body_size),
1411     },
1412 
1413     {
1414         nxt_string("idle_timeout"),
1415         NXT_CONF_MAP_MSEC,
1416         offsetof(nxt_socket_conf_t, idle_timeout),
1417     },
1418 
1419     {
1420         nxt_string("header_read_timeout"),
1421         NXT_CONF_MAP_MSEC,
1422         offsetof(nxt_socket_conf_t, header_read_timeout),
1423     },
1424 
1425     {
1426         nxt_string("body_read_timeout"),
1427         NXT_CONF_MAP_MSEC,
1428         offsetof(nxt_socket_conf_t, body_read_timeout),
1429     },
1430 
1431     {
1432         nxt_string("send_timeout"),
1433         NXT_CONF_MAP_MSEC,
1434         offsetof(nxt_socket_conf_t, send_timeout),
1435     },
1436 
1437     {
1438         nxt_string("body_temp_path"),
1439         NXT_CONF_MAP_STR,
1440         offsetof(nxt_socket_conf_t, body_temp_path),
1441     },
1442 
1443     {
1444         nxt_string("discard_unsafe_fields"),
1445         NXT_CONF_MAP_INT8,
1446         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1447     },
1448 };
1449 
1450 
1451 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1452     {
1453         nxt_string("max_frame_size"),
1454         NXT_CONF_MAP_SIZE,
1455         offsetof(nxt_websocket_conf_t, max_frame_size),
1456     },
1457 
1458     {
1459         nxt_string("read_timeout"),
1460         NXT_CONF_MAP_MSEC,
1461         offsetof(nxt_websocket_conf_t, read_timeout),
1462     },
1463 
1464     {
1465         nxt_string("keepalive_interval"),
1466         NXT_CONF_MAP_MSEC,
1467         offsetof(nxt_websocket_conf_t, keepalive_interval),
1468     },
1469 
1470 };
1471 
1472 
1473 static nxt_int_t
1474 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1475     u_char *start, u_char *end)
1476 {
1477     u_char                      *p;
1478     size_t                      size;
1479     nxt_mp_t                    *mp, *app_mp;
1480     uint32_t                    next, next_target;
1481     nxt_int_t                   ret;
1482     nxt_str_t                   name, path, target;
1483     nxt_app_t                   *app, *prev;
1484     nxt_str_t                   *t, *s, *targets;
1485     nxt_uint_t                  n, i;
1486     nxt_port_t                  *port;
1487     nxt_router_t                *router;
1488     nxt_app_joint_t             *app_joint;
1489 #if (NXT_TLS)
1490     nxt_tls_init_t              *tls_init;
1491     nxt_conf_value_t            *certificate;
1492 #endif
1493     nxt_conf_value_t            *conf, *http, *value, *websocket;
1494     nxt_conf_value_t            *applications, *application;
1495     nxt_conf_value_t            *listeners, *listener;
1496     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1497     nxt_socket_conf_t           *skcf;
1498     nxt_http_routes_t           *routes;
1499     nxt_event_engine_t          *engine;
1500     nxt_app_lang_module_t       *lang;
1501     nxt_router_app_conf_t       apcf;
1502     nxt_router_access_log_t     *access_log;
1503     nxt_router_listener_conf_t  lscf;
1504 
1505     static nxt_str_t  http_path = nxt_string("/settings/http");
1506     static nxt_str_t  applications_path = nxt_string("/applications");
1507     static nxt_str_t  listeners_path = nxt_string("/listeners");
1508     static nxt_str_t  routes_path = nxt_string("/routes");
1509     static nxt_str_t  access_log_path = nxt_string("/access_log");
1510 #if (NXT_TLS)
1511     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1512     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1513     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1514     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1515     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1516 #endif
1517     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1518     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1519     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1520 
1521     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1522     if (conf == NULL) {
1523         nxt_alert(task, "configuration parsing error");
1524         return NXT_ERROR;
1525     }
1526 
1527     mp = tmcf->router_conf->mem_pool;
1528 
1529     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1530                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1531     if (ret != NXT_OK) {
1532         nxt_alert(task, "root map error");
1533         return NXT_ERROR;
1534     }
1535 
1536     if (tmcf->router_conf->threads == 0) {
1537         tmcf->router_conf->threads = nxt_ncpu;
1538     }
1539 
1540     static_conf = nxt_conf_get_path(conf, &static_path);
1541 
1542     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1543     if (nxt_slow_path(ret != NXT_OK)) {
1544         return NXT_ERROR;
1545     }
1546 
1547     router = tmcf->router_conf->router;
1548 
1549     applications = nxt_conf_get_path(conf, &applications_path);
1550 
1551     if (applications != NULL) {
1552         next = 0;
1553 
1554         for ( ;; ) {
1555             application = nxt_conf_next_object_member(applications,
1556                                                       &name, &next);
1557             if (application == NULL) {
1558                 break;
1559             }
1560 
1561             nxt_debug(task, "application \"%V\"", &name);
1562 
1563             size = nxt_conf_json_length(application, NULL);
1564 
1565             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1566             if (nxt_slow_path(app_mp == NULL)) {
1567                 goto fail;
1568             }
1569 
1570             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1571             if (app == NULL) {
1572                 goto app_fail;
1573             }
1574 
1575             nxt_memzero(app, sizeof(nxt_app_t));
1576 
1577             app->mem_pool = app_mp;
1578 
1579             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1580             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1581                                                   + name.length);
1582 
1583             p = nxt_conf_json_print(app->conf.start, application, NULL);
1584             app->conf.length = p - app->conf.start;
1585 
1586             nxt_assert(app->conf.length <= size);
1587 
1588             nxt_debug(task, "application conf \"%V\"", &app->conf);
1589 
1590             prev = nxt_router_app_find(&router->apps, &name);
1591 
1592             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1593                 nxt_mp_destroy(app_mp);
1594 
1595                 nxt_queue_remove(&prev->link);
1596                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1597 
1598                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1599                 if (nxt_slow_path(ret != NXT_OK)) {
1600                     goto fail;
1601                 }
1602 
1603                 continue;
1604             }
1605 
1606             apcf.processes = 1;
1607             apcf.max_processes = 1;
1608             apcf.spare_processes = 0;
1609             apcf.timeout = 0;
1610             apcf.idle_timeout = 15000;
1611             apcf.limits_value = NULL;
1612             apcf.processes_value = NULL;
1613             apcf.targets_value = NULL;
1614 
1615             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1616             if (nxt_slow_path(app_joint == NULL)) {
1617                 goto app_fail;
1618             }
1619 
1620             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1621 
1622             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1623                                       nxt_nitems(nxt_router_app_conf), &apcf);
1624             if (ret != NXT_OK) {
1625                 nxt_alert(task, "application map error");
1626                 goto app_fail;
1627             }
1628 
1629             if (apcf.limits_value != NULL) {
1630 
1631                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1632                     nxt_alert(task, "application limits is not object");
1633                     goto app_fail;
1634                 }
1635 
1636                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1637                                         nxt_router_app_limits_conf,
1638                                         nxt_nitems(nxt_router_app_limits_conf),
1639                                         &apcf);
1640                 if (ret != NXT_OK) {
1641                     nxt_alert(task, "application limits map error");
1642                     goto app_fail;
1643                 }
1644             }
1645 
1646             if (apcf.processes_value != NULL
1647                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1648             {
1649                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1650                                      nxt_router_app_processes_conf,
1651                                      nxt_nitems(nxt_router_app_processes_conf),
1652                                      &apcf);
1653                 if (ret != NXT_OK) {
1654                     nxt_alert(task, "application processes map error");
1655                     goto app_fail;
1656                 }
1657 
1658             } else {
1659                 apcf.max_processes = apcf.processes;
1660                 apcf.spare_processes = apcf.processes;
1661             }
1662 
1663             if (apcf.targets_value != NULL) {
1664                 n = nxt_conf_object_members_count(apcf.targets_value);
1665 
1666                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1667                 if (nxt_slow_path(targets == NULL)) {
1668                     goto app_fail;
1669                 }
1670 
1671                 next_target = 0;
1672 
1673                 for (i = 0; i < n; i++) {
1674                     (void) nxt_conf_next_object_member(apcf.targets_value,
1675                                                        &target, &next_target);
1676 
1677                     s = nxt_str_dup(app_mp, &targets[i], &target);
1678                     if (nxt_slow_path(s == NULL)) {
1679                         goto app_fail;
1680                     }
1681                 }
1682 
1683             } else {
1684                 targets = NULL;
1685             }
1686 
1687             nxt_debug(task, "application type: %V", &apcf.type);
1688             nxt_debug(task, "application processes: %D", apcf.processes);
1689             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1690 
1691             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1692 
1693             if (lang == NULL) {
1694                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1695                 goto app_fail;
1696             }
1697 
1698             nxt_debug(task, "application language module: \"%s\"", lang->file);
1699 
1700             ret = nxt_thread_mutex_create(&app->mutex);
1701             if (ret != NXT_OK) {
1702                 goto app_fail;
1703             }
1704 
1705             nxt_queue_init(&app->ports);
1706             nxt_queue_init(&app->spare_ports);
1707             nxt_queue_init(&app->idle_ports);
1708             nxt_queue_init(&app->ack_waiting_req);
1709 
1710             app->name.length = name.length;
1711             nxt_memcpy(app->name.start, name.start, name.length);
1712 
1713             app->type = lang->type;
1714             app->max_processes = apcf.max_processes;
1715             app->spare_processes = apcf.spare_processes;
1716             app->max_pending_processes = apcf.spare_processes
1717                                          ? apcf.spare_processes : 1;
1718             app->timeout = apcf.timeout;
1719             app->idle_timeout = apcf.idle_timeout;
1720 
1721             app->targets = targets;
1722 
1723             engine = task->thread->engine;
1724 
1725             app->engine = engine;
1726 
1727             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1728             app->adjust_idle_work.task = &engine->task;
1729             app->adjust_idle_work.obj = app;
1730 
1731             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1732 
1733             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1734             if (nxt_slow_path(ret != NXT_OK)) {
1735                 goto app_fail;
1736             }
1737 
1738             nxt_router_app_use(task, app, 1);
1739 
1740             app->joint = app_joint;
1741 
1742             app_joint->use_count = 1;
1743             app_joint->app = app;
1744 
1745             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1746             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1747             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1748             app_joint->idle_timer.task = &engine->task;
1749             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1750 
1751             app_joint->free_app_work.handler = nxt_router_free_app;
1752             app_joint->free_app_work.task = &engine->task;
1753             app_joint->free_app_work.obj = app_joint;
1754 
1755             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1756                                 NXT_PROCESS_APP);
1757             if (nxt_slow_path(port == NULL)) {
1758                 return NXT_ERROR;
1759             }
1760 
1761             ret = nxt_port_socket_init(task, port, 0);
1762             if (nxt_slow_path(ret != NXT_OK)) {
1763                 nxt_port_use(task, port, -1);
1764                 return NXT_ERROR;
1765             }
1766 
1767             ret = nxt_router_app_queue_init(task, port);
1768             if (nxt_slow_path(ret != NXT_OK)) {
1769                 nxt_port_write_close(port);
1770                 nxt_port_read_close(port);
1771                 nxt_port_use(task, port, -1);
1772                 return NXT_ERROR;
1773             }
1774 
1775             nxt_port_write_enable(task, port);
1776             port->app = app;
1777 
1778             app->shared_port = port;
1779 
1780             nxt_thread_mutex_create(&app->outgoing.mutex);
1781         }
1782     }
1783 
1784     routes_conf = nxt_conf_get_path(conf, &routes_path);
1785     if (nxt_fast_path(routes_conf != NULL)) {
1786         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1787         if (nxt_slow_path(routes == NULL)) {
1788             return NXT_ERROR;
1789         }
1790         tmcf->router_conf->routes = routes;
1791     }
1792 
1793     ret = nxt_upstreams_create(task, tmcf, conf);
1794     if (nxt_slow_path(ret != NXT_OK)) {
1795         return ret;
1796     }
1797 
1798     http = nxt_conf_get_path(conf, &http_path);
1799 #if 0
1800     if (http == NULL) {
1801         nxt_alert(task, "no \"http\" block");
1802         return NXT_ERROR;
1803     }
1804 #endif
1805 
1806     websocket = nxt_conf_get_path(conf, &websocket_path);
1807 
1808     listeners = nxt_conf_get_path(conf, &listeners_path);
1809 
1810     if (listeners != NULL) {
1811         next = 0;
1812 
1813         for ( ;; ) {
1814             listener = nxt_conf_next_object_member(listeners, &name, &next);
1815             if (listener == NULL) {
1816                 break;
1817             }
1818 
1819             skcf = nxt_router_socket_conf(task, tmcf, &name);
1820             if (skcf == NULL) {
1821                 goto fail;
1822             }
1823 
1824             nxt_memzero(&lscf, sizeof(lscf));
1825 
1826             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1827                                       nxt_nitems(nxt_router_listener_conf),
1828                                       &lscf);
1829             if (ret != NXT_OK) {
1830                 nxt_alert(task, "listener map error");
1831                 goto fail;
1832             }
1833 
1834             nxt_debug(task, "application: %V", &lscf.application);
1835 
1836             // STUB, default values if http block is not defined.
1837             skcf->header_buffer_size = 2048;
1838             skcf->large_header_buffer_size = 8192;
1839             skcf->large_header_buffers = 4;
1840             skcf->discard_unsafe_fields = 1;
1841             skcf->body_buffer_size = 16 * 1024;
1842             skcf->max_body_size = 8 * 1024 * 1024;
1843             skcf->proxy_header_buffer_size = 64 * 1024;
1844             skcf->proxy_buffer_size = 4096;
1845             skcf->proxy_buffers = 256;
1846             skcf->idle_timeout = 180 * 1000;
1847             skcf->header_read_timeout = 30 * 1000;
1848             skcf->body_read_timeout = 30 * 1000;
1849             skcf->send_timeout = 30 * 1000;
1850             skcf->proxy_timeout = 60 * 1000;
1851             skcf->proxy_send_timeout = 30 * 1000;
1852             skcf->proxy_read_timeout = 30 * 1000;
1853 
1854             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1855             skcf->websocket_conf.read_timeout = 60 * 1000;
1856             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1857 
1858             nxt_str_null(&skcf->body_temp_path);
1859 
1860             if (http != NULL) {
1861                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1862                                           nxt_nitems(nxt_router_http_conf),
1863                                           skcf);
1864                 if (ret != NXT_OK) {
1865                     nxt_alert(task, "http map error");
1866                     goto fail;
1867                 }
1868             }
1869 
1870             if (websocket != NULL) {
1871                 ret = nxt_conf_map_object(mp, websocket,
1872                                           nxt_router_websocket_conf,
1873                                           nxt_nitems(nxt_router_websocket_conf),
1874                                           &skcf->websocket_conf);
1875                 if (ret != NXT_OK) {
1876                     nxt_alert(task, "websocket map error");
1877                     goto fail;
1878                 }
1879             }
1880 
1881             t = &skcf->body_temp_path;
1882 
1883             if (t->length == 0) {
1884                 t->start = (u_char *) task->thread->runtime->tmp;
1885                 t->length = nxt_strlen(t->start);
1886             }
1887 
1888             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1889             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1890                                                     client_ip_conf);
1891             if (nxt_slow_path(ret != NXT_OK)) {
1892                 return NXT_ERROR;
1893             }
1894 
1895 #if (NXT_TLS)
1896             certificate = nxt_conf_get_path(listener, &certificate_path);
1897 
1898             if (certificate != NULL) {
1899                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1900                 if (nxt_slow_path(tls_init == NULL)) {
1901                     return NXT_ERROR;
1902                 }
1903 
1904                 tls_init->cache_size = 0;
1905                 tls_init->timeout = 300;
1906 
1907                 value = nxt_conf_get_path(listener, &conf_cache_path);
1908                 if (value != NULL) {
1909                     tls_init->cache_size = nxt_conf_get_number(value);
1910                 }
1911 
1912                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1913                 if (value != NULL) {
1914                     tls_init->timeout = nxt_conf_get_number(value);
1915                 }
1916 
1917                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1918                                                         &conf_commands_path);
1919 
1920                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1921                                                            &conf_tickets);
1922 
1923                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1924                     n = nxt_conf_array_elements_count(certificate);
1925 
1926                     for (i = 0; i < n; i++) {
1927                         value = nxt_conf_get_array_element(certificate, i);
1928 
1929                         nxt_assert(value != NULL);
1930 
1931                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1932                                                          tls_init, i == 0);
1933                         if (nxt_slow_path(ret != NXT_OK)) {
1934                             goto fail;
1935                         }
1936                     }
1937 
1938                 } else {
1939                     /* NXT_CONF_STRING */
1940                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1941                                                      tls_init, 1);
1942                     if (nxt_slow_path(ret != NXT_OK)) {
1943                         goto fail;
1944                     }
1945                 }
1946             }
1947 #endif
1948 
1949             skcf->listen->handler = nxt_http_conn_init;
1950             skcf->router_conf = tmcf->router_conf;
1951             skcf->router_conf->count++;
1952 
1953             if (lscf.pass.length != 0) {
1954                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1955 
1956             /* COMPATIBILITY: listener application. */
1957             } else if (lscf.application.length > 0) {
1958                 skcf->action = nxt_http_pass_application(task,
1959                                                          tmcf->router_conf,
1960                                                          &lscf.application);
1961             }
1962 
1963             if (nxt_slow_path(skcf->action == NULL)) {
1964                 goto fail;
1965             }
1966         }
1967     }
1968 
1969     ret = nxt_http_routes_resolve(task, tmcf);
1970     if (nxt_slow_path(ret != NXT_OK)) {
1971         goto fail;
1972     }
1973 
1974     value = nxt_conf_get_path(conf, &access_log_path);
1975 
1976     if (value != NULL) {
1977         nxt_conf_get_string(value, &path);
1978 
1979         access_log = router->access_log;
1980 
1981         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1982             nxt_router_access_log_use(&router->lock, access_log);
1983 
1984         } else {
1985             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1986                                     + path.length);
1987             if (access_log == NULL) {
1988                 nxt_alert(task, "failed to allocate access log structure");
1989                 goto fail;
1990             }
1991 
1992             access_log->fd = -1;
1993             access_log->handler = &nxt_router_access_log_writer;
1994             access_log->count = 1;
1995 
1996             access_log->path.length = path.length;
1997             access_log->path.start = (u_char *) access_log
1998                                      + sizeof(nxt_router_access_log_t);
1999 
2000             nxt_memcpy(access_log->path.start, path.start, path.length);
2001         }
2002 
2003         tmcf->router_conf->access_log = access_log;
2004     }
2005 
2006     nxt_queue_add(&deleting_sockets, &router->sockets);
2007     nxt_queue_init(&router->sockets);
2008 
2009     return NXT_OK;
2010 
2011 app_fail:
2012 
2013     nxt_mp_destroy(app_mp);
2014 
2015 fail:
2016 
2017     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2018 
2019         nxt_queue_remove(&app->link);
2020         nxt_thread_mutex_destroy(&app->mutex);
2021         nxt_mp_destroy(app->mem_pool);
2022 
2023     } nxt_queue_loop;
2024 
2025     return NXT_ERROR;
2026 }
2027 
2028 
2029 #if (NXT_TLS)
2030 
2031 static nxt_int_t
2032 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2033     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2034     nxt_tls_init_t *tls_init, nxt_bool_t last)
2035 {
2036     nxt_router_tlssock_t  *tls;
2037 
2038     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2039     if (nxt_slow_path(tls == NULL)) {
2040         return NXT_ERROR;
2041     }
2042 
2043     tls->tls_init = tls_init;
2044     tls->socket_conf = skcf;
2045     tls->temp_conf = tmcf;
2046     tls->last = last;
2047     nxt_conf_get_string(value, &tls->name);
2048 
2049     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2050 
2051     return NXT_OK;
2052 }
2053 
2054 #endif
2055 
2056 
2057 static nxt_int_t
2058 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2059     nxt_conf_value_t *conf)
2060 {
2061     uint32_t          next, i;
2062     nxt_mp_t          *mp;
2063     nxt_str_t         *type, exten, str;
2064     nxt_int_t         ret;
2065     nxt_uint_t        exts;
2066     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2067 
2068     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2069 
2070     mp = rtcf->mem_pool;
2071 
2072     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2073     if (nxt_slow_path(ret != NXT_OK)) {
2074         return NXT_ERROR;
2075     }
2076 
2077     if (conf == NULL) {
2078         return NXT_OK;
2079     }
2080 
2081     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2082 
2083     if (mtypes_conf != NULL) {
2084         next = 0;
2085 
2086         for ( ;; ) {
2087             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2088 
2089             if (ext_conf == NULL) {
2090                 break;
2091             }
2092 
2093             type = nxt_str_dup(mp, NULL, &str);
2094             if (nxt_slow_path(type == NULL)) {
2095                 return NXT_ERROR;
2096             }
2097 
2098             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2099                 nxt_conf_get_string(ext_conf, &str);
2100 
2101                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2102                     return NXT_ERROR;
2103                 }
2104 
2105                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2106                                                       &exten, type);
2107                 if (nxt_slow_path(ret != NXT_OK)) {
2108                     return NXT_ERROR;
2109                 }
2110 
2111                 continue;
2112             }
2113 
2114             exts = nxt_conf_array_elements_count(ext_conf);
2115 
2116             for (i = 0; i < exts; i++) {
2117                 value = nxt_conf_get_array_element(ext_conf, i);
2118 
2119                 nxt_conf_get_string(value, &str);
2120 
2121                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2122                     return NXT_ERROR;
2123                 }
2124 
2125                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2126                                                       &exten, type);
2127                 if (nxt_slow_path(ret != NXT_OK)) {
2128                     return NXT_ERROR;
2129                 }
2130             }
2131         }
2132     }
2133 
2134     return NXT_OK;
2135 }
2136 
2137 
2138 static nxt_int_t
2139 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2140     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2141 {
2142     char                        c;
2143     size_t                      i;
2144     nxt_mp_t                    *mp;
2145     uint32_t                    hash;
2146     nxt_str_t                   header;
2147     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2148     nxt_http_client_ip_t        *client_ip;
2149     nxt_http_route_addr_rule_t  *source;
2150 
2151     static nxt_str_t  header_path = nxt_string("/header");
2152     static nxt_str_t  source_path = nxt_string("/source");
2153     static nxt_str_t  recursive_path = nxt_string("/recursive");
2154 
2155     if (conf == NULL) {
2156         skcf->client_ip = NULL;
2157 
2158         return NXT_OK;
2159     }
2160 
2161     mp = tmcf->router_conf->mem_pool;
2162 
2163     source_conf = nxt_conf_get_path(conf, &source_path);
2164     header_conf = nxt_conf_get_path(conf, &header_path);
2165     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2166 
2167     if (source_conf == NULL || header_conf == NULL) {
2168         return NXT_ERROR;
2169     }
2170 
2171     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2172     if (nxt_slow_path(client_ip == NULL)) {
2173         return NXT_ERROR;
2174     }
2175 
2176     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2177     if (nxt_slow_path(source == NULL)) {
2178         return NXT_ERROR;
2179     }
2180 
2181     client_ip->source = source;
2182 
2183     nxt_conf_get_string(header_conf, &header);
2184 
2185     if (recursive_conf != NULL) {
2186         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2187     }
2188 
2189     client_ip->header = nxt_str_dup(mp, NULL, &header);
2190     if (nxt_slow_path(client_ip->header == NULL)) {
2191         return NXT_ERROR;
2192     }
2193 
2194     hash = NXT_HTTP_FIELD_HASH_INIT;
2195 
2196     for (i = 0; i < client_ip->header->length; i++) {
2197         c = client_ip->header->start[i];
2198         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2199     }
2200 
2201     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2202 
2203     client_ip->header_hash = hash;
2204 
2205     skcf->client_ip = client_ip;
2206 
2207     return NXT_OK;
2208 }
2209 
2210 
2211 static nxt_app_t *
2212 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2213 {
2214     nxt_app_t  *app;
2215 
2216     nxt_queue_each(app, queue, nxt_app_t, link) {
2217 
2218         if (nxt_strstr_eq(name, &app->name)) {
2219             return app;
2220         }
2221 
2222     } nxt_queue_loop;
2223 
2224     return NULL;
2225 }
2226 
2227 
2228 static nxt_int_t
2229 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2230 {
2231     void       *mem;
2232     nxt_int_t  fd;
2233 
2234     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2235     if (nxt_slow_path(fd == -1)) {
2236         return NXT_ERROR;
2237     }
2238 
2239     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2240                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2241     if (nxt_slow_path(mem == MAP_FAILED)) {
2242         nxt_fd_close(fd);
2243 
2244         return NXT_ERROR;
2245     }
2246 
2247     nxt_app_queue_init(mem);
2248 
2249     port->queue_fd = fd;
2250     port->queue = mem;
2251 
2252     return NXT_OK;
2253 }
2254 
2255 
2256 static nxt_int_t
2257 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2258 {
2259     void       *mem;
2260     nxt_int_t  fd;
2261 
2262     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2263     if (nxt_slow_path(fd == -1)) {
2264         return NXT_ERROR;
2265     }
2266 
2267     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2268                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2269     if (nxt_slow_path(mem == MAP_FAILED)) {
2270         nxt_fd_close(fd);
2271 
2272         return NXT_ERROR;
2273     }
2274 
2275     nxt_port_queue_init(mem);
2276 
2277     port->queue_fd = fd;
2278     port->queue = mem;
2279 
2280     return NXT_OK;
2281 }
2282 
2283 
2284 static nxt_int_t
2285 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2286 {
2287     void  *mem;
2288 
2289     nxt_assert(fd != -1);
2290 
2291     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2292                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2293     if (nxt_slow_path(mem == MAP_FAILED)) {
2294 
2295         return NXT_ERROR;
2296     }
2297 
2298     port->queue = mem;
2299 
2300     return NXT_OK;
2301 }
2302 
2303 
2304 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2305     NXT_LVLHSH_DEFAULT,
2306     nxt_router_apps_hash_test,
2307     nxt_mp_lvlhsh_alloc,
2308     nxt_mp_lvlhsh_free,
2309 };
2310 
2311 
2312 static nxt_int_t
2313 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2314 {
2315     nxt_app_t  *app;
2316 
2317     app = data;
2318 
2319     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2320 }
2321 
2322 
2323 static nxt_int_t
2324 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2325 {
2326     nxt_lvlhsh_query_t  lhq;
2327 
2328     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2329     lhq.replace = 0;
2330     lhq.key = app->name;
2331     lhq.value = app;
2332     lhq.proto = &nxt_router_apps_hash_proto;
2333     lhq.pool = rtcf->mem_pool;
2334 
2335     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2336 
2337     case NXT_OK:
2338         return NXT_OK;
2339 
2340     case NXT_DECLINED:
2341         nxt_thread_log_alert("router app hash adding failed: "
2342                              "\"%V\" is already in hash", &lhq.key);
2343         /* Fall through. */
2344     default:
2345         return NXT_ERROR;
2346     }
2347 }
2348 
2349 
2350 static nxt_app_t *
2351 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2352 {
2353     nxt_lvlhsh_query_t  lhq;
2354 
2355     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2356     lhq.key = *name;
2357     lhq.proto = &nxt_router_apps_hash_proto;
2358 
2359     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2360         return NULL;
2361     }
2362 
2363     return lhq.value;
2364 }
2365 
2366 
2367 static void
2368 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2369 {
2370     nxt_app_t          *app;
2371     nxt_lvlhsh_each_t  lhe;
2372 
2373     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2374 
2375     for ( ;; ) {
2376         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2377 
2378         if (app == NULL) {
2379             break;
2380         }
2381 
2382         nxt_router_app_use(task, app, i);
2383     }
2384 }
2385 
2386 
2387 typedef struct {
2388     nxt_app_t  *app;
2389     nxt_int_t  target;
2390 } nxt_http_app_conf_t;
2391 
2392 
2393 nxt_int_t
2394 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2395     nxt_str_t *target, nxt_http_action_t *action)
2396 {
2397     nxt_app_t            *app;
2398     nxt_str_t            *targets;
2399     nxt_uint_t           i;
2400     nxt_http_app_conf_t  *conf;
2401 
2402     app = nxt_router_apps_hash_get(rtcf, name);
2403     if (app == NULL) {
2404         return NXT_DECLINED;
2405     }
2406 
2407     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2408     if (nxt_slow_path(conf == NULL)) {
2409         return NXT_ERROR;
2410     }
2411 
2412     action->handler = nxt_http_application_handler;
2413     action->u.conf = conf;
2414 
2415     conf->app = app;
2416 
2417     if (target != NULL && target->length != 0) {
2418         targets = app->targets;
2419 
2420         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2421 
2422         conf->target = i;
2423 
2424     } else {
2425         conf->target = 0;
2426     }
2427 
2428     return NXT_OK;
2429 }
2430 
2431 
2432 static nxt_socket_conf_t *
2433 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2434     nxt_str_t *name)
2435 {
2436     size_t               size;
2437     nxt_int_t            ret;
2438     nxt_bool_t           wildcard;
2439     nxt_sockaddr_t       *sa;
2440     nxt_socket_conf_t    *skcf;
2441     nxt_listen_socket_t  *ls;
2442 
2443     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2444     if (nxt_slow_path(sa == NULL)) {
2445         nxt_alert(task, "invalid listener \"%V\"", name);
2446         return NULL;
2447     }
2448 
2449     sa->type = SOCK_STREAM;
2450 
2451     nxt_debug(task, "router listener: \"%*s\"",
2452               (size_t) sa->length, nxt_sockaddr_start(sa));
2453 
2454     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2455     if (nxt_slow_path(skcf == NULL)) {
2456         return NULL;
2457     }
2458 
2459     size = nxt_sockaddr_size(sa);
2460 
2461     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2462 
2463     if (ret != NXT_OK) {
2464 
2465         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2466         if (nxt_slow_path(ls == NULL)) {
2467             return NULL;
2468         }
2469 
2470         skcf->listen = ls;
2471 
2472         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2473         nxt_memcpy(ls->sockaddr, sa, size);
2474 
2475         nxt_listen_socket_remote_size(ls);
2476 
2477         ls->socket = -1;
2478         ls->backlog = NXT_LISTEN_BACKLOG;
2479         ls->flags = NXT_NONBLOCK;
2480         ls->read_after_accept = 1;
2481     }
2482 
2483     switch (sa->u.sockaddr.sa_family) {
2484 #if (NXT_HAVE_UNIX_DOMAIN)
2485     case AF_UNIX:
2486         wildcard = 0;
2487         break;
2488 #endif
2489 #if (NXT_INET6)
2490     case AF_INET6:
2491         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2492         break;
2493 #endif
2494     case AF_INET:
2495     default:
2496         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2497         break;
2498     }
2499 
2500     if (!wildcard) {
2501         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2502         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2503             return NULL;
2504         }
2505 
2506         nxt_memcpy(skcf->sockaddr, sa, size);
2507     }
2508 
2509     return skcf;
2510 }
2511 
2512 
2513 static nxt_int_t
2514 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2515     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2516 {
2517     nxt_router_t       *router;
2518     nxt_queue_link_t   *qlk;
2519     nxt_socket_conf_t  *skcf;
2520 
2521     router = tmcf->router_conf->router;
2522 
2523     for (qlk = nxt_queue_first(&router->sockets);
2524          qlk != nxt_queue_tail(&router->sockets);
2525          qlk = nxt_queue_next(qlk))
2526     {
2527         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2528 
2529         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2530             nskcf->listen = skcf->listen;
2531 
2532             nxt_queue_remove(qlk);
2533             nxt_queue_insert_tail(&keeping_sockets, qlk);
2534 
2535             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2536 
2537             return NXT_OK;
2538         }
2539     }
2540 
2541     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2542 
2543     return NXT_DECLINED;
2544 }
2545 
2546 
2547 static void
2548 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2549     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2550 {
2551     size_t            size;
2552     uint32_t          stream;
2553     nxt_int_t         ret;
2554     nxt_buf_t         *b;
2555     nxt_port_t        *main_port, *router_port;
2556     nxt_runtime_t     *rt;
2557     nxt_socket_rpc_t  *rpc;
2558 
2559     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2560     if (rpc == NULL) {
2561         goto fail;
2562     }
2563 
2564     rpc->socket_conf = skcf;
2565     rpc->temp_conf = tmcf;
2566 
2567     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2568 
2569     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2570     if (b == NULL) {
2571         goto fail;
2572     }
2573 
2574     b->completion_handler = nxt_buf_dummy_completion;
2575 
2576     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2577 
2578     rt = task->thread->runtime;
2579     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2580     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2581 
2582     stream = nxt_port_rpc_register_handler(task, router_port,
2583                                            nxt_router_listen_socket_ready,
2584                                            nxt_router_listen_socket_error,
2585                                            main_port->pid, rpc);
2586     if (nxt_slow_path(stream == 0)) {
2587         goto fail;
2588     }
2589 
2590     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2591                                 stream, router_port->id, b);
2592 
2593     if (nxt_slow_path(ret != NXT_OK)) {
2594         nxt_port_rpc_cancel(task, router_port, stream);
2595         goto fail;
2596     }
2597 
2598     return;
2599 
2600 fail:
2601 
2602     nxt_router_conf_error(task, tmcf);
2603 }
2604 
2605 
2606 static void
2607 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2608     void *data)
2609 {
2610     nxt_int_t         ret;
2611     nxt_socket_t      s;
2612     nxt_socket_rpc_t  *rpc;
2613 
2614     rpc = data;
2615 
2616     s = msg->fd[0];
2617 
2618     ret = nxt_socket_nonblocking(task, s);
2619     if (nxt_slow_path(ret != NXT_OK)) {
2620         goto fail;
2621     }
2622 
2623     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2624 
2625     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2626     if (nxt_slow_path(ret != NXT_OK)) {
2627         goto fail;
2628     }
2629 
2630     rpc->socket_conf->listen->socket = s;
2631 
2632     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2633                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2634 
2635     return;
2636 
2637 fail:
2638 
2639     nxt_socket_close(task, s);
2640 
2641     nxt_router_conf_error(task, rpc->temp_conf);
2642 }
2643 
2644 
2645 static void
2646 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2647     void *data)
2648 {
2649     nxt_socket_rpc_t        *rpc;
2650     nxt_router_temp_conf_t  *tmcf;
2651 
2652     rpc = data;
2653     tmcf = rpc->temp_conf;
2654 
2655 #if 0
2656     u_char                  *p;
2657     size_t                  size;
2658     uint8_t                 error;
2659     nxt_buf_t               *in, *out;
2660     nxt_sockaddr_t          *sa;
2661 
2662     static nxt_str_t  socket_errors[] = {
2663         nxt_string("ListenerSystem"),
2664         nxt_string("ListenerNoIPv6"),
2665         nxt_string("ListenerPort"),
2666         nxt_string("ListenerInUse"),
2667         nxt_string("ListenerNoAddress"),
2668         nxt_string("ListenerNoAccess"),
2669         nxt_string("ListenerPath"),
2670     };
2671 
2672     sa = rpc->socket_conf->listen->sockaddr;
2673 
2674     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2675 
2676     if (nxt_slow_path(in == NULL)) {
2677         return;
2678     }
2679 
2680     p = in->mem.pos;
2681 
2682     error = *p++;
2683 
2684     size = nxt_length("listen socket error: ")
2685            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2686            + sa->length + socket_errors[error].length + (in->mem.free - p);
2687 
2688     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2689     if (nxt_slow_path(out == NULL)) {
2690         return;
2691     }
2692 
2693     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2694                         "listen socket error: "
2695                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2696                         (size_t) sa->length, nxt_sockaddr_start(sa),
2697                         &socket_errors[error], in->mem.free - p, p);
2698 
2699     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2700 #endif
2701 
2702     nxt_router_conf_error(task, tmcf);
2703 }
2704 
2705 
2706 #if (NXT_TLS)
2707 
2708 static void
2709 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2710     void *data)
2711 {
2712     nxt_mp_t                *mp;
2713     nxt_int_t               ret;
2714     nxt_tls_conf_t          *tlscf;
2715     nxt_router_tlssock_t    *tls;
2716     nxt_tls_bundle_conf_t   *bundle;
2717     nxt_router_temp_conf_t  *tmcf;
2718 
2719     nxt_debug(task, "tls rpc handler");
2720 
2721     tls = data;
2722     tmcf = tls->temp_conf;
2723 
2724     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2725         goto fail;
2726     }
2727 
2728     mp = tmcf->router_conf->mem_pool;
2729 
2730     if (tls->socket_conf->tls == NULL){
2731         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2732         if (nxt_slow_path(tlscf == NULL)) {
2733             goto fail;
2734         }
2735 
2736         tlscf->no_wait_shutdown = 1;
2737         tls->socket_conf->tls = tlscf;
2738 
2739     } else {
2740         tlscf = tls->socket_conf->tls;
2741     }
2742 
2743     tls->tls_init->conf = tlscf;
2744 
2745     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2746     if (nxt_slow_path(bundle == NULL)) {
2747         goto fail;
2748     }
2749 
2750     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2751         goto fail;
2752     }
2753 
2754     bundle->chain_file = msg->fd[0];
2755     bundle->next = tlscf->bundle;
2756     tlscf->bundle = bundle;
2757 
2758     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2759                                                   tls->last);
2760     if (nxt_slow_path(ret != NXT_OK)) {
2761         goto fail;
2762     }
2763 
2764     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2765                        nxt_router_conf_apply, task, tmcf, NULL);
2766     return;
2767 
2768 fail:
2769 
2770     nxt_router_conf_error(task, tmcf);
2771 }
2772 
2773 #endif
2774 
2775 
2776 static void
2777 nxt_router_app_rpc_create(nxt_task_t *task,
2778     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2779 {
2780     size_t         size;
2781     uint32_t       stream;
2782     nxt_int_t      ret;
2783     nxt_buf_t      *b;
2784     nxt_port_t     *router_port, *dport;
2785     nxt_runtime_t  *rt;
2786     nxt_app_rpc_t  *rpc;
2787 
2788     rt = task->thread->runtime;
2789 
2790     dport = app->proto_port;
2791 
2792     if (dport == NULL) {
2793         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2794 
2795         size = app->name.length + 1 + app->conf.length;
2796 
2797         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2798         if (nxt_slow_path(b == NULL)) {
2799             goto fail;
2800         }
2801 
2802         b->completion_handler = nxt_buf_dummy_completion;
2803 
2804         nxt_buf_cpystr(b, &app->name);
2805         *b->mem.free++ = '\0';
2806         nxt_buf_cpystr(b, &app->conf);
2807 
2808         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2809 
2810     } else {
2811         nxt_debug(task, "app '%V' prefork", &app->name);
2812 
2813         b = NULL;
2814     }
2815 
2816     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2817 
2818     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2819                                            nxt_router_app_prefork_ready,
2820                                            nxt_router_app_prefork_error,
2821                                            sizeof(nxt_app_rpc_t));
2822     if (nxt_slow_path(rpc == NULL)) {
2823         goto fail;
2824     }
2825 
2826     rpc->app = app;
2827     rpc->temp_conf = tmcf;
2828     rpc->proto = (b != NULL);
2829 
2830     stream = nxt_port_rpc_ex_stream(rpc);
2831 
2832     ret = nxt_port_socket_write(task, dport,
2833                                 NXT_PORT_MSG_START_PROCESS,
2834                                 -1, stream, router_port->id, b);
2835     if (nxt_slow_path(ret != NXT_OK)) {
2836         nxt_port_rpc_cancel(task, router_port, stream);
2837         goto fail;
2838     }
2839 
2840     if (b == NULL) {
2841         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2842 
2843         app->pending_processes++;
2844     }
2845 
2846     return;
2847 
2848 fail:
2849 
2850     nxt_router_conf_error(task, tmcf);
2851 }
2852 
2853 
2854 static void
2855 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2856     void *data)
2857 {
2858     nxt_app_t           *app;
2859     nxt_port_t          *port;
2860     nxt_app_rpc_t       *rpc;
2861     nxt_event_engine_t  *engine;
2862 
2863     rpc = data;
2864     app = rpc->app;
2865 
2866     port = msg->u.new_port;
2867 
2868     nxt_assert(port != NULL);
2869     nxt_assert(port->id == 0);
2870 
2871     if (rpc->proto) {
2872         nxt_assert(app->proto_port == NULL);
2873         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2874 
2875         nxt_port_inc_use(port);
2876 
2877         app->proto_port = port;
2878         port->app = app;
2879 
2880         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2881 
2882         return;
2883     }
2884 
2885     nxt_assert(port->type == NXT_PROCESS_APP);
2886 
2887     port->app = app;
2888     port->main_app_port = port;
2889 
2890     app->pending_processes--;
2891     app->processes++;
2892     app->idle_processes++;
2893 
2894     engine = task->thread->engine;
2895 
2896     nxt_queue_insert_tail(&app->ports, &port->app_link);
2897     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2898 
2899     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2900               &app->name, port->pid, port->id);
2901 
2902     nxt_port_hash_add(&app->port_hash, port);
2903     app->port_hash_count++;
2904 
2905     port->idle_start = 0;
2906 
2907     nxt_port_inc_use(port);
2908 
2909     nxt_router_app_shared_port_send(task, port);
2910 
2911     nxt_work_queue_add(&engine->fast_work_queue,
2912                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2913 }
2914 
2915 
2916 static void
2917 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2918     void *data)
2919 {
2920     nxt_app_t               *app;
2921     nxt_app_rpc_t           *rpc;
2922     nxt_router_temp_conf_t  *tmcf;
2923 
2924     rpc = data;
2925     app = rpc->app;
2926     tmcf = rpc->temp_conf;
2927 
2928     if (rpc->proto) {
2929         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2930                 &app->name);
2931 
2932     } else {
2933         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2934                 &app->name);
2935 
2936         app->pending_processes--;
2937     }
2938 
2939     nxt_router_conf_error(task, tmcf);
2940 }
2941 
2942 
2943 static nxt_int_t
2944 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2945     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2946 {
2947     nxt_int_t                 ret;
2948     nxt_uint_t                n, threads;
2949     nxt_queue_link_t          *qlk;
2950     nxt_router_engine_conf_t  *recf;
2951 
2952     threads = tmcf->router_conf->threads;
2953 
2954     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2955                                      sizeof(nxt_router_engine_conf_t));
2956     if (nxt_slow_path(tmcf->engines == NULL)) {
2957         return NXT_ERROR;
2958     }
2959 
2960     n = 0;
2961 
2962     for (qlk = nxt_queue_first(&router->engines);
2963          qlk != nxt_queue_tail(&router->engines);
2964          qlk = nxt_queue_next(qlk))
2965     {
2966         recf = nxt_array_zero_add(tmcf->engines);
2967         if (nxt_slow_path(recf == NULL)) {
2968             return NXT_ERROR;
2969         }
2970 
2971         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2972 
2973         if (n < threads) {
2974             recf->action = NXT_ROUTER_ENGINE_KEEP;
2975             ret = nxt_router_engine_conf_update(tmcf, recf);
2976 
2977         } else {
2978             recf->action = NXT_ROUTER_ENGINE_DELETE;
2979             ret = nxt_router_engine_conf_delete(tmcf, recf);
2980         }
2981 
2982         if (nxt_slow_path(ret != NXT_OK)) {
2983             return ret;
2984         }
2985 
2986         n++;
2987     }
2988 
2989     tmcf->new_threads = n;
2990 
2991     while (n < threads) {
2992         recf = nxt_array_zero_add(tmcf->engines);
2993         if (nxt_slow_path(recf == NULL)) {
2994             return NXT_ERROR;
2995         }
2996 
2997         recf->action = NXT_ROUTER_ENGINE_ADD;
2998 
2999         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3000         if (nxt_slow_path(recf->engine == NULL)) {
3001             return NXT_ERROR;
3002         }
3003 
3004         ret = nxt_router_engine_conf_create(tmcf, recf);
3005         if (nxt_slow_path(ret != NXT_OK)) {
3006             return ret;
3007         }
3008 
3009         n++;
3010     }
3011 
3012     return NXT_OK;
3013 }
3014 
3015 
3016 static nxt_int_t
3017 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3018     nxt_router_engine_conf_t *recf)
3019 {
3020     nxt_int_t  ret;
3021 
3022     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3023                                           nxt_router_listen_socket_create);
3024     if (nxt_slow_path(ret != NXT_OK)) {
3025         return ret;
3026     }
3027 
3028     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3029                                           nxt_router_listen_socket_create);
3030     if (nxt_slow_path(ret != NXT_OK)) {
3031         return ret;
3032     }
3033 
3034     return ret;
3035 }
3036 
3037 
3038 static nxt_int_t
3039 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3040     nxt_router_engine_conf_t *recf)
3041 {
3042     nxt_int_t  ret;
3043 
3044     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3045                                           nxt_router_listen_socket_create);
3046     if (nxt_slow_path(ret != NXT_OK)) {
3047         return ret;
3048     }
3049 
3050     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3051                                           nxt_router_listen_socket_update);
3052     if (nxt_slow_path(ret != NXT_OK)) {
3053         return ret;
3054     }
3055 
3056     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3057     if (nxt_slow_path(ret != NXT_OK)) {
3058         return ret;
3059     }
3060 
3061     return ret;
3062 }
3063 
3064 
3065 static nxt_int_t
3066 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3067     nxt_router_engine_conf_t *recf)
3068 {
3069     nxt_int_t  ret;
3070 
3071     ret = nxt_router_engine_quit(tmcf, recf);
3072     if (nxt_slow_path(ret != NXT_OK)) {
3073         return ret;
3074     }
3075 
3076     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3077     if (nxt_slow_path(ret != NXT_OK)) {
3078         return ret;
3079     }
3080 
3081     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3082 }
3083 
3084 
3085 static nxt_int_t
3086 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3087     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3088     nxt_work_handler_t handler)
3089 {
3090     nxt_int_t                ret;
3091     nxt_joint_job_t          *job;
3092     nxt_queue_link_t         *qlk;
3093     nxt_socket_conf_t        *skcf;
3094     nxt_socket_conf_joint_t  *joint;
3095 
3096     for (qlk = nxt_queue_first(sockets);
3097          qlk != nxt_queue_tail(sockets);
3098          qlk = nxt_queue_next(qlk))
3099     {
3100         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3101         if (nxt_slow_path(job == NULL)) {
3102             return NXT_ERROR;
3103         }
3104 
3105         job->work.next = recf->jobs;
3106         recf->jobs = &job->work;
3107 
3108         job->task = tmcf->engine->task;
3109         job->work.handler = handler;
3110         job->work.task = &job->task;
3111         job->work.obj = job;
3112         job->tmcf = tmcf;
3113 
3114         tmcf->count++;
3115 
3116         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3117                              sizeof(nxt_socket_conf_joint_t));
3118         if (nxt_slow_path(joint == NULL)) {
3119             return NXT_ERROR;
3120         }
3121 
3122         job->work.data = joint;
3123 
3124         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3125         if (nxt_slow_path(ret != NXT_OK)) {
3126             return ret;
3127         }
3128 
3129         joint->count = 1;
3130 
3131         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3132         skcf->count++;
3133         joint->socket_conf = skcf;
3134 
3135         joint->engine = recf->engine;
3136     }
3137 
3138     return NXT_OK;
3139 }
3140 
3141 
3142 static nxt_int_t
3143 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3144     nxt_router_engine_conf_t *recf)
3145 {
3146     nxt_joint_job_t  *job;
3147 
3148     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3149     if (nxt_slow_path(job == NULL)) {
3150         return NXT_ERROR;
3151     }
3152 
3153     job->work.next = recf->jobs;
3154     recf->jobs = &job->work;
3155 
3156     job->task = tmcf->engine->task;
3157     job->work.handler = nxt_router_worker_thread_quit;
3158     job->work.task = &job->task;
3159     job->work.obj = NULL;
3160     job->work.data = NULL;
3161     job->tmcf = NULL;
3162 
3163     return NXT_OK;
3164 }
3165 
3166 
3167 static nxt_int_t
3168 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3170 {
3171     nxt_joint_job_t   *job;
3172