xref: /unit/src/nxt_router.c (revision 2077:624e51cfe97a)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static void nxt_router_app_port_error(nxt_task_t *task,
227     nxt_port_recv_msg_t *msg, void *data);
228 
229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
231 
232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
233     nxt_port_t *port, nxt_apr_action_t action);
234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
235     nxt_request_rpc_data_t *req_rpc_data);
236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
237     void *data);
238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
239     void *data);
240 
241 static void nxt_router_app_prepare_request(nxt_task_t *task,
242     nxt_request_rpc_data_t *req_rpc_data);
243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
244     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
245 
246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
254 
255 static const nxt_http_request_state_t  nxt_http_request_send_state;
256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
257 
258 static void nxt_router_app_joint_use(nxt_task_t *task,
259     nxt_app_joint_t *app_joint, int i);
260 
261 static void nxt_router_http_request_release_post(nxt_task_t *task,
262     nxt_http_request_t *r);
263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
264     void *data);
265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_port_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_mmap_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 
271 extern const nxt_http_request_state_t  nxt_http_websocket;
272 
273 static nxt_router_t  *nxt_router;
274 
275 static const nxt_str_t http_prefix = nxt_string("HTTP_");
276 static const nxt_str_t empty_prefix = nxt_string("");
277 
278 static const nxt_str_t  *nxt_app_msg_prefix[] = {
279     &empty_prefix,
280     &empty_prefix,
281     &http_prefix,
282     &http_prefix,
283     &http_prefix,
284     &empty_prefix,
285 };
286 
287 
288 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
289     .quit         = nxt_signal_quit_handler,
290     .new_port     = nxt_router_new_port_handler,
291     .get_port     = nxt_router_get_port_handler,
292     .change_file  = nxt_port_change_log_file_handler,
293     .mmap         = nxt_port_mmap_handler,
294     .get_mmap     = nxt_router_get_mmap_handler,
295     .data         = nxt_router_conf_data_handler,
296     .app_restart  = nxt_router_app_restart_handler,
297     .remove_pid   = nxt_router_remove_pid_handler,
298     .access_log   = nxt_router_access_log_reopen_handler,
299     .rpc_ready    = nxt_port_rpc_handler,
300     .rpc_error    = nxt_port_rpc_handler,
301     .oosm         = nxt_router_oosm_handler,
302 };
303 
304 
305 const nxt_process_init_t  nxt_router_process = {
306     .name           = "router",
307     .type           = NXT_PROCESS_ROUTER,
308     .prefork        = nxt_router_prefork,
309     .restart        = 1,
310     .setup          = nxt_process_core_setup,
311     .start          = nxt_router_start,
312     .port_handlers  = &nxt_router_process_port_handlers,
313     .signals        = nxt_process_signals,
314 };
315 
316 
317 /* Queues of nxt_socket_conf_t */
318 nxt_queue_t  creating_sockets;
319 nxt_queue_t  pending_sockets;
320 nxt_queue_t  updating_sockets;
321 nxt_queue_t  keeping_sockets;
322 nxt_queue_t  deleting_sockets;
323 
324 
325 static nxt_int_t
326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328     nxt_runtime_stop_app_processes(task, task->thread->runtime);
329 
330     return NXT_OK;
331 }
332 
333 
334 static nxt_int_t
335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337     nxt_int_t      ret;
338     nxt_port_t     *controller_port;
339     nxt_router_t   *router;
340     nxt_runtime_t  *rt;
341 
342     rt = task->thread->runtime;
343 
344     nxt_log(task, NXT_LOG_INFO, "router started");
345 
346 #if (NXT_TLS)
347     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348     if (nxt_slow_path(rt->tls == NULL)) {
349         return NXT_ERROR;
350     }
351 
352     ret = rt->tls->library_init(task);
353     if (nxt_slow_path(ret != NXT_OK)) {
354         return ret;
355     }
356 #endif
357 
358     ret = nxt_http_init(task);
359     if (nxt_slow_path(ret != NXT_OK)) {
360         return ret;
361     }
362 
363     router = nxt_zalloc(sizeof(nxt_router_t));
364     if (nxt_slow_path(router == NULL)) {
365         return NXT_ERROR;
366     }
367 
368     nxt_queue_init(&router->engines);
369     nxt_queue_init(&router->sockets);
370     nxt_queue_init(&router->apps);
371 
372     nxt_router = router;
373 
374     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375     if (controller_port != NULL) {
376         nxt_router_greet_controller(task, controller_port);
377     }
378 
379     return NXT_OK;
380 }
381 
382 
383 static void
384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387                           -1, 0, 0, NULL);
388 }
389 
390 
391 static void
392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
393     void *data)
394 {
395     size_t               size;
396     uint32_t             stream;
397     nxt_fd_t             port_fd, queue_fd;
398     nxt_int_t            ret;
399     nxt_app_t            *app;
400     nxt_buf_t            *b;
401     nxt_port_t           *dport;
402     nxt_runtime_t        *rt;
403     nxt_app_joint_rpc_t  *app_joint_rpc;
404 
405     app = data;
406 
407     nxt_thread_mutex_lock(&app->mutex);
408 
409     dport = app->proto_port;
410 
411     nxt_thread_mutex_unlock(&app->mutex);
412 
413     if (dport != NULL) {
414         nxt_debug(task, "app '%V' %p start process", &app->name, app);
415 
416         b = NULL;
417         port_fd = -1;
418         queue_fd = -1;
419 
420     } else {
421         if (app->proto_port_requests > 0) {
422             nxt_debug(task, "app '%V' %p wait for prototype process",
423                       &app->name, app);
424 
425             app->proto_port_requests++;
426 
427             goto skip;
428         }
429 
430         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
431 
432         rt = task->thread->runtime;
433         dport = rt->port_by_type[NXT_PROCESS_MAIN];
434 
435         size = app->name.length + 1 + app->conf.length;
436 
437         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
438         if (nxt_slow_path(b == NULL)) {
439             goto failed;
440         }
441 
442         nxt_buf_cpystr(b, &app->name);
443         *b->mem.free++ = '\0';
444         nxt_buf_cpystr(b, &app->conf);
445 
446         port_fd = app->shared_port->pair[0];
447         queue_fd = app->shared_port->queue_fd;
448     }
449 
450     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
451                                                      nxt_router_app_port_ready,
452                                                      nxt_router_app_port_error,
453                                                    sizeof(nxt_app_joint_rpc_t));
454     if (nxt_slow_path(app_joint_rpc == NULL)) {
455         goto failed;
456     }
457 
458     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
459 
460     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
461                                  port_fd, queue_fd, stream, port->id, b);
462     if (nxt_slow_path(ret != NXT_OK)) {
463         nxt_port_rpc_cancel(task, port, stream);
464 
465         goto failed;
466     }
467 
468     app_joint_rpc->app_joint = app->joint;
469     app_joint_rpc->generation = app->generation;
470     app_joint_rpc->proto = (b != NULL);
471 
472     if (b != NULL) {
473         app->proto_port_requests++;
474 
475         b = NULL;
476     }
477 
478     nxt_router_app_joint_use(task, app->joint, 1);
479 
480 failed:
481 
482     if (b != NULL) {
483         nxt_mp_free(b->data, b);
484     }
485 
486 skip:
487 
488     nxt_router_app_use(task, app, -1);
489 }
490 
491 
492 static void
493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
494 {
495     app_joint->use_count += i;
496 
497     if (app_joint->use_count == 0) {
498         nxt_assert(app_joint->app == NULL);
499 
500         nxt_free(app_joint);
501     }
502 }
503 
504 
505 static nxt_int_t
506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
507 {
508     nxt_int_t      res;
509     nxt_port_t     *router_port;
510     nxt_runtime_t  *rt;
511 
512     nxt_debug(task, "app '%V' start process", &app->name);
513 
514     rt = task->thread->runtime;
515     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
516 
517     nxt_router_app_use(task, app, 1);
518 
519     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
520                         app);
521 
522     if (res == NXT_OK) {
523         return res;
524     }
525 
526     nxt_thread_mutex_lock(&app->mutex);
527 
528     app->pending_processes--;
529 
530     nxt_thread_mutex_unlock(&app->mutex);
531 
532     nxt_router_app_use(task, app, -1);
533 
534     return NXT_ERROR;
535 }
536 
537 
538 nxt_inline nxt_bool_t
539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
540 {
541     nxt_buf_t       *b, *next;
542     nxt_bool_t      cancelled;
543     nxt_port_t      *app_port;
544     nxt_msg_info_t  *msg_info;
545 
546     msg_info = &req_rpc_data->msg_info;
547 
548     if (msg_info->buf == NULL) {
549         return 0;
550     }
551 
552     app_port = req_rpc_data->app_port;
553 
554     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
555         cancelled = nxt_app_queue_cancel(app_port->queue,
556                                          msg_info->tracking_cookie,
557                                          req_rpc_data->stream);
558 
559         if (cancelled) {
560             nxt_debug(task, "stream #%uD: cancelled by router",
561                       req_rpc_data->stream);
562         }
563 
564     } else {
565         cancelled = 0;
566     }
567 
568     for (b = msg_info->buf; b != NULL; b = next) {
569         next = b->next;
570         b->next = NULL;
571 
572         if (b->is_port_mmap_sent) {
573             b->is_port_mmap_sent = cancelled == 0;
574         }
575 
576         b->completion_handler(task, b, b->parent);
577     }
578 
579     msg_info->buf = NULL;
580 
581     return cancelled;
582 }
583 
584 
585 nxt_inline nxt_bool_t
586 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
587 {
588     if (lnk->next != NULL) {
589         nxt_queue_remove(lnk);
590 
591         lnk->next = NULL;
592 
593         return 1;
594     }
595 
596     return 0;
597 }
598 
599 
600 nxt_inline void
601 nxt_request_rpc_data_unlink(nxt_task_t *task,
602     nxt_request_rpc_data_t *req_rpc_data)
603 {
604     nxt_app_t           *app;
605     nxt_bool_t          unlinked;
606     nxt_http_request_t  *r;
607 
608     nxt_router_msg_cancel(task, req_rpc_data);
609 
610     app = req_rpc_data->app;
611 
612     if (req_rpc_data->app_port != NULL) {
613         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
614                                     req_rpc_data->apr_action);
615 
616         req_rpc_data->app_port = NULL;
617     }
618 
619     r = req_rpc_data->request;
620 
621     if (r != NULL) {
622         r->timer_data = NULL;
623 
624         nxt_router_http_request_release_post(task, r);
625 
626         r->req_rpc_data = NULL;
627         req_rpc_data->request = NULL;
628 
629         if (app != NULL) {
630             unlinked = 0;
631 
632             nxt_thread_mutex_lock(&app->mutex);
633 
634             if (r->app_link.next != NULL) {
635                 nxt_queue_remove(&r->app_link);
636                 r->app_link.next = NULL;
637 
638                 unlinked = 1;
639             }
640 
641             nxt_thread_mutex_unlock(&app->mutex);
642 
643             if (unlinked) {
644                 nxt_mp_release(r->mem_pool);
645             }
646         }
647     }
648 
649     if (app != NULL) {
650         nxt_router_app_use(task, app, -1);
651 
652         req_rpc_data->app = NULL;
653     }
654 
655     if (req_rpc_data->msg_info.body_fd != -1) {
656         nxt_fd_close(req_rpc_data->msg_info.body_fd);
657 
658         req_rpc_data->msg_info.body_fd = -1;
659     }
660 
661     if (req_rpc_data->rpc_cancel) {
662         req_rpc_data->rpc_cancel = 0;
663 
664         nxt_port_rpc_cancel(task, task->thread->engine->port,
665                             req_rpc_data->stream);
666     }
667 }
668 
669 
670 static void
671 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
672 {
673     nxt_int_t      res;
674     nxt_app_t      *app;
675     nxt_port_t     *port, *main_app_port;
676     nxt_runtime_t  *rt;
677 
678     nxt_port_new_port_handler(task, msg);
679 
680     port = msg->u.new_port;
681 
682     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
683         nxt_router_greet_controller(task, msg->u.new_port);
684     }
685 
686     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
687         nxt_port_rpc_handler(task, msg);
688 
689         return;
690     }
691 
692     if (port == NULL || port->type != NXT_PROCESS_APP) {
693 
694         if (msg->port_msg.stream == 0) {
695             return;
696         }
697 
698         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
699 
700     } else {
701         if (msg->fd[1] != -1) {
702             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
703             if (nxt_slow_path(res != NXT_OK)) {
704                 return;
705             }
706 
707             nxt_fd_close(msg->fd[1]);
708             msg->fd[1] = -1;
709         }
710     }
711 
712     if (msg->port_msg.stream != 0) {
713         nxt_port_rpc_handler(task, msg);
714         return;
715     }
716 
717     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
718 
719     /*
720      * Port with "id == 0" is application 'main' port and it always
721      * should come with non-zero stream.
722      */
723     nxt_assert(port->id != 0);
724 
725     /* Find 'main' app port and get app reference. */
726     rt = task->thread->runtime;
727 
728     /*
729      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
730      * sent to main port (with id == 0) and processed in main thread.
731      */
732     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
733     nxt_assert(main_app_port != NULL);
734 
735     app = main_app_port->app;
736 
737     if (nxt_fast_path(app != NULL)) {
738         nxt_thread_mutex_lock(&app->mutex);
739 
740         /* TODO here should be find-and-add code because there can be
741            port waiters in port_hash */
742         nxt_port_hash_add(&app->port_hash, port);
743         app->port_hash_count++;
744 
745         nxt_thread_mutex_unlock(&app->mutex);
746 
747         port->app = app;
748     }
749 
750     port->main_app_port = main_app_port;
751 
752     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
753 }
754 
755 
756 static void
757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
758 {
759     void                    *p;
760     size_t                  size;
761     nxt_int_t               ret;
762     nxt_port_t              *port;
763     nxt_router_temp_conf_t  *tmcf;
764 
765     port = nxt_runtime_port_find(task->thread->runtime,
766                                  msg->port_msg.pid,
767                                  msg->port_msg.reply_port);
768     if (nxt_slow_path(port == NULL)) {
769         nxt_alert(task, "conf_data_handler: reply port not found");
770         return;
771     }
772 
773     p = MAP_FAILED;
774 
775     /*
776      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
777      * initialized in 'cleanup' section.
778      */
779     size = 0;
780 
781     tmcf = nxt_router_temp_conf(task);
782     if (nxt_slow_path(tmcf == NULL)) {
783         goto fail;
784     }
785 
786     if (nxt_slow_path(msg->fd[0] == -1)) {
787         nxt_alert(task, "conf_data_handler: invalid shm fd");
788         goto fail;
789     }
790 
791     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
792         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
793                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
794         goto fail;
795     }
796 
797     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
798 
799     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
800 
801     nxt_fd_close(msg->fd[0]);
802     msg->fd[0] = -1;
803 
804     if (nxt_slow_path(p == MAP_FAILED)) {
805         goto fail;
806     }
807 
808     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
809 
810     tmcf->router_conf->router = nxt_router;
811     tmcf->stream = msg->port_msg.stream;
812     tmcf->port = port;
813 
814     nxt_port_use(task, tmcf->port, 1);
815 
816     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
817 
818     if (nxt_fast_path(ret == NXT_OK)) {
819         nxt_router_conf_apply(task, tmcf, NULL);
820 
821     } else {
822         nxt_router_conf_error(task, tmcf);
823     }
824 
825     goto cleanup;
826 
827 fail:
828 
829     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
830                           msg->port_msg.stream, 0, NULL);
831 
832     if (tmcf != NULL) {
833         nxt_mp_release(tmcf->mem_pool);
834     }
835 
836 cleanup:
837 
838     if (p != MAP_FAILED) {
839         nxt_mem_munmap(p, size);
840     }
841 
842     if (msg->fd[0] != -1) {
843         nxt_fd_close(msg->fd[0]);
844         msg->fd[0] = -1;
845     }
846 }
847 
848 
849 static void
850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
851 {
852     nxt_app_t            *app;
853     nxt_int_t            ret;
854     nxt_str_t            app_name;
855     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
856     nxt_port_t           *proto_port;
857     nxt_port_msg_type_t  reply;
858 
859     reply_port = nxt_runtime_port_find(task->thread->runtime,
860                                        msg->port_msg.pid,
861                                        msg->port_msg.reply_port);
862     if (nxt_slow_path(reply_port == NULL)) {
863         nxt_alert(task, "app_restart_handler: reply port not found");
864         return;
865     }
866 
867     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
868     app_name.start = msg->buf->mem.pos;
869 
870     nxt_debug(task, "app_restart_handler: %V", &app_name);
871 
872     app = nxt_router_app_find(&nxt_router->apps, &app_name);
873 
874     if (nxt_fast_path(app != NULL)) {
875         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
876                                    NXT_PROCESS_APP);
877         if (nxt_slow_path(shared_port == NULL)) {
878             goto fail;
879         }
880 
881         ret = nxt_port_socket_init(task, shared_port, 0);
882         if (nxt_slow_path(ret != NXT_OK)) {
883             nxt_port_use(task, shared_port, -1);
884             goto fail;
885         }
886 
887         ret = nxt_router_app_queue_init(task, shared_port);
888         if (nxt_slow_path(ret != NXT_OK)) {
889             nxt_port_write_close(shared_port);
890             nxt_port_read_close(shared_port);
891             nxt_port_use(task, shared_port, -1);
892             goto fail;
893         }
894 
895         nxt_port_write_enable(task, shared_port);
896 
897         nxt_thread_mutex_lock(&app->mutex);
898 
899         proto_port = app->proto_port;
900 
901         if (proto_port != NULL) {
902             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
903                       proto_port->pid);
904 
905             app->proto_port = NULL;
906             proto_port->app = NULL;
907         }
908 
909         app->generation++;
910 
911         shared_port->app = app;
912 
913         old_shared_port = app->shared_port;
914         old_shared_port->app = NULL;
915 
916         app->shared_port = shared_port;
917 
918         nxt_thread_mutex_unlock(&app->mutex);
919 
920         nxt_port_close(task, old_shared_port);
921         nxt_port_use(task, old_shared_port, -1);
922 
923         if (proto_port != NULL) {
924             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
925                                          -1, 0, 0, NULL);
926 
927             nxt_port_close(task, proto_port);
928 
929             nxt_port_use(task, proto_port, -1);
930         }
931 
932         reply = NXT_PORT_MSG_RPC_READY_LAST;
933 
934     } else {
935 
936 fail:
937 
938         reply = NXT_PORT_MSG_RPC_ERROR;
939     }
940 
941     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
942                           0, NULL);
943 }
944 
945 
946 static void
947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
948     void *data)
949 {
950     union {
951         nxt_pid_t  removed_pid;
952         void       *data;
953     } u;
954 
955     u.data = data;
956 
957     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
958 }
959 
960 
961 static void
962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
963 {
964     nxt_event_engine_t  *engine;
965 
966     nxt_port_remove_pid_handler(task, msg);
967 
968     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
969     {
970         if (nxt_fast_path(engine->port != NULL)) {
971             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
972                           msg->u.data);
973         }
974     }
975     nxt_queue_loop;
976 
977     if (msg->port_msg.stream == 0) {
978         return;
979     }
980 
981     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
982 
983     nxt_port_rpc_handler(task, msg);
984 }
985 
986 
987 static nxt_router_temp_conf_t *
988 nxt_router_temp_conf(nxt_task_t *task)
989 {
990     nxt_mp_t                *mp, *tmp;
991     nxt_router_conf_t       *rtcf;
992     nxt_router_temp_conf_t  *tmcf;
993 
994     mp = nxt_mp_create(1024, 128, 256, 32);
995     if (nxt_slow_path(mp == NULL)) {
996         return NULL;
997     }
998 
999     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1000     if (nxt_slow_path(rtcf == NULL)) {
1001         goto fail;
1002     }
1003 
1004     rtcf->mem_pool = mp;
1005 
1006     tmp = nxt_mp_create(1024, 128, 256, 32);
1007     if (nxt_slow_path(tmp == NULL)) {
1008         goto fail;
1009     }
1010 
1011     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1012     if (nxt_slow_path(tmcf == NULL)) {
1013         goto temp_fail;
1014     }
1015 
1016     tmcf->mem_pool = tmp;
1017     tmcf->router_conf = rtcf;
1018     tmcf->count = 1;
1019     tmcf->engine = task->thread->engine;
1020 
1021     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1022                                      sizeof(nxt_router_engine_conf_t));
1023     if (nxt_slow_path(tmcf->engines == NULL)) {
1024         goto temp_fail;
1025     }
1026 
1027     nxt_queue_init(&creating_sockets);
1028     nxt_queue_init(&pending_sockets);
1029     nxt_queue_init(&updating_sockets);
1030     nxt_queue_init(&keeping_sockets);
1031     nxt_queue_init(&deleting_sockets);
1032 
1033 #if (NXT_TLS)
1034     nxt_queue_init(&tmcf->tls);
1035 #endif
1036 
1037     nxt_queue_init(&tmcf->apps);
1038     nxt_queue_init(&tmcf->previous);
1039 
1040     return tmcf;
1041 
1042 temp_fail:
1043 
1044     nxt_mp_destroy(tmp);
1045 
1046 fail:
1047 
1048     nxt_mp_destroy(mp);
1049 
1050     return NULL;
1051 }
1052 
1053 
1054 nxt_inline nxt_bool_t
1055 nxt_router_app_can_start(nxt_app_t *app)
1056 {
1057     return app->processes + app->pending_processes < app->max_processes
1058             && app->pending_processes < app->max_pending_processes;
1059 }
1060 
1061 
1062 nxt_inline nxt_bool_t
1063 nxt_router_app_need_start(nxt_app_t *app)
1064 {
1065     return (app->active_requests
1066               > app->port_hash_count + app->pending_processes)
1067            || (app->spare_processes
1068                 > app->idle_processes + app->pending_processes);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1074 {
1075     nxt_int_t                    ret;
1076     nxt_app_t                    *app;
1077     nxt_router_t                 *router;
1078     nxt_runtime_t                *rt;
1079     nxt_queue_link_t             *qlk;
1080     nxt_socket_conf_t            *skcf;
1081     nxt_router_conf_t            *rtcf;
1082     nxt_router_temp_conf_t       *tmcf;
1083     const nxt_event_interface_t  *interface;
1084 #if (NXT_TLS)
1085     nxt_router_tlssock_t         *tls;
1086 #endif
1087 
1088     tmcf = obj;
1089 
1090     qlk = nxt_queue_first(&pending_sockets);
1091 
1092     if (qlk != nxt_queue_tail(&pending_sockets)) {
1093         nxt_queue_remove(qlk);
1094         nxt_queue_insert_tail(&creating_sockets, qlk);
1095 
1096         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1097 
1098         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1099 
1100         return;
1101     }
1102 
1103 #if (NXT_TLS)
1104     qlk = nxt_queue_last(&tmcf->tls);
1105 
1106     if (qlk != nxt_queue_head(&tmcf->tls)) {
1107         nxt_queue_remove(qlk);
1108 
1109         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1110 
1111         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1112                            nxt_router_tls_rpc_handler, tls);
1113         return;
1114     }
1115 #endif
1116 
1117     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1118 
1119         if (nxt_router_app_need_start(app)) {
1120             nxt_router_app_rpc_create(task, tmcf, app);
1121             return;
1122         }
1123 
1124     } nxt_queue_loop;
1125 
1126     rtcf = tmcf->router_conf;
1127 
1128     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1129         nxt_router_access_log_open(task, tmcf);
1130         return;
1131     }
1132 
1133     rt = task->thread->runtime;
1134 
1135     interface = nxt_service_get(rt->services, "engine", NULL);
1136 
1137     router = rtcf->router;
1138 
1139     ret = nxt_router_engines_create(task, router, tmcf, interface);
1140     if (nxt_slow_path(ret != NXT_OK)) {
1141         goto fail;
1142     }
1143 
1144     ret = nxt_router_threads_create(task, rt, tmcf);
1145     if (nxt_slow_path(ret != NXT_OK)) {
1146         goto fail;
1147     }
1148 
1149     nxt_router_apps_sort(task, router, tmcf);
1150 
1151     nxt_router_apps_hash_use(task, rtcf, 1);
1152 
1153     nxt_router_engines_post(router, tmcf);
1154 
1155     nxt_queue_add(&router->sockets, &updating_sockets);
1156     nxt_queue_add(&router->sockets, &creating_sockets);
1157 
1158     if (router->access_log != rtcf->access_log) {
1159         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1160 
1161         nxt_router_access_log_release(task, &router->lock, router->access_log);
1162 
1163         router->access_log = rtcf->access_log;
1164     }
1165 
1166     nxt_router_conf_ready(task, tmcf);
1167 
1168     return;
1169 
1170 fail:
1171 
1172     nxt_router_conf_error(task, tmcf);
1173 
1174     return;
1175 }
1176 
1177 
1178 static void
1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1180 {
1181     nxt_joint_job_t  *job;
1182 
1183     job = obj;
1184 
1185     nxt_router_conf_ready(task, job->tmcf);
1186 }
1187 
1188 
1189 static void
1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191 {
1192     uint32_t               count;
1193     nxt_router_conf_t      *rtcf;
1194     nxt_thread_spinlock_t  *lock;
1195 
1196     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1197 
1198     if (--tmcf->count > 0) {
1199         return;
1200     }
1201 
1202     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1203 
1204     rtcf = tmcf->router_conf;
1205 
1206     lock = &rtcf->router->lock;
1207 
1208     nxt_thread_spin_lock(lock);
1209 
1210     count = rtcf->count;
1211 
1212     nxt_thread_spin_unlock(lock);
1213 
1214     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1215 
1216     if (count == 0) {
1217         nxt_router_apps_hash_use(task, rtcf, -1);
1218 
1219         nxt_router_access_log_release(task, lock, rtcf->access_log);
1220 
1221         nxt_mp_destroy(rtcf->mem_pool);
1222     }
1223 
1224     nxt_mp_release(tmcf->mem_pool);
1225 }
1226 
1227 
1228 static void
1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1230 {
1231     nxt_app_t          *app;
1232     nxt_queue_t        new_socket_confs;
1233     nxt_socket_t       s;
1234     nxt_router_t       *router;
1235     nxt_queue_link_t   *qlk;
1236     nxt_socket_conf_t  *skcf;
1237     nxt_router_conf_t  *rtcf;
1238 
1239     nxt_alert(task, "failed to apply new conf");
1240 
1241     for (qlk = nxt_queue_first(&creating_sockets);
1242          qlk != nxt_queue_tail(&creating_sockets);
1243          qlk = nxt_queue_next(qlk))
1244     {
1245         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246         s = skcf->listen->socket;
1247 
1248         if (s != -1) {
1249             nxt_socket_close(task, s);
1250         }
1251 
1252         nxt_free(skcf->listen);
1253     }
1254 
1255     nxt_queue_init(&new_socket_confs);
1256     nxt_queue_add(&new_socket_confs, &updating_sockets);
1257     nxt_queue_add(&new_socket_confs, &pending_sockets);
1258     nxt_queue_add(&new_socket_confs, &creating_sockets);
1259 
1260     rtcf = tmcf->router_conf;
1261 
1262     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1263 
1264         nxt_router_app_unlink(task, app);
1265 
1266     } nxt_queue_loop;
1267 
1268     router = rtcf->router;
1269 
1270     nxt_queue_add(&router->sockets, &keeping_sockets);
1271     nxt_queue_add(&router->sockets, &deleting_sockets);
1272 
1273     nxt_queue_add(&router->apps, &tmcf->previous);
1274 
1275     // TODO: new engines and threads
1276 
1277     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1278 
1279     nxt_mp_destroy(rtcf->mem_pool);
1280 
1281     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1282 
1283     nxt_mp_release(tmcf->mem_pool);
1284 }
1285 
1286 
1287 static void
1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1289     nxt_port_msg_type_t type)
1290 {
1291     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1292 
1293     nxt_port_use(task, tmcf->port, -1);
1294 
1295     tmcf->port = NULL;
1296 }
1297 
1298 
1299 static nxt_conf_map_t  nxt_router_conf[] = {
1300     {
1301         nxt_string("listeners_threads"),
1302         NXT_CONF_MAP_INT32,
1303         offsetof(nxt_router_conf_t, threads),
1304     },
1305 };
1306 
1307 
1308 static nxt_conf_map_t  nxt_router_app_conf[] = {
1309     {
1310         nxt_string("type"),
1311         NXT_CONF_MAP_STR,
1312         offsetof(nxt_router_app_conf_t, type),
1313     },
1314 
1315     {
1316         nxt_string("limits"),
1317         NXT_CONF_MAP_PTR,
1318         offsetof(nxt_router_app_conf_t, limits_value),
1319     },
1320 
1321     {
1322         nxt_string("processes"),
1323         NXT_CONF_MAP_INT32,
1324         offsetof(nxt_router_app_conf_t, processes),
1325     },
1326 
1327     {
1328         nxt_string("processes"),
1329         NXT_CONF_MAP_PTR,
1330         offsetof(nxt_router_app_conf_t, processes_value),
1331     },
1332 
1333     {
1334         nxt_string("targets"),
1335         NXT_CONF_MAP_PTR,
1336         offsetof(nxt_router_app_conf_t, targets_value),
1337     },
1338 };
1339 
1340 
1341 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1342     {
1343         nxt_string("timeout"),
1344         NXT_CONF_MAP_MSEC,
1345         offsetof(nxt_router_app_conf_t, timeout),
1346     },
1347 };
1348 
1349 
1350 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1351     {
1352         nxt_string("spare"),
1353         NXT_CONF_MAP_INT32,
1354         offsetof(nxt_router_app_conf_t, spare_processes),
1355     },
1356 
1357     {
1358         nxt_string("max"),
1359         NXT_CONF_MAP_INT32,
1360         offsetof(nxt_router_app_conf_t, max_processes),
1361     },
1362 
1363     {
1364         nxt_string("idle_timeout"),
1365         NXT_CONF_MAP_MSEC,
1366         offsetof(nxt_router_app_conf_t, idle_timeout),
1367     },
1368 };
1369 
1370 
1371 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1372     {
1373         nxt_string("pass"),
1374         NXT_CONF_MAP_STR_COPY,
1375         offsetof(nxt_router_listener_conf_t, pass),
1376     },
1377 
1378     {
1379         nxt_string("application"),
1380         NXT_CONF_MAP_STR_COPY,
1381         offsetof(nxt_router_listener_conf_t, application),
1382     },
1383 };
1384 
1385 
1386 static nxt_conf_map_t  nxt_router_http_conf[] = {
1387     {
1388         nxt_string("header_buffer_size"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, header_buffer_size),
1391     },
1392 
1393     {
1394         nxt_string("large_header_buffer_size"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1397     },
1398 
1399     {
1400         nxt_string("large_header_buffers"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, large_header_buffers),
1403     },
1404 
1405     {
1406         nxt_string("body_buffer_size"),
1407         NXT_CONF_MAP_SIZE,
1408         offsetof(nxt_socket_conf_t, body_buffer_size),
1409     },
1410 
1411     {
1412         nxt_string("max_body_size"),
1413         NXT_CONF_MAP_SIZE,
1414         offsetof(nxt_socket_conf_t, max_body_size),
1415     },
1416 
1417     {
1418         nxt_string("idle_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, idle_timeout),
1421     },
1422 
1423     {
1424         nxt_string("header_read_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, header_read_timeout),
1427     },
1428 
1429     {
1430         nxt_string("body_read_timeout"),
1431         NXT_CONF_MAP_MSEC,
1432         offsetof(nxt_socket_conf_t, body_read_timeout),
1433     },
1434 
1435     {
1436         nxt_string("send_timeout"),
1437         NXT_CONF_MAP_MSEC,
1438         offsetof(nxt_socket_conf_t, send_timeout),
1439     },
1440 
1441     {
1442         nxt_string("body_temp_path"),
1443         NXT_CONF_MAP_STR,
1444         offsetof(nxt_socket_conf_t, body_temp_path),
1445     },
1446 
1447     {
1448         nxt_string("discard_unsafe_fields"),
1449         NXT_CONF_MAP_INT8,
1450         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1451     },
1452 };
1453 
1454 
1455 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1456     {
1457         nxt_string("max_frame_size"),
1458         NXT_CONF_MAP_SIZE,
1459         offsetof(nxt_websocket_conf_t, max_frame_size),
1460     },
1461 
1462     {
1463         nxt_string("read_timeout"),
1464         NXT_CONF_MAP_MSEC,
1465         offsetof(nxt_websocket_conf_t, read_timeout),
1466     },
1467 
1468     {
1469         nxt_string("keepalive_interval"),
1470         NXT_CONF_MAP_MSEC,
1471         offsetof(nxt_websocket_conf_t, keepalive_interval),
1472     },
1473 
1474 };
1475 
1476 
1477 static nxt_int_t
1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1479     u_char *start, u_char *end)
1480 {
1481     u_char                      *p;
1482     size_t                      size;
1483     nxt_mp_t                    *mp, *app_mp;
1484     uint32_t                    next, next_target;
1485     nxt_int_t                   ret;
1486     nxt_str_t                   name, path, target;
1487     nxt_app_t                   *app, *prev;
1488     nxt_str_t                   *t, *s, *targets;
1489     nxt_uint_t                  n, i;
1490     nxt_port_t                  *port;
1491     nxt_router_t                *router;
1492     nxt_app_joint_t             *app_joint;
1493 #if (NXT_TLS)
1494     nxt_tls_init_t              *tls_init;
1495     nxt_conf_value_t            *certificate;
1496 #endif
1497     nxt_conf_value_t            *conf, *http, *value, *websocket;
1498     nxt_conf_value_t            *applications, *application;
1499     nxt_conf_value_t            *listeners, *listener;
1500     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1501     nxt_socket_conf_t           *skcf;
1502     nxt_http_routes_t           *routes;
1503     nxt_event_engine_t          *engine;
1504     nxt_app_lang_module_t       *lang;
1505     nxt_router_app_conf_t       apcf;
1506     nxt_router_access_log_t     *access_log;
1507     nxt_router_listener_conf_t  lscf;
1508 
1509     static nxt_str_t  http_path = nxt_string("/settings/http");
1510     static nxt_str_t  applications_path = nxt_string("/applications");
1511     static nxt_str_t  listeners_path = nxt_string("/listeners");
1512     static nxt_str_t  routes_path = nxt_string("/routes");
1513     static nxt_str_t  access_log_path = nxt_string("/access_log");
1514 #if (NXT_TLS)
1515     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1516     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1517     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1518     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1519     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1520 #endif
1521     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1522     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1523     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1524 
1525     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1526     if (conf == NULL) {
1527         nxt_alert(task, "configuration parsing error");
1528         return NXT_ERROR;
1529     }
1530 
1531     mp = tmcf->router_conf->mem_pool;
1532 
1533     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1534                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1535     if (ret != NXT_OK) {
1536         nxt_alert(task, "root map error");
1537         return NXT_ERROR;
1538     }
1539 
1540     if (tmcf->router_conf->threads == 0) {
1541         tmcf->router_conf->threads = nxt_ncpu;
1542     }
1543 
1544     static_conf = nxt_conf_get_path(conf, &static_path);
1545 
1546     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1547     if (nxt_slow_path(ret != NXT_OK)) {
1548         return NXT_ERROR;
1549     }
1550 
1551     router = tmcf->router_conf->router;
1552 
1553     applications = nxt_conf_get_path(conf, &applications_path);
1554 
1555     if (applications != NULL) {
1556         next = 0;
1557 
1558         for ( ;; ) {
1559             application = nxt_conf_next_object_member(applications,
1560                                                       &name, &next);
1561             if (application == NULL) {
1562                 break;
1563             }
1564 
1565             nxt_debug(task, "application \"%V\"", &name);
1566 
1567             size = nxt_conf_json_length(application, NULL);
1568 
1569             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1570             if (nxt_slow_path(app_mp == NULL)) {
1571                 goto fail;
1572             }
1573 
1574             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1575             if (app == NULL) {
1576                 goto app_fail;
1577             }
1578 
1579             nxt_memzero(app, sizeof(nxt_app_t));
1580 
1581             app->mem_pool = app_mp;
1582 
1583             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1584             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1585                                                   + name.length);
1586 
1587             p = nxt_conf_json_print(app->conf.start, application, NULL);
1588             app->conf.length = p - app->conf.start;
1589 
1590             nxt_assert(app->conf.length <= size);
1591 
1592             nxt_debug(task, "application conf \"%V\"", &app->conf);
1593 
1594             prev = nxt_router_app_find(&router->apps, &name);
1595 
1596             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1597                 nxt_mp_destroy(app_mp);
1598 
1599                 nxt_queue_remove(&prev->link);
1600                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1601 
1602                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1603                 if (nxt_slow_path(ret != NXT_OK)) {
1604                     goto fail;
1605                 }
1606 
1607                 continue;
1608             }
1609 
1610             apcf.processes = 1;
1611             apcf.max_processes = 1;
1612             apcf.spare_processes = 0;
1613             apcf.timeout = 0;
1614             apcf.idle_timeout = 15000;
1615             apcf.limits_value = NULL;
1616             apcf.processes_value = NULL;
1617             apcf.targets_value = NULL;
1618 
1619             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1620             if (nxt_slow_path(app_joint == NULL)) {
1621                 goto app_fail;
1622             }
1623 
1624             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1625 
1626             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1627                                       nxt_nitems(nxt_router_app_conf), &apcf);
1628             if (ret != NXT_OK) {
1629                 nxt_alert(task, "application map error");
1630                 goto app_fail;
1631             }
1632 
1633             if (apcf.limits_value != NULL) {
1634 
1635                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1636                     nxt_alert(task, "application limits is not object");
1637                     goto app_fail;
1638                 }
1639 
1640                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1641                                         nxt_router_app_limits_conf,
1642                                         nxt_nitems(nxt_router_app_limits_conf),
1643                                         &apcf);
1644                 if (ret != NXT_OK) {
1645                     nxt_alert(task, "application limits map error");
1646                     goto app_fail;
1647                 }
1648             }
1649 
1650             if (apcf.processes_value != NULL
1651                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1652             {
1653                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1654                                      nxt_router_app_processes_conf,
1655                                      nxt_nitems(nxt_router_app_processes_conf),
1656                                      &apcf);
1657                 if (ret != NXT_OK) {
1658                     nxt_alert(task, "application processes map error");
1659                     goto app_fail;
1660                 }
1661 
1662             } else {
1663                 apcf.max_processes = apcf.processes;
1664                 apcf.spare_processes = apcf.processes;
1665             }
1666 
1667             if (apcf.targets_value != NULL) {
1668                 n = nxt_conf_object_members_count(apcf.targets_value);
1669 
1670                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1671                 if (nxt_slow_path(targets == NULL)) {
1672                     goto app_fail;
1673                 }
1674 
1675                 next_target = 0;
1676 
1677                 for (i = 0; i < n; i++) {
1678                     (void) nxt_conf_next_object_member(apcf.targets_value,
1679                                                        &target, &next_target);
1680 
1681                     s = nxt_str_dup(app_mp, &targets[i], &target);
1682                     if (nxt_slow_path(s == NULL)) {
1683                         goto app_fail;
1684                     }
1685                 }
1686 
1687             } else {
1688                 targets = NULL;
1689             }
1690 
1691             nxt_debug(task, "application type: %V", &apcf.type);
1692             nxt_debug(task, "application processes: %D", apcf.processes);
1693             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1694 
1695             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1696 
1697             if (lang == NULL) {
1698                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1699                 goto app_fail;
1700             }
1701 
1702             nxt_debug(task, "application language module: \"%s\"", lang->file);
1703 
1704             ret = nxt_thread_mutex_create(&app->mutex);
1705             if (ret != NXT_OK) {
1706                 goto app_fail;
1707             }
1708 
1709             nxt_queue_init(&app->ports);
1710             nxt_queue_init(&app->spare_ports);
1711             nxt_queue_init(&app->idle_ports);
1712             nxt_queue_init(&app->ack_waiting_req);
1713 
1714             app->name.length = name.length;
1715             nxt_memcpy(app->name.start, name.start, name.length);
1716 
1717             app->type = lang->type;
1718             app->max_processes = apcf.max_processes;
1719             app->spare_processes = apcf.spare_processes;
1720             app->max_pending_processes = apcf.spare_processes
1721                                          ? apcf.spare_processes : 1;
1722             app->timeout = apcf.timeout;
1723             app->idle_timeout = apcf.idle_timeout;
1724 
1725             app->targets = targets;
1726 
1727             engine = task->thread->engine;
1728 
1729             app->engine = engine;
1730 
1731             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1732             app->adjust_idle_work.task = &engine->task;
1733             app->adjust_idle_work.obj = app;
1734 
1735             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1736 
1737             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1738             if (nxt_slow_path(ret != NXT_OK)) {
1739                 goto app_fail;
1740             }
1741 
1742             nxt_router_app_use(task, app, 1);
1743 
1744             app->joint = app_joint;
1745 
1746             app_joint->use_count = 1;
1747             app_joint->app = app;
1748 
1749             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1750             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1751             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1752             app_joint->idle_timer.task = &engine->task;
1753             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1754 
1755             app_joint->free_app_work.handler = nxt_router_free_app;
1756             app_joint->free_app_work.task = &engine->task;
1757             app_joint->free_app_work.obj = app_joint;
1758 
1759             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1760                                 NXT_PROCESS_APP);
1761             if (nxt_slow_path(port == NULL)) {
1762                 return NXT_ERROR;
1763             }
1764 
1765             ret = nxt_port_socket_init(task, port, 0);
1766             if (nxt_slow_path(ret != NXT_OK)) {
1767                 nxt_port_use(task, port, -1);
1768                 return NXT_ERROR;
1769             }
1770 
1771             ret = nxt_router_app_queue_init(task, port);
1772             if (nxt_slow_path(ret != NXT_OK)) {
1773                 nxt_port_write_close(port);
1774                 nxt_port_read_close(port);
1775                 nxt_port_use(task, port, -1);
1776                 return NXT_ERROR;
1777             }
1778 
1779             nxt_port_write_enable(task, port);
1780             port->app = app;
1781 
1782             app->shared_port = port;
1783 
1784             nxt_thread_mutex_create(&app->outgoing.mutex);
1785         }
1786     }
1787 
1788     routes_conf = nxt_conf_get_path(conf, &routes_path);
1789     if (nxt_fast_path(routes_conf != NULL)) {
1790         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1791         if (nxt_slow_path(routes == NULL)) {
1792             return NXT_ERROR;
1793         }
1794         tmcf->router_conf->routes = routes;
1795     }
1796 
1797     ret = nxt_upstreams_create(task, tmcf, conf);
1798     if (nxt_slow_path(ret != NXT_OK)) {
1799         return ret;
1800     }
1801 
1802     http = nxt_conf_get_path(conf, &http_path);
1803 #if 0
1804     if (http == NULL) {
1805         nxt_alert(task, "no \"http\" block");
1806         return NXT_ERROR;
1807     }
1808 #endif
1809 
1810     websocket = nxt_conf_get_path(conf, &websocket_path);
1811 
1812     listeners = nxt_conf_get_path(conf, &listeners_path);
1813 
1814     if (listeners != NULL) {
1815         next = 0;
1816 
1817         for ( ;; ) {
1818             listener = nxt_conf_next_object_member(listeners, &name, &next);
1819             if (listener == NULL) {
1820                 break;
1821             }
1822 
1823             skcf = nxt_router_socket_conf(task, tmcf, &name);
1824             if (skcf == NULL) {
1825                 goto fail;
1826             }
1827 
1828             nxt_memzero(&lscf, sizeof(lscf));
1829 
1830             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1831                                       nxt_nitems(nxt_router_listener_conf),
1832                                       &lscf);
1833             if (ret != NXT_OK) {
1834                 nxt_alert(task, "listener map error");
1835                 goto fail;
1836             }
1837 
1838             nxt_debug(task, "application: %V", &lscf.application);
1839 
1840             // STUB, default values if http block is not defined.
1841             skcf->header_buffer_size = 2048;
1842             skcf->large_header_buffer_size = 8192;
1843             skcf->large_header_buffers = 4;
1844             skcf->discard_unsafe_fields = 1;
1845             skcf->body_buffer_size = 16 * 1024;
1846             skcf->max_body_size = 8 * 1024 * 1024;
1847             skcf->proxy_header_buffer_size = 64 * 1024;
1848             skcf->proxy_buffer_size = 4096;
1849             skcf->proxy_buffers = 256;
1850             skcf->idle_timeout = 180 * 1000;
1851             skcf->header_read_timeout = 30 * 1000;
1852             skcf->body_read_timeout = 30 * 1000;
1853             skcf->send_timeout = 30 * 1000;
1854             skcf->proxy_timeout = 60 * 1000;
1855             skcf->proxy_send_timeout = 30 * 1000;
1856             skcf->proxy_read_timeout = 30 * 1000;
1857 
1858             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1859             skcf->websocket_conf.read_timeout = 60 * 1000;
1860             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1861 
1862             nxt_str_null(&skcf->body_temp_path);
1863 
1864             if (http != NULL) {
1865                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1866                                           nxt_nitems(nxt_router_http_conf),
1867                                           skcf);
1868                 if (ret != NXT_OK) {
1869                     nxt_alert(task, "http map error");
1870                     goto fail;
1871                 }
1872             }
1873 
1874             if (websocket != NULL) {
1875                 ret = nxt_conf_map_object(mp, websocket,
1876                                           nxt_router_websocket_conf,
1877                                           nxt_nitems(nxt_router_websocket_conf),
1878                                           &skcf->websocket_conf);
1879                 if (ret != NXT_OK) {
1880                     nxt_alert(task, "websocket map error");
1881                     goto fail;
1882                 }
1883             }
1884 
1885             t = &skcf->body_temp_path;
1886 
1887             if (t->length == 0) {
1888                 t->start = (u_char *) task->thread->runtime->tmp;
1889                 t->length = nxt_strlen(t->start);
1890             }
1891 
1892             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1893             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1894                                                     client_ip_conf);
1895             if (nxt_slow_path(ret != NXT_OK)) {
1896                 return NXT_ERROR;
1897             }
1898 
1899 #if (NXT_TLS)
1900             certificate = nxt_conf_get_path(listener, &certificate_path);
1901 
1902             if (certificate != NULL) {
1903                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1904                 if (nxt_slow_path(tls_init == NULL)) {
1905                     return NXT_ERROR;
1906                 }
1907 
1908                 tls_init->cache_size = 0;
1909                 tls_init->timeout = 300;
1910 
1911                 value = nxt_conf_get_path(listener, &conf_cache_path);
1912                 if (value != NULL) {
1913                     tls_init->cache_size = nxt_conf_get_number(value);
1914                 }
1915 
1916                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1917                 if (value != NULL) {
1918                     tls_init->timeout = nxt_conf_get_number(value);
1919                 }
1920 
1921                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1922                                                         &conf_commands_path);
1923 
1924                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1925                                                            &conf_tickets);
1926 
1927                 n = nxt_conf_array_elements_count_or_1(certificate);
1928 
1929                 for (i = 0; i < n; i++) {
1930                     value = nxt_conf_get_array_element_or_itself(certificate,
1931                                                                  i);
1932                     nxt_assert(value != NULL);
1933 
1934                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1935                                                      tls_init, i == 0);
1936                     if (nxt_slow_path(ret != NXT_OK)) {
1937                         goto fail;
1938                     }
1939                 }
1940             }
1941 #endif
1942 
1943             skcf->listen->handler = nxt_http_conn_init;
1944             skcf->router_conf = tmcf->router_conf;
1945             skcf->router_conf->count++;
1946 
1947             if (lscf.pass.length != 0) {
1948                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1949 
1950             /* COMPATIBILITY: listener application. */
1951             } else if (lscf.application.length > 0) {
1952                 skcf->action = nxt_http_pass_application(task,
1953                                                          tmcf->router_conf,
1954                                                          &lscf.application);
1955             }
1956 
1957             if (nxt_slow_path(skcf->action == NULL)) {
1958                 goto fail;
1959             }
1960         }
1961     }
1962 
1963     ret = nxt_http_routes_resolve(task, tmcf);
1964     if (nxt_slow_path(ret != NXT_OK)) {
1965         goto fail;
1966     }
1967 
1968     value = nxt_conf_get_path(conf, &access_log_path);
1969 
1970     if (value != NULL) {
1971         nxt_conf_get_string(value, &path);
1972 
1973         access_log = router->access_log;
1974 
1975         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1976             nxt_router_access_log_use(&router->lock, access_log);
1977 
1978         } else {
1979             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1980                                     + path.length);
1981             if (access_log == NULL) {
1982                 nxt_alert(task, "failed to allocate access log structure");
1983                 goto fail;
1984             }
1985 
1986             access_log->fd = -1;
1987             access_log->handler = &nxt_router_access_log_writer;
1988             access_log->count = 1;
1989 
1990             access_log->path.length = path.length;
1991             access_log->path.start = (u_char *) access_log
1992                                      + sizeof(nxt_router_access_log_t);
1993 
1994             nxt_memcpy(access_log->path.start, path.start, path.length);
1995         }
1996 
1997         tmcf->router_conf->access_log = access_log;
1998     }
1999 
2000     nxt_queue_add(&deleting_sockets, &router->sockets);
2001     nxt_queue_init(&router->sockets);
2002 
2003     return NXT_OK;
2004 
2005 app_fail:
2006 
2007     nxt_mp_destroy(app_mp);
2008 
2009 fail:
2010 
2011     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2012 
2013         nxt_queue_remove(&app->link);
2014         nxt_thread_mutex_destroy(&app->mutex);
2015         nxt_mp_destroy(app->mem_pool);
2016 
2017     } nxt_queue_loop;
2018 
2019     return NXT_ERROR;
2020 }
2021 
2022 
2023 #if (NXT_TLS)
2024 
2025 static nxt_int_t
2026 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2027     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2028     nxt_tls_init_t *tls_init, nxt_bool_t last)
2029 {
2030     nxt_router_tlssock_t  *tls;
2031 
2032     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2033     if (nxt_slow_path(tls == NULL)) {
2034         return NXT_ERROR;
2035     }
2036 
2037     tls->tls_init = tls_init;
2038     tls->socket_conf = skcf;
2039     tls->temp_conf = tmcf;
2040     tls->last = last;
2041     nxt_conf_get_string(value, &tls->name);
2042 
2043     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2044 
2045     return NXT_OK;
2046 }
2047 
2048 #endif
2049 
2050 
2051 static nxt_int_t
2052 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2053     nxt_conf_value_t *conf)
2054 {
2055     uint32_t          next, i;
2056     nxt_mp_t          *mp;
2057     nxt_str_t         *type, exten, str;
2058     nxt_int_t         ret;
2059     nxt_uint_t        exts;
2060     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2061 
2062     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2063 
2064     mp = rtcf->mem_pool;
2065 
2066     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2067     if (nxt_slow_path(ret != NXT_OK)) {
2068         return NXT_ERROR;
2069     }
2070 
2071     if (conf == NULL) {
2072         return NXT_OK;
2073     }
2074 
2075     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2076 
2077     if (mtypes_conf != NULL) {
2078         next = 0;
2079 
2080         for ( ;; ) {
2081             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2082 
2083             if (ext_conf == NULL) {
2084                 break;
2085             }
2086 
2087             type = nxt_str_dup(mp, NULL, &str);
2088             if (nxt_slow_path(type == NULL)) {
2089                 return NXT_ERROR;
2090             }
2091 
2092             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2093                 nxt_conf_get_string(ext_conf, &str);
2094 
2095                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2096                     return NXT_ERROR;
2097                 }
2098 
2099                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2100                                                       &exten, type);
2101                 if (nxt_slow_path(ret != NXT_OK)) {
2102                     return NXT_ERROR;
2103                 }
2104 
2105                 continue;
2106             }
2107 
2108             exts = nxt_conf_array_elements_count(ext_conf);
2109 
2110             for (i = 0; i < exts; i++) {
2111                 value = nxt_conf_get_array_element(ext_conf, i);
2112 
2113                 nxt_conf_get_string(value, &str);
2114 
2115                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2116                     return NXT_ERROR;
2117                 }
2118 
2119                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2120                                                       &exten, type);
2121                 if (nxt_slow_path(ret != NXT_OK)) {
2122                     return NXT_ERROR;
2123                 }
2124             }
2125         }
2126     }
2127 
2128     return NXT_OK;
2129 }
2130 
2131 
2132 static nxt_int_t
2133 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2134     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2135 {
2136     char                        c;
2137     size_t                      i;
2138     nxt_mp_t                    *mp;
2139     uint32_t                    hash;
2140     nxt_str_t                   header;
2141     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2142     nxt_http_client_ip_t        *client_ip;
2143     nxt_http_route_addr_rule_t  *source;
2144 
2145     static nxt_str_t  header_path = nxt_string("/header");
2146     static nxt_str_t  source_path = nxt_string("/source");
2147     static nxt_str_t  recursive_path = nxt_string("/recursive");
2148 
2149     if (conf == NULL) {
2150         skcf->client_ip = NULL;
2151 
2152         return NXT_OK;
2153     }
2154 
2155     mp = tmcf->router_conf->mem_pool;
2156 
2157     source_conf = nxt_conf_get_path(conf, &source_path);
2158     header_conf = nxt_conf_get_path(conf, &header_path);
2159     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2160 
2161     if (source_conf == NULL || header_conf == NULL) {
2162         return NXT_ERROR;
2163     }
2164 
2165     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2166     if (nxt_slow_path(client_ip == NULL)) {
2167         return NXT_ERROR;
2168     }
2169 
2170     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2171     if (nxt_slow_path(source == NULL)) {
2172         return NXT_ERROR;
2173     }
2174 
2175     client_ip->source = source;
2176 
2177     nxt_conf_get_string(header_conf, &header);
2178 
2179     if (recursive_conf != NULL) {
2180         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2181     }
2182 
2183     client_ip->header = nxt_str_dup(mp, NULL, &header);
2184     if (nxt_slow_path(client_ip->header == NULL)) {
2185         return NXT_ERROR;
2186     }
2187 
2188     hash = NXT_HTTP_FIELD_HASH_INIT;
2189 
2190     for (i = 0; i < client_ip->header->length; i++) {
2191         c = client_ip->header->start[i];
2192         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2193     }
2194 
2195     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2196 
2197     client_ip->header_hash = hash;
2198 
2199     skcf->client_ip = client_ip;
2200 
2201     return NXT_OK;
2202 }
2203 
2204 
2205 static nxt_app_t *
2206 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2207 {
2208     nxt_app_t  *app;
2209 
2210     nxt_queue_each(app, queue, nxt_app_t, link) {
2211 
2212         if (nxt_strstr_eq(name, &app->name)) {
2213             return app;
2214         }
2215 
2216     } nxt_queue_loop;
2217 
2218     return NULL;
2219 }
2220 
2221 
2222 static nxt_int_t
2223 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2224 {
2225     void       *mem;
2226     nxt_int_t  fd;
2227 
2228     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2229     if (nxt_slow_path(fd == -1)) {
2230         return NXT_ERROR;
2231     }
2232 
2233     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2234                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2235     if (nxt_slow_path(mem == MAP_FAILED)) {
2236         nxt_fd_close(fd);
2237 
2238         return NXT_ERROR;
2239     }
2240 
2241     nxt_app_queue_init(mem);
2242 
2243     port->queue_fd = fd;
2244     port->queue = mem;
2245 
2246     return NXT_OK;
2247 }
2248 
2249 
2250 static nxt_int_t
2251 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2252 {
2253     void       *mem;
2254     nxt_int_t  fd;
2255 
2256     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2257     if (nxt_slow_path(fd == -1)) {
2258         return NXT_ERROR;
2259     }
2260 
2261     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2262                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2263     if (nxt_slow_path(mem == MAP_FAILED)) {
2264         nxt_fd_close(fd);
2265 
2266         return NXT_ERROR;
2267     }
2268 
2269     nxt_port_queue_init(mem);
2270 
2271     port->queue_fd = fd;
2272     port->queue = mem;
2273 
2274     return NXT_OK;
2275 }
2276 
2277 
2278 static nxt_int_t
2279 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2280 {
2281     void  *mem;
2282 
2283     nxt_assert(fd != -1);
2284 
2285     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2286                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2287     if (nxt_slow_path(mem == MAP_FAILED)) {
2288 
2289         return NXT_ERROR;
2290     }
2291 
2292     port->queue = mem;
2293 
2294     return NXT_OK;
2295 }
2296 
2297 
2298 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2299     NXT_LVLHSH_DEFAULT,
2300     nxt_router_apps_hash_test,
2301     nxt_mp_lvlhsh_alloc,
2302     nxt_mp_lvlhsh_free,
2303 };
2304 
2305 
2306 static nxt_int_t
2307 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2308 {
2309     nxt_app_t  *app;
2310 
2311     app = data;
2312 
2313     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2314 }
2315 
2316 
2317 static nxt_int_t
2318 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2319 {
2320     nxt_lvlhsh_query_t  lhq;
2321 
2322     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2323     lhq.replace = 0;
2324     lhq.key = app->name;
2325     lhq.value = app;
2326     lhq.proto = &nxt_router_apps_hash_proto;
2327     lhq.pool = rtcf->mem_pool;
2328 
2329     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2330 
2331     case NXT_OK:
2332         return NXT_OK;
2333 
2334     case NXT_DECLINED:
2335         nxt_thread_log_alert("router app hash adding failed: "
2336                              "\"%V\" is already in hash", &lhq.key);
2337         /* Fall through. */
2338     default:
2339         return NXT_ERROR;
2340     }
2341 }
2342 
2343 
2344 static nxt_app_t *
2345 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2346 {
2347     nxt_lvlhsh_query_t  lhq;
2348 
2349     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2350     lhq.key = *name;
2351     lhq.proto = &nxt_router_apps_hash_proto;
2352 
2353     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2354         return NULL;
2355     }
2356 
2357     return lhq.value;
2358 }
2359 
2360 
2361 static void
2362 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2363 {
2364     nxt_app_t          *app;
2365     nxt_lvlhsh_each_t  lhe;
2366 
2367     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2368 
2369     for ( ;; ) {
2370         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2371 
2372         if (app == NULL) {
2373             break;
2374         }
2375 
2376         nxt_router_app_use(task, app, i);
2377     }
2378 }
2379 
2380 
2381 typedef struct {
2382     nxt_app_t  *app;
2383     nxt_int_t  target;
2384 } nxt_http_app_conf_t;
2385 
2386 
2387 nxt_int_t
2388 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2389     nxt_str_t *target, nxt_http_action_t *action)
2390 {
2391     nxt_app_t            *app;
2392     nxt_str_t            *targets;
2393     nxt_uint_t           i;
2394     nxt_http_app_conf_t  *conf;
2395 
2396     app = nxt_router_apps_hash_get(rtcf, name);
2397     if (app == NULL) {
2398         return NXT_DECLINED;
2399     }
2400 
2401     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2402     if (nxt_slow_path(conf == NULL)) {
2403         return NXT_ERROR;
2404     }
2405 
2406     action->handler = nxt_http_application_handler;
2407     action->u.conf = conf;
2408 
2409     conf->app = app;
2410 
2411     if (target != NULL && target->length != 0) {
2412         targets = app->targets;
2413 
2414         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2415 
2416         conf->target = i;
2417 
2418     } else {
2419         conf->target = 0;
2420     }
2421 
2422     return NXT_OK;
2423 }
2424 
2425 
2426 static nxt_socket_conf_t *
2427 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2428     nxt_str_t *name)
2429 {
2430     size_t               size;
2431     nxt_int_t            ret;
2432     nxt_bool_t           wildcard;
2433     nxt_sockaddr_t       *sa;
2434     nxt_socket_conf_t    *skcf;
2435     nxt_listen_socket_t  *ls;
2436 
2437     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2438     if (nxt_slow_path(sa == NULL)) {
2439         nxt_alert(task, "invalid listener \"%V\"", name);
2440         return NULL;
2441     }
2442 
2443     sa->type = SOCK_STREAM;
2444 
2445     nxt_debug(task, "router listener: \"%*s\"",
2446               (size_t) sa->length, nxt_sockaddr_start(sa));
2447 
2448     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2449     if (nxt_slow_path(skcf == NULL)) {
2450         return NULL;
2451     }
2452 
2453     size = nxt_sockaddr_size(sa);
2454 
2455     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2456 
2457     if (ret != NXT_OK) {
2458 
2459         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2460         if (nxt_slow_path(ls == NULL)) {
2461             return NULL;
2462         }
2463 
2464         skcf->listen = ls;
2465 
2466         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2467         nxt_memcpy(ls->sockaddr, sa, size);
2468 
2469         nxt_listen_socket_remote_size(ls);
2470 
2471         ls->socket = -1;
2472         ls->backlog = NXT_LISTEN_BACKLOG;
2473         ls->flags = NXT_NONBLOCK;
2474         ls->read_after_accept = 1;
2475     }
2476 
2477     switch (sa->u.sockaddr.sa_family) {
2478 #if (NXT_HAVE_UNIX_DOMAIN)
2479     case AF_UNIX:
2480         wildcard = 0;
2481         break;
2482 #endif
2483 #if (NXT_INET6)
2484     case AF_INET6:
2485         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2486         break;
2487 #endif
2488     case AF_INET:
2489     default:
2490         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2491         break;
2492     }
2493 
2494     if (!wildcard) {
2495         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2496         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2497             return NULL;
2498         }
2499 
2500         nxt_memcpy(skcf->sockaddr, sa, size);
2501     }
2502 
2503     return skcf;
2504 }
2505 
2506 
2507 static nxt_int_t
2508 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2509     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2510 {
2511     nxt_router_t       *router;
2512     nxt_queue_link_t   *qlk;
2513     nxt_socket_conf_t  *skcf;
2514 
2515     router = tmcf->router_conf->router;
2516 
2517     for (qlk = nxt_queue_first(&router->sockets);
2518          qlk != nxt_queue_tail(&router->sockets);
2519          qlk = nxt_queue_next(qlk))
2520     {
2521         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2522 
2523         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2524             nskcf->listen = skcf->listen;
2525 
2526             nxt_queue_remove(qlk);
2527             nxt_queue_insert_tail(&keeping_sockets, qlk);
2528 
2529             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2530 
2531             return NXT_OK;
2532         }
2533     }
2534 
2535     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2536 
2537     return NXT_DECLINED;
2538 }
2539 
2540 
2541 static void
2542 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2543     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2544 {
2545     size_t            size;
2546     uint32_t          stream;
2547     nxt_int_t         ret;
2548     nxt_buf_t         *b;
2549     nxt_port_t        *main_port, *router_port;
2550     nxt_runtime_t     *rt;
2551     nxt_socket_rpc_t  *rpc;
2552 
2553     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2554     if (rpc == NULL) {
2555         goto fail;
2556     }
2557 
2558     rpc->socket_conf = skcf;
2559     rpc->temp_conf = tmcf;
2560 
2561     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2562 
2563     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2564     if (b == NULL) {
2565         goto fail;
2566     }
2567 
2568     b->completion_handler = nxt_buf_dummy_completion;
2569 
2570     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2571 
2572     rt = task->thread->runtime;
2573     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2574     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2575 
2576     stream = nxt_port_rpc_register_handler(task, router_port,
2577                                            nxt_router_listen_socket_ready,
2578                                            nxt_router_listen_socket_error,
2579                                            main_port->pid, rpc);
2580     if (nxt_slow_path(stream == 0)) {
2581         goto fail;
2582     }
2583 
2584     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2585                                 stream, router_port->id, b);
2586 
2587     if (nxt_slow_path(ret != NXT_OK)) {
2588         nxt_port_rpc_cancel(task, router_port, stream);
2589         goto fail;
2590     }
2591 
2592     return;
2593 
2594 fail:
2595 
2596     nxt_router_conf_error(task, tmcf);
2597 }
2598 
2599 
2600 static void
2601 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2602     void *data)
2603 {
2604     nxt_int_t         ret;
2605     nxt_socket_t      s;
2606     nxt_socket_rpc_t  *rpc;
2607 
2608     rpc = data;
2609 
2610     s = msg->fd[0];
2611 
2612     ret = nxt_socket_nonblocking(task, s);
2613     if (nxt_slow_path(ret != NXT_OK)) {
2614         goto fail;
2615     }
2616 
2617     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2618 
2619     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2620     if (nxt_slow_path(ret != NXT_OK)) {
2621         goto fail;
2622     }
2623 
2624     rpc->socket_conf->listen->socket = s;
2625 
2626     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2627                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2628 
2629     return;
2630 
2631 fail:
2632 
2633     nxt_socket_close(task, s);
2634 
2635     nxt_router_conf_error(task, rpc->temp_conf);
2636 }
2637 
2638 
2639 static void
2640 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2641     void *data)
2642 {
2643     nxt_socket_rpc_t        *rpc;
2644     nxt_router_temp_conf_t  *tmcf;
2645 
2646     rpc = data;
2647     tmcf = rpc->temp_conf;
2648 
2649 #if 0
2650     u_char                  *p;
2651     size_t                  size;
2652     uint8_t                 error;
2653     nxt_buf_t               *in, *out;
2654     nxt_sockaddr_t          *sa;
2655 
2656     static nxt_str_t  socket_errors[] = {
2657         nxt_string("ListenerSystem"),
2658         nxt_string("ListenerNoIPv6"),
2659         nxt_string("ListenerPort"),
2660         nxt_string("ListenerInUse"),
2661         nxt_string("ListenerNoAddress"),
2662         nxt_string("ListenerNoAccess"),
2663         nxt_string("ListenerPath"),
2664     };
2665 
2666     sa = rpc->socket_conf->listen->sockaddr;
2667 
2668     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2669 
2670     if (nxt_slow_path(in == NULL)) {
2671         return;
2672     }
2673 
2674     p = in->mem.pos;
2675 
2676     error = *p++;
2677 
2678     size = nxt_length("listen socket error: ")
2679            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2680            + sa->length + socket_errors[error].length + (in->mem.free - p);
2681 
2682     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2683     if (nxt_slow_path(out == NULL)) {
2684         return;
2685     }
2686 
2687     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2688                         "listen socket error: "
2689                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2690                         (size_t) sa->length, nxt_sockaddr_start(sa),
2691                         &socket_errors[error], in->mem.free - p, p);
2692 
2693     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2694 #endif
2695 
2696     nxt_router_conf_error(task, tmcf);
2697 }
2698 
2699 
2700 #if (NXT_TLS)
2701 
2702 static void
2703 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2704     void *data)
2705 {
2706     nxt_mp_t                *mp;
2707     nxt_int_t               ret;
2708     nxt_tls_conf_t          *tlscf;
2709     nxt_router_tlssock_t    *tls;
2710     nxt_tls_bundle_conf_t   *bundle;
2711     nxt_router_temp_conf_t  *tmcf;
2712 
2713     nxt_debug(task, "tls rpc handler");
2714 
2715     tls = data;
2716     tmcf = tls->temp_conf;
2717 
2718     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2719         goto fail;
2720     }
2721 
2722     mp = tmcf->router_conf->mem_pool;
2723 
2724     if (tls->socket_conf->tls == NULL){
2725         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2726         if (nxt_slow_path(tlscf == NULL)) {
2727             goto fail;
2728         }
2729 
2730         tlscf->no_wait_shutdown = 1;
2731         tls->socket_conf->tls = tlscf;
2732 
2733     } else {
2734         tlscf = tls->socket_conf->tls;
2735     }
2736 
2737     tls->tls_init->conf = tlscf;
2738 
2739     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2740     if (nxt_slow_path(bundle == NULL)) {
2741         goto fail;
2742     }
2743 
2744     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2745         goto fail;
2746     }
2747 
2748     bundle->chain_file = msg->fd[0];
2749     bundle->next = tlscf->bundle;
2750     tlscf->bundle = bundle;
2751 
2752     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2753                                                   tls->last);
2754     if (nxt_slow_path(ret != NXT_OK)) {
2755         goto fail;
2756     }
2757 
2758     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2759                        nxt_router_conf_apply, task, tmcf, NULL);
2760     return;
2761 
2762 fail:
2763 
2764     nxt_router_conf_error(task, tmcf);
2765 }
2766 
2767 #endif
2768 
2769 
2770 static void
2771 nxt_router_app_rpc_create(nxt_task_t *task,
2772     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2773 {
2774     size_t         size;
2775     uint32_t       stream;
2776     nxt_fd_t       port_fd, queue_fd;
2777     nxt_int_t      ret;
2778     nxt_buf_t      *b;
2779     nxt_port_t     *router_port, *dport;
2780     nxt_runtime_t  *rt;
2781     nxt_app_rpc_t  *rpc;
2782 
2783     rt = task->thread->runtime;
2784 
2785     dport = app->proto_port;
2786 
2787     if (dport == NULL) {
2788         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2789 
2790         size = app->name.length + 1 + app->conf.length;
2791 
2792         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2793         if (nxt_slow_path(b == NULL)) {
2794             goto fail;
2795         }
2796 
2797         b->completion_handler = nxt_buf_dummy_completion;
2798 
2799         nxt_buf_cpystr(b, &app->name);
2800         *b->mem.free++ = '\0';
2801         nxt_buf_cpystr(b, &app->conf);
2802 
2803         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2804 
2805         port_fd = app->shared_port->pair[0];
2806         queue_fd = app->shared_port->queue_fd;
2807 
2808     } else {
2809         nxt_debug(task, "app '%V' prefork", &app->name);
2810 
2811         b = NULL;
2812         port_fd = -1;
2813         queue_fd = -1;
2814     }
2815 
2816     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2817 
2818     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2819                                            nxt_router_app_prefork_ready,
2820                                            nxt_router_app_prefork_error,
2821                                            sizeof(nxt_app_rpc_t));
2822     if (nxt_slow_path(rpc == NULL)) {
2823         goto fail;
2824     }
2825 
2826     rpc->app = app;
2827     rpc->temp_conf = tmcf;
2828     rpc->proto = (b != NULL);
2829 
2830     stream = nxt_port_rpc_ex_stream(rpc);
2831 
2832     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2833                                  port_fd, queue_fd, stream, router_port->id, b);
2834     if (nxt_slow_path(ret != NXT_OK)) {
2835         nxt_port_rpc_cancel(task, router_port, stream);
2836         goto fail;
2837     }
2838 
2839     if (b == NULL) {
2840         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2841 
2842         app->pending_processes++;
2843     }
2844 
2845     return;
2846 
2847 fail:
2848 
2849     nxt_router_conf_error(task, tmcf);
2850 }
2851 
2852 
2853 static void
2854 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2855     void *data)
2856 {
2857     nxt_app_t           *app;
2858     nxt_port_t          *port;
2859     nxt_app_rpc_t       *rpc;
2860     nxt_event_engine_t  *engine;
2861 
2862     rpc = data;
2863     app = rpc->app;
2864 
2865     port = msg->u.new_port;
2866 
2867     nxt_assert(port != NULL);
2868     nxt_assert(port->id == 0);
2869 
2870     if (rpc->proto) {
2871         nxt_assert(app->proto_port == NULL);
2872         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2873 
2874         nxt_port_inc_use(port);
2875 
2876         app->proto_port = port;
2877         port->app = app;
2878 
2879         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2880 
2881         return;
2882     }
2883 
2884     nxt_assert(port->type == NXT_PROCESS_APP);
2885 
2886     port->app = app;
2887     port->main_app_port = port;
2888 
2889     app->pending_processes--;
2890     app->processes++;
2891     app->idle_processes++;
2892 
2893     engine = task->thread->engine;
2894 
2895     nxt_queue_insert_tail(&app->ports, &port->app_link);
2896     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2897 
2898     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2899               &app->name, port->pid, port->id);
2900 
2901     nxt_port_hash_add(&app->port_hash, port);
2902     app->port_hash_count++;
2903 
2904     port->idle_start = 0;
2905 
2906     nxt_port_inc_use(port);
2907 
2908     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2909 
2910     nxt_work_queue_add(&engine->fast_work_queue,
2911                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2912 }
2913 
2914 
2915 static void
2916 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2917     void *data)
2918 {
2919     nxt_app_t               *app;
2920     nxt_app_rpc_t           *rpc;
2921     nxt_router_temp_conf_t  *tmcf;
2922 
2923     rpc = data;
2924     app = rpc->app;
2925     tmcf = rpc->temp_conf;
2926 
2927     if (rpc->proto) {
2928         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2929                 &app->name);
2930 
2931     } else {
2932         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2933                 &app->name);
2934 
2935         app->pending_processes--;
2936     }
2937 
2938     nxt_router_conf_error(task, tmcf);
2939 }
2940 
2941 
2942 static nxt_int_t
2943 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2944     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2945 {
2946     nxt_int_t                 ret;
2947     nxt_uint_t                n, threads;
2948     nxt_queue_link_t          *qlk;
2949     nxt_router_engine_conf_t  *recf;
2950 
2951     threads = tmcf->router_conf->threads;
2952 
2953     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2954                                      sizeof(nxt_router_engine_conf_t));
2955     if (nxt_slow_path(tmcf->engines == NULL)) {
2956         return NXT_ERROR;
2957     }
2958 
2959     n = 0;
2960 
2961     for (qlk = nxt_queue_first(&router->engines);
2962          qlk != nxt_queue_tail(&router->engines);
2963          qlk = nxt_queue_next(qlk))
2964     {
2965         recf = nxt_array_zero_add(tmcf->engines);
2966         if (nxt_slow_path(recf == NULL)) {
2967             return NXT_ERROR;
2968         }
2969 
2970         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2971 
2972         if (n < threads) {
2973             recf->action = NXT_ROUTER_ENGINE_KEEP;
2974             ret = nxt_router_engine_conf_update(tmcf, recf);
2975 
2976         } else {
2977             recf->action = NXT_ROUTER_ENGINE_DELETE;
2978             ret = nxt_router_engine_conf_delete(tmcf, recf);
2979         }
2980 
2981         if (nxt_slow_path(ret != NXT_OK)) {
2982             return ret;
2983         }
2984 
2985         n++;
2986     }
2987 
2988     tmcf->new_threads = n;
2989 
2990     while (n < threads) {
2991         recf = nxt_array_zero_add(tmcf->engines);
2992         if (nxt_slow_path(recf == NULL)) {
2993             return NXT_ERROR;
2994         }
2995 
2996         recf->action = NXT_ROUTER_ENGINE_ADD;
2997 
2998         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2999         if (nxt_slow_path(recf->engine == NULL)) {
3000             return NXT_ERROR;
3001         }
3002 
3003         ret = nxt_router_engine_conf_create(tmcf, recf);
3004         if (nxt_slow_path(ret != NXT_OK)) {
3005             return ret;
3006         }
3007 
3008         n++;
3009     }
3010 
3011     return NXT_OK;
3012 }
3013 
3014 
3015 static nxt_int_t
3016 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3017     nxt_router_engine_conf_t *recf)
3018 {
3019     nxt_int_t  ret;
3020 
3021     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3022                                           nxt_router_listen_socket_create);
3023     if (nxt_slow_path(ret != NXT_OK)) {
3024         return ret;
3025     }
3026 
3027     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3028                                           nxt_router_listen_socket_create);
3029     if (nxt_slow_path(ret != NXT_OK)) {
3030         return ret;
3031     }
3032 
3033     return ret;
3034 }
3035 
3036 
3037 static nxt_int_t
3038 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3039     nxt_router_engine_conf_t *recf)
3040 {
3041     nxt_int_t  ret;
3042 
3043     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3044                                           nxt_router_listen_socket_create);
3045     if (nxt_slow_path(ret != NXT_OK)) {
3046         return ret;
3047     }
3048 
3049     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3050                                           nxt_router_listen_socket_update);
3051     if (nxt_slow_path(ret != NXT_OK)) {
3052         return ret;
3053     }
3054 
3055     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3056     if (nxt_slow_path(ret != NXT_OK)) {
3057         return ret;
3058     }
3059 
3060     return ret;
3061 }
3062 
3063 
3064 static nxt_int_t
3065 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3066     nxt_router_engine_conf_t *recf)
3067 {
3068     nxt_int_t  ret;
3069 
3070     ret = nxt_router_engine_quit(tmcf, recf);
3071     if (nxt_slow_path(ret != NXT_OK)) {
3072         return ret;
3073     }
3074 
3075     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3076     if (nxt_slow_path(ret != NXT_OK)) {
3077         return ret;
3078     }
3079 
3080     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3081 }
3082 
3083 
3084 static nxt_int_t
3085 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3086     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3087     nxt_work_handler_t handler)
3088 {
3089     nxt_int_t                ret;
3090     nxt_joint_job_t          *job;
3091     nxt_queue_link_t         *qlk;
3092     nxt_socket_conf_t        *skcf;
3093     nxt_socket_conf_joint_t  *joint;
3094 
3095     for (qlk = nxt_queue_first(sockets);
3096          qlk != nxt_queue_tail(sockets);
3097          qlk = nxt_queue_next(qlk))
3098     {
3099         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3100         if (nxt_slow_path(job == NULL)) {
3101             return NXT_ERROR;
3102         }
3103 
3104         job->work.next = recf->jobs;
3105         recf->jobs = &job->work;
3106 
3107         job->task = tmcf->engine->task;
3108         job->work.handler = handler;
3109         job->work.task = &job->task;
3110         job->work.obj = job;
3111         job->tmcf = tmcf;
3112 
3113         tmcf->count++;
3114 
3115         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3116                              sizeof(nxt_socket_conf_joint_t));
3117         if (nxt_slow_path(joint == NULL)) {
3118             return NXT_ERROR;
3119         }
3120 
3121         job->work.data = joint;
3122 
3123         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3124         if (nxt_slow_path(ret != NXT_OK)) {
3125             return ret;
3126         }
3127 
3128         joint->count = 1;
3129 
3130         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3131         skcf->count++;
3132         joint->socket_conf = skcf;
3133 
3134         joint->engine = recf->engine;
3135     }
3136 
3137     return NXT_OK;
3138 }
3139 
3140 
3141 static nxt_int_t
3142 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3143     nxt_router_engine_conf_t *recf)
3144 {
3145     nxt_joint_job_t  *job;
3146 
3147     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3148     if (nxt_slow_path(job == NULL)) {
3149         return NXT_ERROR;
3150     }
3151 
3152     job->work.next = recf->jobs;
3153     recf->jobs = &job->work;
3154 
3155     job->task = tmcf->engine->task;
3156     job->work.handler = nxt_router_worker_thread_quit;
3157     job->work.task = &job->task;
3158     job->work.obj = NULL;
3159     job->work.data = NULL;
3160     job->tmcf = NULL;
3161 
3162     return NXT_OK;
3163 }
3164 
3165 
3166 static nxt_int_t
3167 nxt_router_engine_joints_delete(