xref: /unit/src/nxt_router.c (revision 2450:14277f21a722)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #if (NXT_HAVE_NJS)
15 #include <nxt_script.h>
16 #endif
17 #include <nxt_http.h>
18 #include <nxt_port_memory_int.h>
19 #include <nxt_unit_request.h>
20 #include <nxt_unit_response.h>
21 #include <nxt_router_request.h>
22 #include <nxt_app_queue.h>
23 #include <nxt_port_queue.h>
24 
25 #define NXT_SHARED_PORT_ID  0xFFFFu
26 
27 typedef struct {
28     nxt_str_t         type;
29     uint32_t          processes;
30     uint32_t          max_processes;
31     uint32_t          spare_processes;
32     nxt_msec_t        timeout;
33     nxt_msec_t        idle_timeout;
34     nxt_conf_value_t  *limits_value;
35     nxt_conf_value_t  *processes_value;
36     nxt_conf_value_t  *targets_value;
37 } nxt_router_app_conf_t;
38 
39 
40 typedef struct {
41     nxt_str_t         pass;
42     nxt_str_t         application;
43 } nxt_router_listener_conf_t;
44 
45 
46 #if (NXT_TLS)
47 
48 typedef struct {
49     nxt_str_t               name;
50     nxt_socket_conf_t       *socket_conf;
51     nxt_router_temp_conf_t  *temp_conf;
52     nxt_tls_init_t          *tls_init;
53     nxt_bool_t              last;
54 
55     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
56 } nxt_router_tlssock_t;
57 
58 #endif
59 
60 
61 #if (NXT_HAVE_NJS)
62 
63 typedef struct {
64     nxt_str_t               name;
65     nxt_router_temp_conf_t  *temp_conf;
66     nxt_queue_link_t        link;
67 } nxt_router_js_module_t;
68 
69 #endif
70 
71 
72 typedef struct {
73     nxt_str_t               *name;
74     nxt_socket_conf_t       *socket_conf;
75     nxt_router_temp_conf_t  *temp_conf;
76     nxt_bool_t              last;
77 } nxt_socket_rpc_t;
78 
79 
80 typedef struct {
81     nxt_app_t               *app;
82     nxt_router_temp_conf_t  *temp_conf;
83     uint8_t                 proto;  /* 1 bit */
84 } nxt_app_rpc_t;
85 
86 
87 typedef struct {
88     nxt_app_joint_t         *app_joint;
89     uint32_t                generation;
90     uint8_t                 proto;  /* 1 bit */
91 } nxt_app_joint_rpc_t;
92 
93 
94 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
95     nxt_mp_t *mp);
96 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
97 static void nxt_router_greet_controller(nxt_task_t *task,
98     nxt_port_t *controller_port);
99 
100 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
101 
102 static void nxt_router_new_port_handler(nxt_task_t *task,
103     nxt_port_recv_msg_t *msg);
104 static void nxt_router_conf_data_handler(nxt_task_t *task,
105     nxt_port_recv_msg_t *msg);
106 static void nxt_router_app_restart_handler(nxt_task_t *task,
107     nxt_port_recv_msg_t *msg);
108 static void nxt_router_status_handler(nxt_task_t *task,
109     nxt_port_recv_msg_t *msg);
110 static void nxt_router_remove_pid_handler(nxt_task_t *task,
111     nxt_port_recv_msg_t *msg);
112 
113 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
114 static void nxt_router_conf_ready(nxt_task_t *task,
115     nxt_router_temp_conf_t *tmcf);
116 static void nxt_router_conf_send(nxt_task_t *task,
117     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
118 
119 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
120     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
121 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
122     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
123 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
124     nxt_mp_t *mp, nxt_conf_value_t *conf);
125 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
126     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
127 
128 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
129 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
130 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
131     nxt_app_t *app);
132 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
133     nxt_str_t *name);
134 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
135     int i);
136 
137 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
138     nxt_port_t *port);
139 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
140     nxt_port_t *port);
141 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
142     nxt_port_t *port, nxt_fd_t fd);
143 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
145 static void nxt_router_listen_socket_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_listen_socket_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 #if (NXT_TLS)
150 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
151     nxt_port_recv_msg_t *msg, void *data);
152 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
153     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
154     nxt_bool_t last);
155 #endif
156 #if (NXT_HAVE_NJS)
157 static void nxt_router_js_module_rpc_handler(nxt_task_t *task,
158     nxt_port_recv_msg_t *msg, void *data);
159 static nxt_int_t nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
160     nxt_conf_value_t *value);
161 #endif
162 static void nxt_router_app_rpc_create(nxt_task_t *task,
163     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
164 static void nxt_router_app_prefork_ready(nxt_task_t *task,
165     nxt_port_recv_msg_t *msg, void *data);
166 static void nxt_router_app_prefork_error(nxt_task_t *task,
167     nxt_port_recv_msg_t *msg, void *data);
168 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
169     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
170 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
171     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
172 
173 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
174     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
175     const nxt_event_interface_t *interface);
176 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
177     nxt_router_engine_conf_t *recf);
178 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
179     nxt_router_engine_conf_t *recf);
180 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
181     nxt_router_engine_conf_t *recf);
182 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
183     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
184     nxt_work_handler_t handler);
185 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
186     nxt_router_engine_conf_t *recf);
187 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
188     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
189 
190 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
191     nxt_router_temp_conf_t *tmcf);
192 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
193     nxt_event_engine_t *engine);
194 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
195     nxt_router_temp_conf_t *tmcf);
196 
197 static void nxt_router_engines_post(nxt_router_t *router,
198     nxt_router_temp_conf_t *tmcf);
199 static void nxt_router_engine_post(nxt_event_engine_t *engine,
200     nxt_work_t *jobs);
201 
202 static void nxt_router_thread_start(void *data);
203 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
204     void *data);
205 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
206     void *data);
207 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
208     void *data);
209 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
210     void *data);
211 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
212     void *data);
213 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
214     void *data);
215 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
216     void *data);
217 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
218     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
219 static void nxt_router_listen_socket_release(nxt_task_t *task,
220     nxt_socket_conf_t *skcf);
221 
222 static void nxt_router_app_port_ready(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 static void nxt_router_app_port_error(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 
227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
229 
230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
231     nxt_port_t *port, nxt_apr_action_t action);
232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
233     nxt_request_rpc_data_t *req_rpc_data);
234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
235     void *data);
236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
237     void *data);
238 
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240     nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243 
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246     void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252 
253 static const nxt_http_request_state_t  nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255 
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257     nxt_app_joint_t *app_joint, int i);
258 
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260     nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262     void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265     nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 
269 extern const nxt_http_request_state_t  nxt_http_websocket;
270 
271 nxt_router_t  *nxt_router;
272 
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275 
276 static const nxt_str_t  *nxt_app_msg_prefix[] = {
277     &empty_prefix,
278     &empty_prefix,
279     &http_prefix,
280     &http_prefix,
281     &http_prefix,
282     &empty_prefix,
283 };
284 
285 
286 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
287     .quit         = nxt_signal_quit_handler,
288     .new_port     = nxt_router_new_port_handler,
289     .get_port     = nxt_router_get_port_handler,
290     .change_file  = nxt_port_change_log_file_handler,
291     .mmap         = nxt_port_mmap_handler,
292     .get_mmap     = nxt_router_get_mmap_handler,
293     .data         = nxt_router_conf_data_handler,
294     .app_restart  = nxt_router_app_restart_handler,
295     .status       = nxt_router_status_handler,
296     .remove_pid   = nxt_router_remove_pid_handler,
297     .access_log   = nxt_router_access_log_reopen_handler,
298     .rpc_ready    = nxt_port_rpc_handler,
299     .rpc_error    = nxt_port_rpc_handler,
300     .oosm         = nxt_router_oosm_handler,
301 };
302 
303 
304 const nxt_process_init_t  nxt_router_process = {
305     .name           = "router",
306     .type           = NXT_PROCESS_ROUTER,
307     .prefork        = nxt_router_prefork,
308     .restart        = 1,
309     .setup          = nxt_process_core_setup,
310     .start          = nxt_router_start,
311     .port_handlers  = &nxt_router_process_port_handlers,
312     .signals        = nxt_process_signals,
313 };
314 
315 
316 /* Queues of nxt_socket_conf_t */
317 nxt_queue_t  creating_sockets;
318 nxt_queue_t  pending_sockets;
319 nxt_queue_t  updating_sockets;
320 nxt_queue_t  keeping_sockets;
321 nxt_queue_t  deleting_sockets;
322 
323 
324 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)325 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
326 {
327     nxt_runtime_stop_app_processes(task, task->thread->runtime);
328 
329     return NXT_OK;
330 }
331 
332 
333 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)334 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
335 {
336     nxt_int_t      ret;
337     nxt_port_t     *controller_port;
338     nxt_router_t   *router;
339     nxt_runtime_t  *rt;
340 
341     rt = task->thread->runtime;
342 
343     nxt_log(task, NXT_LOG_INFO, "router started");
344 
345 #if (NXT_TLS)
346     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
347     if (nxt_slow_path(rt->tls == NULL)) {
348         return NXT_ERROR;
349     }
350 
351     ret = rt->tls->library_init(task);
352     if (nxt_slow_path(ret != NXT_OK)) {
353         return ret;
354     }
355 #endif
356 
357     ret = nxt_http_init(task);
358     if (nxt_slow_path(ret != NXT_OK)) {
359         return ret;
360     }
361 
362     router = nxt_zalloc(sizeof(nxt_router_t));
363     if (nxt_slow_path(router == NULL)) {
364         return NXT_ERROR;
365     }
366 
367     nxt_queue_init(&router->engines);
368     nxt_queue_init(&router->sockets);
369     nxt_queue_init(&router->apps);
370 
371     nxt_router = router;
372 
373     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
374     if (controller_port != NULL) {
375         nxt_router_greet_controller(task, controller_port);
376     }
377 
378     return NXT_OK;
379 }
380 
381 
382 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)383 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
384 {
385     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
386                           -1, 0, 0, NULL);
387 }
388 
389 
390 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)391 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
392     void *data)
393 {
394     size_t               size;
395     uint32_t             stream;
396     nxt_fd_t             port_fd, queue_fd;
397     nxt_int_t            ret;
398     nxt_app_t            *app;
399     nxt_buf_t            *b;
400     nxt_port_t           *dport;
401     nxt_runtime_t        *rt;
402     nxt_app_joint_rpc_t  *app_joint_rpc;
403 
404     app = data;
405 
406     nxt_thread_mutex_lock(&app->mutex);
407 
408     dport = app->proto_port;
409 
410     nxt_thread_mutex_unlock(&app->mutex);
411 
412     if (dport != NULL) {
413         nxt_debug(task, "app '%V' %p start process", &app->name, app);
414 
415         b = NULL;
416         port_fd = -1;
417         queue_fd = -1;
418 
419     } else {
420         if (app->proto_port_requests > 0) {
421             nxt_debug(task, "app '%V' %p wait for prototype process",
422                       &app->name, app);
423 
424             app->proto_port_requests++;
425 
426             goto skip;
427         }
428 
429         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
430 
431         rt = task->thread->runtime;
432         dport = rt->port_by_type[NXT_PROCESS_MAIN];
433 
434         size = app->name.length + 1 + app->conf.length;
435 
436         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
437         if (nxt_slow_path(b == NULL)) {
438             goto failed;
439         }
440 
441         nxt_buf_cpystr(b, &app->name);
442         *b->mem.free++ = '\0';
443         nxt_buf_cpystr(b, &app->conf);
444 
445         port_fd = app->shared_port->pair[0];
446         queue_fd = app->shared_port->queue_fd;
447     }
448 
449     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
450                                                      nxt_router_app_port_ready,
451                                                      nxt_router_app_port_error,
452                                                    sizeof(nxt_app_joint_rpc_t));
453     if (nxt_slow_path(app_joint_rpc == NULL)) {
454         goto failed;
455     }
456 
457     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
458 
459     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
460                                  port_fd, queue_fd, stream, port->id, b);
461     if (nxt_slow_path(ret != NXT_OK)) {
462         nxt_port_rpc_cancel(task, port, stream);
463 
464         goto failed;
465     }
466 
467     app_joint_rpc->app_joint = app->joint;
468     app_joint_rpc->generation = app->generation;
469     app_joint_rpc->proto = (b != NULL);
470 
471     if (b != NULL) {
472         app->proto_port_requests++;
473 
474         b = NULL;
475     }
476 
477     nxt_router_app_joint_use(task, app->joint, 1);
478 
479 failed:
480 
481     if (b != NULL) {
482         nxt_mp_free(b->data, b);
483     }
484 
485 skip:
486 
487     nxt_router_app_use(task, app, -1);
488 }
489 
490 
491 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)492 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
493 {
494     app_joint->use_count += i;
495 
496     if (app_joint->use_count == 0) {
497         nxt_assert(app_joint->app == NULL);
498 
499         nxt_free(app_joint);
500     }
501 }
502 
503 
504 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)505 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
506 {
507     nxt_int_t      res;
508     nxt_port_t     *router_port;
509     nxt_runtime_t  *rt;
510 
511     nxt_debug(task, "app '%V' start process", &app->name);
512 
513     rt = task->thread->runtime;
514     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
515 
516     nxt_router_app_use(task, app, 1);
517 
518     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
519                         app);
520 
521     if (res == NXT_OK) {
522         return res;
523     }
524 
525     nxt_thread_mutex_lock(&app->mutex);
526 
527     app->pending_processes--;
528 
529     nxt_thread_mutex_unlock(&app->mutex);
530 
531     nxt_router_app_use(task, app, -1);
532 
533     return NXT_ERROR;
534 }
535 
536 
537 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)538 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
539 {
540     nxt_buf_t       *b, *next;
541     nxt_bool_t      cancelled;
542     nxt_port_t      *app_port;
543     nxt_msg_info_t  *msg_info;
544 
545     msg_info = &req_rpc_data->msg_info;
546 
547     if (msg_info->buf == NULL) {
548         return 0;
549     }
550 
551     app_port = req_rpc_data->app_port;
552 
553     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
554         cancelled = nxt_app_queue_cancel(app_port->queue,
555                                          msg_info->tracking_cookie,
556                                          req_rpc_data->stream);
557 
558         if (cancelled) {
559             nxt_debug(task, "stream #%uD: cancelled by router",
560                       req_rpc_data->stream);
561         }
562 
563     } else {
564         cancelled = 0;
565     }
566 
567     for (b = msg_info->buf; b != NULL; b = next) {
568         next = b->next;
569         b->next = NULL;
570 
571         if (b->is_port_mmap_sent) {
572             b->is_port_mmap_sent = cancelled == 0;
573         }
574 
575         b->completion_handler(task, b, b->parent);
576     }
577 
578     msg_info->buf = NULL;
579 
580     return cancelled;
581 }
582 
583 
584 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)585 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
586 {
587     if (lnk->next != NULL) {
588         nxt_queue_remove(lnk);
589 
590         lnk->next = NULL;
591 
592         return 1;
593     }
594 
595     return 0;
596 }
597 
598 
599 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)600 nxt_request_rpc_data_unlink(nxt_task_t *task,
601     nxt_request_rpc_data_t *req_rpc_data)
602 {
603     nxt_app_t           *app;
604     nxt_bool_t          unlinked;
605     nxt_http_request_t  *r;
606 
607     nxt_router_msg_cancel(task, req_rpc_data);
608 
609     app = req_rpc_data->app;
610 
611     if (req_rpc_data->app_port != NULL) {
612         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
613                                     req_rpc_data->apr_action);
614 
615         req_rpc_data->app_port = NULL;
616     }
617 
618     r = req_rpc_data->request;
619 
620     if (r != NULL) {
621         r->timer_data = NULL;
622 
623         nxt_router_http_request_release_post(task, r);
624 
625         r->req_rpc_data = NULL;
626         req_rpc_data->request = NULL;
627 
628         if (app != NULL) {
629             unlinked = 0;
630 
631             nxt_thread_mutex_lock(&app->mutex);
632 
633             if (r->app_link.next != NULL) {
634                 nxt_queue_remove(&r->app_link);
635                 r->app_link.next = NULL;
636 
637                 unlinked = 1;
638             }
639 
640             nxt_thread_mutex_unlock(&app->mutex);
641 
642             if (unlinked) {
643                 nxt_mp_release(r->mem_pool);
644             }
645         }
646     }
647 
648     if (app != NULL) {
649         nxt_router_app_use(task, app, -1);
650 
651         req_rpc_data->app = NULL;
652     }
653 
654     if (req_rpc_data->msg_info.body_fd != -1) {
655         nxt_fd_close(req_rpc_data->msg_info.body_fd);
656 
657         req_rpc_data->msg_info.body_fd = -1;
658     }
659 
660     if (req_rpc_data->rpc_cancel) {
661         req_rpc_data->rpc_cancel = 0;
662 
663         nxt_port_rpc_cancel(task, task->thread->engine->port,
664                             req_rpc_data->stream);
665     }
666 }
667 
668 
669 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)670 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
671 {
672     nxt_int_t      res;
673     nxt_app_t      *app;
674     nxt_port_t     *port, *main_app_port;
675     nxt_runtime_t  *rt;
676 
677     nxt_port_new_port_handler(task, msg);
678 
679     port = msg->u.new_port;
680 
681     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
682         nxt_router_greet_controller(task, msg->u.new_port);
683     }
684 
685     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
686         nxt_port_rpc_handler(task, msg);
687 
688         return;
689     }
690 
691     if (port == NULL || port->type != NXT_PROCESS_APP) {
692 
693         if (msg->port_msg.stream == 0) {
694             return;
695         }
696 
697         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
698 
699     } else {
700         if (msg->fd[1] != -1) {
701             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
702             if (nxt_slow_path(res != NXT_OK)) {
703                 return;
704             }
705 
706             nxt_fd_close(msg->fd[1]);
707             msg->fd[1] = -1;
708         }
709     }
710 
711     if (msg->port_msg.stream != 0) {
712         nxt_port_rpc_handler(task, msg);
713         return;
714     }
715 
716     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
717 
718     /*
719      * Port with "id == 0" is application 'main' port and it always
720      * should come with non-zero stream.
721      */
722     nxt_assert(port->id != 0);
723 
724     /* Find 'main' app port and get app reference. */
725     rt = task->thread->runtime;
726 
727     /*
728      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
729      * sent to main port (with id == 0) and processed in main thread.
730      */
731     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
732     nxt_assert(main_app_port != NULL);
733 
734     app = main_app_port->app;
735 
736     if (nxt_fast_path(app != NULL)) {
737         nxt_thread_mutex_lock(&app->mutex);
738 
739         /* TODO here should be find-and-add code because there can be
740            port waiters in port_hash */
741         nxt_port_hash_add(&app->port_hash, port);
742         app->port_hash_count++;
743 
744         nxt_thread_mutex_unlock(&app->mutex);
745 
746         port->app = app;
747     }
748 
749     port->main_app_port = main_app_port;
750 
751     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
752 }
753 
754 
755 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)756 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
757 {
758     void                    *p;
759     size_t                  size;
760     nxt_int_t               ret;
761     nxt_port_t              *port;
762     nxt_router_temp_conf_t  *tmcf;
763 
764     port = nxt_runtime_port_find(task->thread->runtime,
765                                  msg->port_msg.pid,
766                                  msg->port_msg.reply_port);
767     if (nxt_slow_path(port == NULL)) {
768         nxt_alert(task, "conf_data_handler: reply port not found");
769         return;
770     }
771 
772     p = MAP_FAILED;
773 
774     /*
775      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
776      * initialized in 'cleanup' section.
777      */
778     size = 0;
779 
780     tmcf = nxt_router_temp_conf(task);
781     if (nxt_slow_path(tmcf == NULL)) {
782         goto fail;
783     }
784 
785     if (nxt_slow_path(msg->fd[0] == -1)) {
786         nxt_alert(task, "conf_data_handler: invalid shm fd");
787         goto fail;
788     }
789 
790     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
791         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
792                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
793         goto fail;
794     }
795 
796     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
797 
798     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
799 
800     nxt_fd_close(msg->fd[0]);
801     msg->fd[0] = -1;
802 
803     if (nxt_slow_path(p == MAP_FAILED)) {
804         goto fail;
805     }
806 
807     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
808 
809     tmcf->router_conf->router = nxt_router;
810     tmcf->stream = msg->port_msg.stream;
811     tmcf->port = port;
812 
813     nxt_port_use(task, tmcf->port, 1);
814 
815     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
816 
817     if (nxt_fast_path(ret == NXT_OK)) {
818         nxt_router_conf_apply(task, tmcf, NULL);
819 
820     } else {
821         nxt_router_conf_error(task, tmcf);
822     }
823 
824     goto cleanup;
825 
826 fail:
827 
828     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
829                           msg->port_msg.stream, 0, NULL);
830 
831     if (tmcf != NULL) {
832         nxt_mp_release(tmcf->mem_pool);
833     }
834 
835 cleanup:
836 
837     if (p != MAP_FAILED) {
838         nxt_mem_munmap(p, size);
839     }
840 
841     if (msg->fd[0] != -1) {
842         nxt_fd_close(msg->fd[0]);
843         msg->fd[0] = -1;
844     }
845 }
846 
847 
848 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)849 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
850 {
851     nxt_app_t            *app;
852     nxt_int_t            ret;
853     nxt_str_t            app_name;
854     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
855     nxt_port_t           *proto_port;
856     nxt_port_msg_type_t  reply;
857 
858     reply_port = nxt_runtime_port_find(task->thread->runtime,
859                                        msg->port_msg.pid,
860                                        msg->port_msg.reply_port);
861     if (nxt_slow_path(reply_port == NULL)) {
862         nxt_alert(task, "app_restart_handler: reply port not found");
863         return;
864     }
865 
866     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
867     app_name.start = msg->buf->mem.pos;
868 
869     nxt_debug(task, "app_restart_handler: %V", &app_name);
870 
871     app = nxt_router_app_find(&nxt_router->apps, &app_name);
872 
873     if (nxt_fast_path(app != NULL)) {
874         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
875                                    NXT_PROCESS_APP);
876         if (nxt_slow_path(shared_port == NULL)) {
877             goto fail;
878         }
879 
880         ret = nxt_port_socket_init(task, shared_port, 0);
881         if (nxt_slow_path(ret != NXT_OK)) {
882             nxt_port_use(task, shared_port, -1);
883             goto fail;
884         }
885 
886         ret = nxt_router_app_queue_init(task, shared_port);
887         if (nxt_slow_path(ret != NXT_OK)) {
888             nxt_port_write_close(shared_port);
889             nxt_port_read_close(shared_port);
890             nxt_port_use(task, shared_port, -1);
891             goto fail;
892         }
893 
894         nxt_port_write_enable(task, shared_port);
895 
896         nxt_thread_mutex_lock(&app->mutex);
897 
898         proto_port = app->proto_port;
899 
900         if (proto_port != NULL) {
901             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
902                       proto_port->pid);
903 
904             app->proto_port = NULL;
905             proto_port->app = NULL;
906         }
907 
908         app->generation++;
909 
910         shared_port->app = app;
911 
912         old_shared_port = app->shared_port;
913         old_shared_port->app = NULL;
914 
915         app->shared_port = shared_port;
916 
917         nxt_thread_mutex_unlock(&app->mutex);
918 
919         nxt_port_close(task, old_shared_port);
920         nxt_port_use(task, old_shared_port, -1);
921 
922         if (proto_port != NULL) {
923             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
924                                          -1, 0, 0, NULL);
925 
926             nxt_port_close(task, proto_port);
927 
928             nxt_port_use(task, proto_port, -1);
929         }
930 
931         reply = NXT_PORT_MSG_RPC_READY_LAST;
932 
933     } else {
934 
935 fail:
936 
937         reply = NXT_PORT_MSG_RPC_ERROR;
938     }
939 
940     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
941                           0, NULL);
942 }
943 
944 
945 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)946 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
947 {
948     u_char               *p;
949     size_t               alloc;
950     nxt_app_t            *app;
951     nxt_buf_t            *b;
952     nxt_uint_t           type;
953     nxt_port_t           *port;
954     nxt_status_app_t     *app_stat;
955     nxt_event_engine_t   *engine;
956     nxt_status_report_t  *report;
957 
958     port = nxt_runtime_port_find(task->thread->runtime,
959                                  msg->port_msg.pid,
960                                  msg->port_msg.reply_port);
961     if (nxt_slow_path(port == NULL)) {
962         nxt_alert(task, "nxt_router_status_handler(): reply port not found");
963         return;
964     }
965 
966     alloc = sizeof(nxt_status_report_t);
967 
968     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
969 
970         alloc += sizeof(nxt_status_app_t) + app->name.length;
971 
972     } nxt_queue_loop;
973 
974     b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
975     if (nxt_slow_path(b == NULL)) {
976         type = NXT_PORT_MSG_RPC_ERROR;
977         goto fail;
978     }
979 
980     report = (nxt_status_report_t *) b->mem.free;
981     b->mem.free = b->mem.end;
982 
983     nxt_memzero(report, sizeof(nxt_status_report_t));
984 
985     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
986 
987         report->accepted_conns += engine->accepted_conns_cnt;
988         report->idle_conns += engine->idle_conns_cnt;
989         report->closed_conns += engine->closed_conns_cnt;
990         report->requests += engine->requests_cnt;
991 
992     } nxt_queue_loop;
993 
994     report->apps_count = 0;
995     app_stat = report->apps;
996     p = b->mem.end;
997 
998     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
999         p -= app->name.length;
1000 
1001         nxt_memcpy(p, app->name.start, app->name.length);
1002 
1003         app_stat->name.length = app->name.length;
1004         app_stat->name.start = (u_char *) (p - b->mem.pos);
1005 
1006         app_stat->active_requests = app->active_requests;
1007         app_stat->pending_processes = app->pending_processes;
1008         app_stat->processes = app->processes;
1009         app_stat->idle_processes = app->idle_processes;
1010 
1011         report->apps_count++;
1012         app_stat++;
1013     } nxt_queue_loop;
1014 
1015     type = NXT_PORT_MSG_RPC_READY_LAST;
1016 
1017 fail:
1018 
1019     nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1020 }
1021 
1022 
1023 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1024 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1025     void *data)
1026 {
1027     union {
1028         nxt_pid_t  removed_pid;
1029         void       *data;
1030     } u;
1031 
1032     u.data = data;
1033 
1034     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1035 }
1036 
1037 
1038 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1039 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1040 {
1041     nxt_event_engine_t  *engine;
1042 
1043     nxt_port_remove_pid_handler(task, msg);
1044 
1045     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1046     {
1047         if (nxt_fast_path(engine->port != NULL)) {
1048             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1049                           msg->u.data);
1050         }
1051     }
1052     nxt_queue_loop;
1053 
1054     if (msg->port_msg.stream == 0) {
1055         return;
1056     }
1057 
1058     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1059 
1060     nxt_port_rpc_handler(task, msg);
1061 }
1062 
1063 
1064 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1065 nxt_router_temp_conf(nxt_task_t *task)
1066 {
1067     nxt_mp_t                *mp, *tmp;
1068     nxt_router_conf_t       *rtcf;
1069     nxt_router_temp_conf_t  *tmcf;
1070 
1071     mp = nxt_mp_create(1024, 128, 256, 32);
1072     if (nxt_slow_path(mp == NULL)) {
1073         return NULL;
1074     }
1075 
1076     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1077     if (nxt_slow_path(rtcf == NULL)) {
1078         goto fail;
1079     }
1080 
1081     rtcf->mem_pool = mp;
1082 
1083     rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1084     if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1085         goto fail;
1086     }
1087 
1088 #if (NXT_HAVE_NJS)
1089     nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1090 #endif
1091 
1092     tmp = nxt_mp_create(1024, 128, 256, 32);
1093     if (nxt_slow_path(tmp == NULL)) {
1094         goto fail;
1095     }
1096 
1097     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1098     if (nxt_slow_path(tmcf == NULL)) {
1099         goto temp_fail;
1100     }
1101 
1102     tmcf->mem_pool = tmp;
1103     tmcf->router_conf = rtcf;
1104     tmcf->count = 1;
1105     tmcf->engine = task->thread->engine;
1106 
1107     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1108                                      sizeof(nxt_router_engine_conf_t));
1109     if (nxt_slow_path(tmcf->engines == NULL)) {
1110         goto temp_fail;
1111     }
1112 
1113     nxt_queue_init(&creating_sockets);
1114     nxt_queue_init(&pending_sockets);
1115     nxt_queue_init(&updating_sockets);
1116     nxt_queue_init(&keeping_sockets);
1117     nxt_queue_init(&deleting_sockets);
1118 
1119 #if (NXT_TLS)
1120     nxt_queue_init(&tmcf->tls);
1121 #endif
1122 
1123 #if (NXT_HAVE_NJS)
1124     nxt_queue_init(&tmcf->js_modules);
1125 #endif
1126 
1127     nxt_queue_init(&tmcf->apps);
1128     nxt_queue_init(&tmcf->previous);
1129 
1130     return tmcf;
1131 
1132 temp_fail:
1133 
1134     nxt_mp_destroy(tmp);
1135 
1136 fail:
1137 
1138     if (rtcf->tstr_state != NULL) {
1139         nxt_tstr_state_release(rtcf->tstr_state);
1140     }
1141 
1142     nxt_mp_destroy(mp);
1143 
1144     return NULL;
1145 }
1146 
1147 
1148 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1149 nxt_router_app_can_start(nxt_app_t *app)
1150 {
1151     return app->processes + app->pending_processes < app->max_processes
1152             && app->pending_processes < app->max_pending_processes;
1153 }
1154 
1155 
1156 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1157 nxt_router_app_need_start(nxt_app_t *app)
1158 {
1159     return (app->active_requests
1160               > app->port_hash_count + app->pending_processes)
1161            || (app->spare_processes
1162                 > app->idle_processes + app->pending_processes);
1163 }
1164 
1165 
1166 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1167 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1168 {
1169     nxt_int_t                    ret;
1170     nxt_app_t                    *app;
1171     nxt_router_t                 *router;
1172     nxt_runtime_t                *rt;
1173     nxt_queue_link_t             *qlk;
1174     nxt_socket_conf_t            *skcf;
1175     nxt_router_conf_t            *rtcf;
1176     nxt_router_temp_conf_t       *tmcf;
1177     const nxt_event_interface_t  *interface;
1178 #if (NXT_TLS)
1179     nxt_router_tlssock_t         *tls;
1180 #endif
1181 #if (NXT_HAVE_NJS)
1182     nxt_router_js_module_t       *js_module;
1183 #endif
1184 
1185     tmcf = obj;
1186 
1187     qlk = nxt_queue_first(&pending_sockets);
1188 
1189     if (qlk != nxt_queue_tail(&pending_sockets)) {
1190         nxt_queue_remove(qlk);
1191         nxt_queue_insert_tail(&creating_sockets, qlk);
1192 
1193         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1194 
1195         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1196 
1197         return;
1198     }
1199 
1200 #if (NXT_TLS)
1201     qlk = nxt_queue_last(&tmcf->tls);
1202 
1203     if (qlk != nxt_queue_head(&tmcf->tls)) {
1204         nxt_queue_remove(qlk);
1205 
1206         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1207 
1208         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1209                            nxt_router_tls_rpc_handler, tls);
1210         return;
1211     }
1212 #endif
1213 
1214 #if (NXT_HAVE_NJS)
1215     qlk = nxt_queue_last(&tmcf->js_modules);
1216 
1217     if (qlk != nxt_queue_head(&tmcf->js_modules)) {
1218         nxt_queue_remove(qlk);
1219 
1220         js_module = nxt_queue_link_data(qlk, nxt_router_js_module_t, link);
1221 
1222         nxt_script_store_get(task, &js_module->name, tmcf->mem_pool,
1223                              nxt_router_js_module_rpc_handler, js_module);
1224         return;
1225     }
1226 #endif
1227 
1228     rtcf = tmcf->router_conf;
1229 
1230     ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
1231     if (nxt_slow_path(ret != NXT_OK)) {
1232         goto fail;
1233     }
1234 
1235     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1236 
1237         if (nxt_router_app_need_start(app)) {
1238             nxt_router_app_rpc_create(task, tmcf, app);
1239             return;
1240         }
1241 
1242     } nxt_queue_loop;
1243 
1244     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1245         nxt_router_access_log_open(task, tmcf);
1246         return;
1247     }
1248 
1249     rt = task->thread->runtime;
1250 
1251     interface = nxt_service_get(rt->services, "engine", NULL);
1252 
1253     router = rtcf->router;
1254 
1255     ret = nxt_router_engines_create(task, router, tmcf, interface);
1256     if (nxt_slow_path(ret != NXT_OK)) {
1257         goto fail;
1258     }
1259 
1260     ret = nxt_router_threads_create(task, rt, tmcf);
1261     if (nxt_slow_path(ret != NXT_OK)) {
1262         goto fail;
1263     }
1264 
1265     nxt_router_apps_sort(task, router, tmcf);
1266 
1267     nxt_router_apps_hash_use(task, rtcf, 1);
1268 
1269     nxt_router_engines_post(router, tmcf);
1270 
1271     nxt_queue_add(&router->sockets, &updating_sockets);
1272     nxt_queue_add(&router->sockets, &creating_sockets);
1273 
1274     if (router->access_log != rtcf->access_log) {
1275         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1276 
1277         nxt_router_access_log_release(task, &router->lock, router->access_log);
1278 
1279         router->access_log = rtcf->access_log;
1280     }
1281 
1282     nxt_router_conf_ready(task, tmcf);
1283 
1284     return;
1285 
1286 fail:
1287 
1288     nxt_router_conf_error(task, tmcf);
1289 
1290     return;
1291 }
1292 
1293 
1294 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1295 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1296 {
1297     nxt_joint_job_t  *job;
1298 
1299     job = obj;
1300 
1301     nxt_router_conf_ready(task, job->tmcf);
1302 }
1303 
1304 
1305 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1306 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1307 {
1308     uint32_t               count;
1309     nxt_router_conf_t      *rtcf;
1310     nxt_thread_spinlock_t  *lock;
1311 
1312     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1313 
1314     if (--tmcf->count > 0) {
1315         return;
1316     }
1317 
1318     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1319 
1320     rtcf = tmcf->router_conf;
1321 
1322     lock = &rtcf->router->lock;
1323 
1324     nxt_thread_spin_lock(lock);
1325 
1326     count = rtcf->count;
1327 
1328     nxt_thread_spin_unlock(lock);
1329 
1330     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1331 
1332     if (count == 0) {
1333         nxt_router_apps_hash_use(task, rtcf, -1);
1334 
1335         nxt_router_access_log_release(task, lock, rtcf->access_log);
1336 
1337         nxt_mp_destroy(rtcf->mem_pool);
1338     }
1339 
1340     nxt_mp_release(tmcf->mem_pool);
1341 }
1342 
1343 
1344 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1345 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1346 {
1347     nxt_app_t          *app;
1348     nxt_socket_t       s;
1349     nxt_router_t       *router;
1350     nxt_queue_link_t   *qlk;
1351     nxt_socket_conf_t  *skcf;
1352     nxt_router_conf_t  *rtcf;
1353 
1354     nxt_alert(task, "failed to apply new conf");
1355 
1356     for (qlk = nxt_queue_first(&creating_sockets);
1357          qlk != nxt_queue_tail(&creating_sockets);
1358          qlk = nxt_queue_next(qlk))
1359     {
1360         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1361         s = skcf->listen->socket;
1362 
1363         if (s != -1) {
1364             nxt_socket_close(task, s);
1365         }
1366 
1367         nxt_free(skcf->listen);
1368     }
1369 
1370     rtcf = tmcf->router_conf;
1371 
1372     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1373 
1374         nxt_router_app_unlink(task, app);
1375 
1376     } nxt_queue_loop;
1377 
1378     router = rtcf->router;
1379 
1380     nxt_queue_add(&router->sockets, &keeping_sockets);
1381     nxt_queue_add(&router->sockets, &deleting_sockets);
1382 
1383     nxt_queue_add(&router->apps, &tmcf->previous);
1384 
1385     // TODO: new engines and threads
1386 
1387     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1388 
1389     nxt_mp_destroy(rtcf->mem_pool);
1390 
1391     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1392 
1393     nxt_mp_release(tmcf->mem_pool);
1394 }
1395 
1396 
1397 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1398 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1399     nxt_port_msg_type_t type)
1400 {
1401     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1402 
1403     nxt_port_use(task, tmcf->port, -1);
1404 
1405     tmcf->port = NULL;
1406 }
1407 
1408 
1409 static nxt_conf_map_t  nxt_router_conf[] = {
1410     {
1411         nxt_string("listeners_threads"),
1412         NXT_CONF_MAP_INT32,
1413         offsetof(nxt_router_conf_t, threads),
1414     },
1415 };
1416 
1417 
1418 static nxt_conf_map_t  nxt_router_app_conf[] = {
1419     {
1420         nxt_string("type"),
1421         NXT_CONF_MAP_STR,
1422         offsetof(nxt_router_app_conf_t, type),
1423     },
1424 
1425     {
1426         nxt_string("limits"),
1427         NXT_CONF_MAP_PTR,
1428         offsetof(nxt_router_app_conf_t, limits_value),
1429     },
1430 
1431     {
1432         nxt_string("processes"),
1433         NXT_CONF_MAP_INT32,
1434         offsetof(nxt_router_app_conf_t, processes),
1435     },
1436 
1437     {
1438         nxt_string("processes"),
1439         NXT_CONF_MAP_PTR,
1440         offsetof(nxt_router_app_conf_t, processes_value),
1441     },
1442 
1443     {
1444         nxt_string("targets"),
1445         NXT_CONF_MAP_PTR,
1446         offsetof(nxt_router_app_conf_t, targets_value),
1447     },
1448 };
1449 
1450 
1451 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1452     {
1453         nxt_string("timeout"),
1454         NXT_CONF_MAP_MSEC,
1455         offsetof(nxt_router_app_conf_t, timeout),
1456     },
1457 };
1458 
1459 
1460 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1461     {
1462         nxt_string("spare"),
1463         NXT_CONF_MAP_INT32,
1464         offsetof(nxt_router_app_conf_t, spare_processes),
1465     },
1466 
1467     {
1468         nxt_string("max"),
1469         NXT_CONF_MAP_INT32,
1470         offsetof(nxt_router_app_conf_t, max_processes),
1471     },
1472 
1473     {
1474         nxt_string("idle_timeout"),
1475         NXT_CONF_MAP_MSEC,
1476         offsetof(nxt_router_app_conf_t, idle_timeout),
1477     },
1478 };
1479 
1480 
1481 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1482     {
1483         nxt_string("pass"),
1484         NXT_CONF_MAP_STR_COPY,
1485         offsetof(nxt_router_listener_conf_t, pass),
1486     },
1487 
1488     {
1489         nxt_string("application"),
1490         NXT_CONF_MAP_STR_COPY,
1491         offsetof(nxt_router_listener_conf_t, application),
1492     },
1493 };
1494 
1495 
1496 static nxt_conf_map_t  nxt_router_http_conf[] = {
1497     {
1498         nxt_string("header_buffer_size"),
1499         NXT_CONF_MAP_SIZE,
1500         offsetof(nxt_socket_conf_t, header_buffer_size),
1501     },
1502 
1503     {
1504         nxt_string("large_header_buffer_size"),
1505         NXT_CONF_MAP_SIZE,
1506         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1507     },
1508 
1509     {
1510         nxt_string("large_header_buffers"),
1511         NXT_CONF_MAP_SIZE,
1512         offsetof(nxt_socket_conf_t, large_header_buffers),
1513     },
1514 
1515     {
1516         nxt_string("body_buffer_size"),
1517         NXT_CONF_MAP_SIZE,
1518         offsetof(nxt_socket_conf_t, body_buffer_size),
1519     },
1520 
1521     {
1522         nxt_string("max_body_size"),
1523         NXT_CONF_MAP_SIZE,
1524         offsetof(nxt_socket_conf_t, max_body_size),
1525     },
1526 
1527     {
1528         nxt_string("idle_timeout"),
1529         NXT_CONF_MAP_MSEC,
1530         offsetof(nxt_socket_conf_t, idle_timeout),
1531     },
1532 
1533     {
1534         nxt_string("header_read_timeout"),
1535         NXT_CONF_MAP_MSEC,
1536         offsetof(nxt_socket_conf_t, header_read_timeout),
1537     },
1538 
1539     {
1540         nxt_string("body_read_timeout"),
1541         NXT_CONF_MAP_MSEC,
1542         offsetof(nxt_socket_conf_t, body_read_timeout),
1543     },
1544 
1545     {
1546         nxt_string("send_timeout"),
1547         NXT_CONF_MAP_MSEC,
1548         offsetof(nxt_socket_conf_t, send_timeout),
1549     },
1550 
1551     {
1552         nxt_string("body_temp_path"),
1553         NXT_CONF_MAP_STR,
1554         offsetof(nxt_socket_conf_t, body_temp_path),
1555     },
1556 
1557     {
1558         nxt_string("discard_unsafe_fields"),
1559         NXT_CONF_MAP_INT8,
1560         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1561     },
1562 
1563     {
1564         nxt_string("log_route"),
1565         NXT_CONF_MAP_INT8,
1566         offsetof(nxt_socket_conf_t, log_route),
1567     },
1568 
1569     {
1570         nxt_string("server_version"),
1571         NXT_CONF_MAP_INT8,
1572         offsetof(nxt_socket_conf_t, server_version),
1573     },
1574 };
1575 
1576 
1577 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1578     {
1579         nxt_string("max_frame_size"),
1580         NXT_CONF_MAP_SIZE,
1581         offsetof(nxt_websocket_conf_t, max_frame_size),
1582     },
1583 
1584     {
1585         nxt_string("read_timeout"),
1586         NXT_CONF_MAP_MSEC,
1587         offsetof(nxt_websocket_conf_t, read_timeout),
1588     },
1589 
1590     {
1591         nxt_string("keepalive_interval"),
1592         NXT_CONF_MAP_MSEC,
1593         offsetof(nxt_websocket_conf_t, keepalive_interval),
1594     },
1595 
1596 };
1597 
1598 
1599 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1600 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1601     u_char *start, u_char *end)
1602 {
1603     u_char                      *p;
1604     size_t                      size;
1605     nxt_mp_t                    *mp, *app_mp;
1606     uint32_t                    next, next_target;
1607     nxt_int_t                   ret;
1608     nxt_str_t                   name, target;
1609     nxt_app_t                   *app, *prev;
1610     nxt_str_t                   *t, *s, *targets;
1611     nxt_uint_t                  n, i;
1612     nxt_port_t                  *port;
1613     nxt_router_t                *router;
1614     nxt_app_joint_t             *app_joint;
1615 #if (NXT_TLS)
1616     nxt_tls_init_t              *tls_init;
1617     nxt_conf_value_t            *certificate;
1618 #endif
1619 #if (NXT_HAVE_NJS)
1620     nxt_conf_value_t            *js_module;
1621 #endif
1622     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1623     nxt_conf_value_t            *applications, *application;
1624     nxt_conf_value_t            *listeners, *listener;
1625     nxt_socket_conf_t           *skcf;
1626     nxt_router_conf_t           *rtcf;
1627     nxt_http_routes_t           *routes;
1628     nxt_event_engine_t          *engine;
1629     nxt_app_lang_module_t       *lang;
1630     nxt_router_app_conf_t       apcf;
1631     nxt_router_listener_conf_t  lscf;
1632 
1633     static nxt_str_t  http_path = nxt_string("/settings/http");
1634     static nxt_str_t  applications_path = nxt_string("/applications");
1635     static nxt_str_t  listeners_path = nxt_string("/listeners");
1636     static nxt_str_t  routes_path = nxt_string("/routes");
1637     static nxt_str_t  access_log_path = nxt_string("/access_log");
1638 #if (NXT_TLS)
1639     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1640     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1641     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1642     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1643     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1644 #endif
1645 #if (NXT_HAVE_NJS)
1646     static nxt_str_t  js_module_path = nxt_string("/settings/js_module");
1647 #endif
1648     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1649     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1650     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1651     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1652 
1653     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1654     if (root == NULL) {
1655         nxt_alert(task, "configuration parsing error");
1656         return NXT_ERROR;
1657     }
1658 
1659     rtcf = tmcf->router_conf;
1660     mp = rtcf->mem_pool;
1661 
1662     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1663                               nxt_nitems(nxt_router_conf), rtcf);
1664     if (ret != NXT_OK) {
1665         nxt_alert(task, "root map error");
1666         return NXT_ERROR;
1667     }
1668 
1669     if (rtcf->threads == 0) {
1670         rtcf->threads = nxt_ncpu;
1671     }
1672 
1673     conf = nxt_conf_get_path(root, &static_path);
1674 
1675     ret = nxt_router_conf_process_static(task, rtcf, conf);
1676     if (nxt_slow_path(ret != NXT_OK)) {
1677         return NXT_ERROR;
1678     }
1679 
1680     router = rtcf->router;
1681 
1682     applications = nxt_conf_get_path(root, &applications_path);
1683 
1684     if (applications != NULL) {
1685         next = 0;
1686 
1687         for ( ;; ) {
1688             application = nxt_conf_next_object_member(applications,
1689                                                       &name, &next);
1690             if (application == NULL) {
1691                 break;
1692             }
1693 
1694             nxt_debug(task, "application \"%V\"", &name);
1695 
1696             size = nxt_conf_json_length(application, NULL);
1697 
1698             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1699             if (nxt_slow_path(app_mp == NULL)) {
1700                 goto fail;
1701             }
1702 
1703             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1704             if (app == NULL) {
1705                 goto app_fail;
1706             }
1707 
1708             nxt_memzero(app, sizeof(nxt_app_t));
1709 
1710             app->mem_pool = app_mp;
1711 
1712             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1713             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1714                                                   + name.length);
1715 
1716             p = nxt_conf_json_print(app->conf.start, application, NULL);
1717             app->conf.length = p - app->conf.start;
1718 
1719             nxt_assert(app->conf.length <= size);
1720 
1721             nxt_debug(task, "application conf \"%V\"", &app->conf);
1722 
1723             prev = nxt_router_app_find(&router->apps, &name);
1724 
1725             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1726                 nxt_mp_destroy(app_mp);
1727 
1728                 nxt_queue_remove(&prev->link);
1729                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1730 
1731                 ret = nxt_router_apps_hash_add(rtcf, prev);
1732                 if (nxt_slow_path(ret != NXT_OK)) {
1733                     goto fail;
1734                 }
1735 
1736                 continue;
1737             }
1738 
1739             apcf.processes = 1;
1740             apcf.max_processes = 1;
1741             apcf.spare_processes = 0;
1742             apcf.timeout = 0;
1743             apcf.idle_timeout = 15000;
1744             apcf.limits_value = NULL;
1745             apcf.processes_value = NULL;
1746             apcf.targets_value = NULL;
1747 
1748             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1749             if (nxt_slow_path(app_joint == NULL)) {
1750                 goto app_fail;
1751             }
1752 
1753             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1754 
1755             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1756                                       nxt_nitems(nxt_router_app_conf), &apcf);
1757             if (ret != NXT_OK) {
1758                 nxt_alert(task, "application map error");
1759                 goto app_fail;
1760             }
1761 
1762             if (apcf.limits_value != NULL) {
1763 
1764                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1765                     nxt_alert(task, "application limits is not object");
1766                     goto app_fail;
1767                 }
1768 
1769                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1770                                         nxt_router_app_limits_conf,
1771                                         nxt_nitems(nxt_router_app_limits_conf),
1772                                         &apcf);
1773                 if (ret != NXT_OK) {
1774                     nxt_alert(task, "application limits map error");
1775                     goto app_fail;
1776                 }
1777             }
1778 
1779             if (apcf.processes_value != NULL
1780                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1781             {
1782                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1783                                      nxt_router_app_processes_conf,
1784                                      nxt_nitems(nxt_router_app_processes_conf),
1785                                      &apcf);
1786                 if (ret != NXT_OK) {
1787                     nxt_alert(task, "application processes map error");
1788                     goto app_fail;
1789                 }
1790 
1791             } else {
1792                 apcf.max_processes = apcf.processes;
1793                 apcf.spare_processes = apcf.processes;
1794             }
1795 
1796             if (apcf.targets_value != NULL) {
1797                 n = nxt_conf_object_members_count(apcf.targets_value);
1798 
1799                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1800                 if (nxt_slow_path(targets == NULL)) {
1801                     goto app_fail;
1802                 }
1803 
1804                 next_target = 0;
1805 
1806                 for (i = 0; i < n; i++) {
1807                     (void) nxt_conf_next_object_member(apcf.targets_value,
1808                                                        &target, &next_target);
1809 
1810                     s = nxt_str_dup(app_mp, &targets[i], &target);
1811                     if (nxt_slow_path(s == NULL)) {
1812                         goto app_fail;
1813                     }
1814                 }
1815 
1816             } else {
1817                 targets = NULL;
1818             }
1819 
1820             nxt_debug(task, "application type: %V", &apcf.type);
1821             nxt_debug(task, "application processes: %D", apcf.processes);
1822             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1823 
1824             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1825 
1826             if (lang == NULL) {
1827                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1828                 goto app_fail;
1829             }
1830 
1831             nxt_debug(task, "application language module: \"%s\"", lang->file);
1832 
1833             ret = nxt_thread_mutex_create(&app->mutex);
1834             if (ret != NXT_OK) {
1835                 goto app_fail;
1836             }
1837 
1838             nxt_queue_init(&app->ports);
1839             nxt_queue_init(&app->spare_ports);
1840             nxt_queue_init(&app->idle_ports);
1841             nxt_queue_init(&app->ack_waiting_req);
1842 
1843             app->name.length = name.length;
1844             nxt_memcpy(app->name.start, name.start, name.length);
1845 
1846             app->type = lang->type;
1847             app->max_processes = apcf.max_processes;
1848             app->spare_processes = apcf.spare_processes;
1849             app->max_pending_processes = apcf.spare_processes
1850                                          ? apcf.spare_processes : 1;
1851             app->timeout = apcf.timeout;
1852             app->idle_timeout = apcf.idle_timeout;
1853 
1854             app->targets = targets;
1855 
1856             engine = task->thread->engine;
1857 
1858             app->engine = engine;
1859 
1860             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1861             app->adjust_idle_work.task = &engine->task;
1862             app->adjust_idle_work.obj = app;
1863 
1864             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1865 
1866             ret = nxt_router_apps_hash_add(rtcf, app);
1867             if (nxt_slow_path(ret != NXT_OK)) {
1868                 goto app_fail;
1869             }
1870 
1871             nxt_router_app_use(task, app, 1);
1872 
1873             app->joint = app_joint;
1874 
1875             app_joint->use_count = 1;
1876             app_joint->app = app;
1877 
1878             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1879             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1880             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1881             app_joint->idle_timer.task = &engine->task;
1882             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1883 
1884             app_joint->free_app_work.handler = nxt_router_free_app;
1885             app_joint->free_app_work.task = &engine->task;
1886             app_joint->free_app_work.obj = app_joint;
1887 
1888             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1889                                 NXT_PROCESS_APP);
1890             if (nxt_slow_path(port == NULL)) {
1891                 return NXT_ERROR;
1892             }
1893 
1894             ret = nxt_port_socket_init(task, port, 0);
1895             if (nxt_slow_path(ret != NXT_OK)) {
1896                 nxt_port_use(task, port, -1);
1897                 return NXT_ERROR;
1898             }
1899 
1900             ret = nxt_router_app_queue_init(task, port);
1901             if (nxt_slow_path(ret != NXT_OK)) {
1902                 nxt_port_write_close(port);
1903                 nxt_port_read_close(port);
1904                 nxt_port_use(task, port, -1);
1905                 return NXT_ERROR;
1906             }
1907 
1908             nxt_port_write_enable(task, port);
1909             port->app = app;
1910 
1911             app->shared_port = port;
1912 
1913             nxt_thread_mutex_create(&app->outgoing.mutex);
1914         }
1915     }
1916 
1917     conf = nxt_conf_get_path(root, &routes_path);
1918     if (nxt_fast_path(conf != NULL)) {
1919         routes = nxt_http_routes_create(task, tmcf, conf);
1920         if (nxt_slow_path(routes == NULL)) {
1921             return NXT_ERROR;
1922         }
1923 
1924         rtcf->routes = routes;
1925     }
1926 
1927     ret = nxt_upstreams_create(task, tmcf, root);
1928     if (nxt_slow_path(ret != NXT_OK)) {
1929         return ret;
1930     }
1931 
1932     http = nxt_conf_get_path(root, &http_path);
1933 #if 0
1934     if (http == NULL) {
1935         nxt_alert(task, "no \"http\" block");
1936         return NXT_ERROR;
1937     }
1938 #endif
1939 
1940     websocket = nxt_conf_get_path(root, &websocket_path);
1941 
1942     listeners = nxt_conf_get_path(root, &listeners_path);
1943 
1944     if (listeners != NULL) {
1945         next = 0;
1946 
1947         for ( ;; ) {
1948             listener = nxt_conf_next_object_member(listeners, &name, &next);
1949             if (listener == NULL) {
1950                 break;
1951             }
1952 
1953             skcf = nxt_router_socket_conf(task, tmcf, &name);
1954             if (skcf == NULL) {
1955                 goto fail;
1956             }
1957 
1958             nxt_memzero(&lscf, sizeof(lscf));
1959 
1960             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1961                                       nxt_nitems(nxt_router_listener_conf),
1962                                       &lscf);
1963             if (ret != NXT_OK) {
1964                 nxt_alert(task, "listener map error");
1965                 goto fail;
1966             }
1967 
1968             nxt_debug(task, "application: %V", &lscf.application);
1969 
1970             // STUB, default values if http block is not defined.
1971             skcf->header_buffer_size = 2048;
1972             skcf->large_header_buffer_size = 8192;
1973             skcf->large_header_buffers = 4;
1974             skcf->discard_unsafe_fields = 1;
1975             skcf->body_buffer_size = 16 * 1024;
1976             skcf->max_body_size = 8 * 1024 * 1024;
1977             skcf->proxy_header_buffer_size = 64 * 1024;
1978             skcf->proxy_buffer_size = 4096;
1979             skcf->proxy_buffers = 256;
1980             skcf->idle_timeout = 180 * 1000;
1981             skcf->header_read_timeout = 30 * 1000;
1982             skcf->body_read_timeout = 30 * 1000;
1983             skcf->send_timeout = 30 * 1000;
1984             skcf->proxy_timeout = 60 * 1000;
1985             skcf->proxy_send_timeout = 30 * 1000;
1986             skcf->proxy_read_timeout = 30 * 1000;
1987 
1988             skcf->server_version = 1;
1989 
1990             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1991             skcf->websocket_conf.read_timeout = 60 * 1000;
1992             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1993 
1994             nxt_str_null(&skcf->body_temp_path);
1995 
1996             if (http != NULL) {
1997                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1998                                           nxt_nitems(nxt_router_http_conf),
1999                                           skcf);
2000                 if (ret != NXT_OK) {
2001                     nxt_alert(task, "http map error");
2002                     goto fail;
2003                 }
2004             }
2005 
2006             if (websocket != NULL) {
2007                 ret = nxt_conf_map_object(mp, websocket,
2008                                           nxt_router_websocket_conf,
2009                                           nxt_nitems(nxt_router_websocket_conf),
2010                                           &skcf->websocket_conf);
2011                 if (ret != NXT_OK) {
2012                     nxt_alert(task, "websocket map error");
2013                     goto fail;
2014                 }
2015             }
2016 
2017             t = &skcf->body_temp_path;
2018 
2019             if (t->length == 0) {
2020                 t->start = (u_char *) task->thread->runtime->tmp;
2021                 t->length = nxt_strlen(t->start);
2022             }
2023 
2024             conf = nxt_conf_get_path(listener, &forwarded_path);
2025 
2026             if (conf != NULL) {
2027                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
2028                 if (nxt_slow_path(skcf->forwarded == NULL)) {
2029                     return NXT_ERROR;
2030                 }
2031             }
2032 
2033             conf = nxt_conf_get_path(listener, &client_ip_path);
2034 
2035             if (conf != NULL) {
2036                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
2037                 if (nxt_slow_path(skcf->client_ip == NULL)) {
2038                     return NXT_ERROR;
2039                 }
2040             }
2041 
2042 #if (NXT_TLS)
2043             certificate = nxt_conf_get_path(listener, &certificate_path);
2044 
2045             if (certificate != NULL) {
2046                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
2047                 if (nxt_slow_path(tls_init == NULL)) {
2048                     return NXT_ERROR;
2049                 }
2050 
2051                 tls_init->cache_size = 0;
2052                 tls_init->timeout = 300;
2053 
2054                 value = nxt_conf_get_path(listener, &conf_cache_path);
2055                 if (value != NULL) {
2056                     tls_init->cache_size = nxt_conf_get_number(value);
2057                 }
2058 
2059                 value = nxt_conf_get_path(listener, &conf_timeout_path);
2060                 if (value != NULL) {
2061                     tls_init->timeout = nxt_conf_get_number(value);
2062                 }
2063 
2064                 tls_init->conf_cmds = nxt_conf_get_path(listener,
2065                                                         &conf_commands_path);
2066 
2067                 tls_init->tickets_conf = nxt_conf_get_path(listener,
2068                                                            &conf_tickets);
2069 
2070                 n = nxt_conf_array_elements_count_or_1(certificate);
2071 
2072                 for (i = 0; i < n; i++) {
2073                     value = nxt_conf_get_array_element_or_itself(certificate,
2074                                                                  i);
2075                     nxt_assert(value != NULL);
2076 
2077                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2078                                                      tls_init, i == 0);
2079                     if (nxt_slow_path(ret != NXT_OK)) {
2080                         goto fail;
2081                     }
2082                 }
2083             }
2084 #endif
2085 
2086             skcf->listen->handler = nxt_http_conn_init;
2087             skcf->router_conf = rtcf;
2088             skcf->router_conf->count++;
2089 
2090             if (lscf.pass.length != 0) {
2091                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2092 
2093             /* COMPATIBILITY: listener application. */
2094             } else if (lscf.application.length > 0) {
2095                 skcf->action = nxt_http_pass_application(task, rtcf,
2096                                                          &lscf.application);
2097             }
2098 
2099             if (nxt_slow_path(skcf->action == NULL)) {
2100                 goto fail;
2101             }
2102         }
2103     }
2104 
2105     ret = nxt_http_routes_resolve(task, tmcf);
2106     if (nxt_slow_path(ret != NXT_OK)) {
2107         goto fail;
2108     }
2109 
2110     value = nxt_conf_get_path(root, &access_log_path);
2111 
2112     if (value != NULL) {
2113         ret = nxt_router_access_log_create(task, rtcf, value);
2114         if (nxt_slow_path(ret != NXT_OK)) {
2115             goto fail;
2116         }
2117     }
2118 
2119 #if (NXT_HAVE_NJS)
2120     js_module = nxt_conf_get_path(root, &js_module_path);
2121 
2122     if (js_module != NULL) {
2123         if (nxt_conf_type(js_module) == NXT_CONF_ARRAY) {
2124             n = nxt_conf_array_elements_count(js_module);
2125 
2126             for (i = 0; i < n; i++) {
2127                 value = nxt_conf_get_array_element(js_module, i);
2128 
2129                 ret = nxt_router_js_module_insert(tmcf, value);
2130                 if (nxt_slow_path(ret != NXT_OK)) {
2131                     goto fail;
2132                 }
2133             }
2134 
2135         } else {
2136             /* NXT_CONF_STRING */
2137 
2138             ret = nxt_router_js_module_insert(tmcf, js_module);
2139             if (nxt_slow_path(ret != NXT_OK)) {
2140                 goto fail;
2141             }
2142         }
2143     }
2144 
2145 #endif
2146 
2147     nxt_queue_add(&deleting_sockets, &router->sockets);
2148     nxt_queue_init(&router->sockets);
2149 
2150     return NXT_OK;
2151 
2152 app_fail:
2153 
2154     nxt_mp_destroy(app_mp);
2155 
2156 fail:
2157 
2158     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2159 
2160         nxt_queue_remove(&app->link);
2161         nxt_thread_mutex_destroy(&app->mutex);
2162         nxt_mp_destroy(app->mem_pool);
2163 
2164     } nxt_queue_loop;
2165 
2166     return NXT_ERROR;
2167 }
2168 
2169 
2170 #if (NXT_TLS)
2171 
2172 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2173 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2174     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2175     nxt_tls_init_t *tls_init, nxt_bool_t last)
2176 {
2177     nxt_router_tlssock_t  *tls;
2178 
2179     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2180     if (nxt_slow_path(tls == NULL)) {
2181         return NXT_ERROR;
2182     }
2183 
2184     tls->tls_init = tls_init;
2185     tls->socket_conf = skcf;
2186     tls->temp_conf = tmcf;
2187     tls->last = last;
2188     nxt_conf_get_string(value, &tls->name);
2189 
2190     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2191 
2192     return NXT_OK;
2193 }
2194 
2195 #endif
2196 
2197 
2198 #if (NXT_HAVE_NJS)
2199 
2200 static void
nxt_router_js_module_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2201 nxt_router_js_module_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2202     void *data)
2203 {
2204     nxt_int_t               ret;
2205     nxt_str_t               text;
2206     nxt_router_conf_t       *rtcf;
2207     nxt_router_temp_conf_t  *tmcf;
2208     nxt_router_js_module_t  *js_module;
2209 
2210     nxt_debug(task, "auto module rpc handler");
2211 
2212     js_module = data;
2213     tmcf = js_module->temp_conf;
2214 
2215     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2216         goto fail;
2217     }
2218 
2219     rtcf = tmcf->router_conf;
2220 
2221     ret = nxt_script_file_read(msg->fd[0], &text);
2222 
2223     nxt_fd_close(msg->fd[0]);
2224 
2225     if (nxt_slow_path(ret == NXT_ERROR)) {
2226         goto fail;
2227     }
2228 
2229     if (text.length > 0) {
2230         ret = nxt_js_add_module(rtcf->tstr_state->jcf, &js_module->name, &text);
2231 
2232         nxt_free(text.start);
2233 
2234         if (nxt_slow_path(ret == NXT_ERROR)) {
2235             goto fail;
2236         }
2237     }
2238 
2239     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2240                        nxt_router_conf_apply, task, tmcf, NULL);
2241     return;
2242 
2243 fail:
2244 
2245     nxt_router_conf_error(task, tmcf);
2246 }
2247 
2248 
2249 static nxt_int_t
nxt_router_js_module_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value)2250 nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
2251     nxt_conf_value_t *value)
2252 {
2253     nxt_router_js_module_t  *js_module;
2254 
2255     js_module = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_js_module_t));
2256     if (nxt_slow_path(js_module == NULL)) {
2257         return NXT_ERROR;
2258     }
2259 
2260     js_module->temp_conf = tmcf;
2261     nxt_conf_get_string(value, &js_module->name);
2262 
2263     nxt_queue_insert_tail(&tmcf->js_modules, &js_module->link);
2264 
2265     return NXT_OK;
2266 }
2267 
2268 #endif
2269 
2270 
2271 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2272 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2273     nxt_conf_value_t *conf)
2274 {
2275     uint32_t          next, i;
2276     nxt_mp_t          *mp;
2277     nxt_str_t         *type, exten, str;
2278     nxt_int_t         ret;
2279     nxt_uint_t        exts;
2280     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2281 
2282     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2283 
2284     mp = rtcf->mem_pool;
2285 
2286     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2287     if (nxt_slow_path(ret != NXT_OK)) {
2288         return NXT_ERROR;
2289     }
2290 
2291     if (conf == NULL) {
2292         return NXT_OK;
2293     }
2294 
2295     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2296 
2297     if (mtypes_conf != NULL) {
2298         next = 0;
2299 
2300         for ( ;; ) {
2301             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2302 
2303             if (ext_conf == NULL) {
2304                 break;
2305             }
2306 
2307             type = nxt_str_dup(mp, NULL, &str);
2308             if (nxt_slow_path(type == NULL)) {
2309                 return NXT_ERROR;
2310             }
2311 
2312             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2313                 nxt_conf_get_string(ext_conf, &str);
2314 
2315                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2316                     return NXT_ERROR;
2317                 }
2318 
2319                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2320                                                       &exten, type);
2321                 if (nxt_slow_path(ret != NXT_OK)) {
2322                     return NXT_ERROR;
2323                 }
2324 
2325                 continue;
2326             }
2327 
2328             exts = nxt_conf_array_elements_count(ext_conf);
2329 
2330             for (i = 0; i < exts; i++) {
2331                 value = nxt_conf_get_array_element(ext_conf, i);
2332 
2333                 nxt_conf_get_string(value, &str);
2334 
2335                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2336                     return NXT_ERROR;
2337                 }
2338 
2339                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2340                                                       &exten, type);
2341                 if (nxt_slow_path(ret != NXT_OK)) {
2342                     return NXT_ERROR;
2343                 }
2344             }
2345         }
2346     }
2347 
2348     return NXT_OK;
2349 }
2350 
2351 
2352 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2353 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2354 {
2355     nxt_int_t                   ret;
2356     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2357     nxt_conf_value_t            *source_conf, *recursive_conf;
2358     nxt_http_forward_t          *forward;
2359     nxt_http_route_addr_rule_t  *source;
2360 
2361     static nxt_str_t  header_path = nxt_string("/header");
2362     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2363     static nxt_str_t  protocol_path = nxt_string("/protocol");
2364     static nxt_str_t  source_path = nxt_string("/source");
2365     static nxt_str_t  recursive_path = nxt_string("/recursive");
2366 
2367     header_conf = nxt_conf_get_path(conf, &header_path);
2368 
2369     if (header_conf != NULL) {
2370         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2371         protocol_conf = NULL;
2372 
2373     } else {
2374         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2375         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2376     }
2377 
2378     source_conf = nxt_conf_get_path(conf, &source_path);
2379     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2380 
2381     if (source_conf == NULL
2382         || (protocol_conf == NULL && client_ip_conf == NULL))
2383     {
2384         return NULL;
2385     }
2386 
2387     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2388     if (nxt_slow_path(forward == NULL)) {
2389         return NULL;
2390     }
2391 
2392     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2393     if (nxt_slow_path(source == NULL)) {
2394         return NULL;
2395     }
2396 
2397     forward->source = source;
2398 
2399     if (recursive_conf != NULL) {
2400         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2401     }
2402 
2403     if (client_ip_conf != NULL) {
2404         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2405                                              &forward->client_ip);
2406         if (nxt_slow_path(ret != NXT_OK)) {
2407             return NULL;
2408         }
2409     }
2410 
2411     if (protocol_conf != NULL) {
2412         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2413                                              &forward->protocol);
2414         if (nxt_slow_path(ret != NXT_OK)) {
2415             return NULL;
2416         }
2417     }
2418 
2419     return forward;
2420 }
2421 
2422 
2423 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2424 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2425     nxt_http_forward_header_t *fh)
2426 {
2427     char       c;
2428     size_t     i;
2429     uint32_t   hash;
2430     nxt_str_t  header;
2431 
2432     nxt_conf_get_string(conf, &header);
2433 
2434     fh->header = nxt_str_dup(mp, NULL, &header);
2435     if (nxt_slow_path(fh->header == NULL)) {
2436         return NXT_ERROR;
2437     }
2438 
2439     hash = NXT_HTTP_FIELD_HASH_INIT;
2440 
2441     for (i = 0; i < fh->header->length; i++) {
2442         c = fh->header->start[i];
2443         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2444     }
2445 
2446     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2447 
2448     fh->header_hash = hash;
2449 
2450     return NXT_OK;
2451 }
2452 
2453 
2454 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2455 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2456 {
2457     nxt_app_t  *app;
2458 
2459     nxt_queue_each(app, queue, nxt_app_t, link) {
2460 
2461         if (nxt_strstr_eq(name, &app->name)) {
2462             return app;
2463         }
2464 
2465     } nxt_queue_loop;
2466 
2467     return NULL;
2468 }
2469 
2470 
2471 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2472 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2473 {
2474     void       *mem;
2475     nxt_int_t  fd;
2476 
2477     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2478     if (nxt_slow_path(fd == -1)) {
2479         return NXT_ERROR;
2480     }
2481 
2482     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2483                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2484     if (nxt_slow_path(mem == MAP_FAILED)) {
2485         nxt_fd_close(fd);
2486 
2487         return NXT_ERROR;
2488     }
2489 
2490     nxt_app_queue_init(mem);
2491 
2492     port->queue_fd = fd;
2493     port->queue = mem;
2494 
2495     return NXT_OK;
2496 }
2497 
2498 
2499 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2500 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2501 {
2502     void       *mem;
2503     nxt_int_t  fd;
2504 
2505     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2506     if (nxt_slow_path(fd == -1)) {
2507         return NXT_ERROR;
2508     }
2509 
2510     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2511                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2512     if (nxt_slow_path(mem == MAP_FAILED)) {
2513         nxt_fd_close(fd);
2514 
2515         return NXT_ERROR;
2516     }
2517 
2518     nxt_port_queue_init(mem);
2519 
2520     port->queue_fd = fd;
2521     port->queue = mem;
2522 
2523     return NXT_OK;
2524 }
2525 
2526 
2527 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2528 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2529 {
2530     void  *mem;
2531 
2532     nxt_assert(fd != -1);
2533 
2534     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2535                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2536     if (nxt_slow_path(mem == MAP_FAILED)) {
2537 
2538         return NXT_ERROR;
2539     }
2540 
2541     port->queue = mem;
2542 
2543     return NXT_OK;
2544 }
2545 
2546 
/*
 * Level-hash prototype shared by the application hash helpers below
 * (insert, find and iteration over rtcf->apps_hash).  Keys are compared by
 * nxt_router_apps_hash_test(); the remaining entries are the allocation and
 * release callbacks handed to the level hash.
 */
2547 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2548     NXT_LVLHSH_DEFAULT,
2549     nxt_router_apps_hash_test,
2550     nxt_mp_lvlhsh_alloc,
2551     nxt_mp_lvlhsh_free,
2552 };
2553 
2554 
2555 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2556 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2557 {
2558     nxt_app_t  *app;
2559 
2560     app = data;
2561 
2562     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2563 }
2564 
2565 
2566 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2567 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2568 {
2569     nxt_lvlhsh_query_t  lhq;
2570 
2571     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2572     lhq.replace = 0;
2573     lhq.key = app->name;
2574     lhq.value = app;
2575     lhq.proto = &nxt_router_apps_hash_proto;
2576     lhq.pool = rtcf->mem_pool;
2577 
2578     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2579 
2580     case NXT_OK:
2581         return NXT_OK;
2582 
2583     case NXT_DECLINED:
2584         nxt_thread_log_alert("router app hash adding failed: "
2585                              "\"%V\" is already in hash", &lhq.key);
2586         /* Fall through. */
2587     default:
2588         return NXT_ERROR;
2589     }
2590 }
2591 
2592 
2593 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2594 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2595 {
2596     nxt_lvlhsh_query_t  lhq;
2597 
2598     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2599     lhq.key = *name;
2600     lhq.proto = &nxt_router_apps_hash_proto;
2601 
2602     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2603         return NULL;
2604     }
2605 
2606     return lhq.value;
2607 }
2608 
2609 
2610 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2611 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2612 {
2613     nxt_app_t          *app;
2614     nxt_lvlhsh_each_t  lhe;
2615 
2616     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2617 
2618     for ( ;; ) {
2619         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2620 
2621         if (app == NULL) {
2622             break;
2623         }
2624 
2625         nxt_router_app_use(task, app, i);
2626     }
2627 }
2628 
2629 
/*
 * Application binding of an HTTP action; allocated and stored in
 * action->u.conf by nxt_router_application_init().
 */
2630 typedef struct {
2631     nxt_app_t  *app;    /* application found by name in the apps hash */
2632     nxt_int_t  target;  /* index into app->targets; 0 if no target given */
2633 } nxt_http_app_conf_t;
2634 
2635 
2636 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2637 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2638     nxt_str_t *target, nxt_http_action_t *action)
2639 {
2640     nxt_app_t            *app;
2641     nxt_str_t            *targets;
2642     nxt_uint_t           i;
2643     nxt_http_app_conf_t  *conf;
2644 
2645     app = nxt_router_apps_hash_get(rtcf, name);
2646     if (app == NULL) {
2647         return NXT_DECLINED;
2648     }
2649 
2650     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2651     if (nxt_slow_path(conf == NULL)) {
2652         return NXT_ERROR;
2653     }
2654 
2655     action->handler = nxt_http_application_handler;
2656     action->u.conf = conf;
2657 
2658     conf->app = app;
2659 
2660     if (target != NULL && target->length != 0) {
2661         targets = app->targets;
2662 
2663         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2664 
2665         conf->target = i;
2666 
2667     } else {
2668         conf->target = 0;
2669     }
2670 
2671     return NXT_OK;
2672 }
2673 
2674 
2675 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2676 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2677     nxt_str_t *name)
2678 {
2679     size_t               size;
2680     nxt_int_t            ret;
2681     nxt_bool_t           wildcard;
2682     nxt_sockaddr_t       *sa;
2683     nxt_socket_conf_t    *skcf;
2684     nxt_listen_socket_t  *ls;
2685 
2686     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2687     if (nxt_slow_path(sa == NULL)) {
2688         nxt_alert(task, "invalid listener \"%V\"", name);
2689         return NULL;
2690     }
2691 
2692     sa->type = SOCK_STREAM;
2693 
2694     nxt_debug(task, "router listener: \"%*s\"",
2695               (size_t) sa->length, nxt_sockaddr_start(sa));
2696 
2697     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2698     if (nxt_slow_path(skcf == NULL)) {
2699         return NULL;
2700     }
2701 
2702     size = nxt_sockaddr_size(sa);
2703 
2704     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2705 
2706     if (ret != NXT_OK) {
2707 
2708         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2709         if (nxt_slow_path(ls == NULL)) {
2710             return NULL;
2711         }
2712 
2713         skcf->listen = ls;
2714 
2715         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2716         nxt_memcpy(ls->sockaddr, sa, size);
2717 
2718         nxt_listen_socket_remote_size(ls);
2719 
2720         ls->socket = -1;
2721         ls->backlog = NXT_LISTEN_BACKLOG;
2722         ls->flags = NXT_NONBLOCK;
2723         ls->read_after_accept = 1;
2724     }
2725 
2726     switch (sa->u.sockaddr.sa_family) {
2727 #if (NXT_HAVE_UNIX_DOMAIN)
2728     case AF_UNIX:
2729         wildcard = 0;
2730         break;
2731 #endif
2732 #if (NXT_INET6)
2733     case AF_INET6:
2734         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2735         break;
2736 #endif
2737     case AF_INET:
2738     default:
2739         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2740         break;
2741     }
2742 
2743     if (!wildcard) {
2744         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2745         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2746             return NULL;
2747         }
2748 
2749         nxt_memcpy(skcf->sockaddr, sa, size);
2750     }
2751 
2752     return skcf;
2753 }
2754 
2755 
2756 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2757 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2758     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2759 {
2760     nxt_router_t       *router;
2761     nxt_queue_link_t   *qlk;
2762     nxt_socket_conf_t  *skcf;
2763 
2764     router = tmcf->router_conf->router;
2765 
2766     for (qlk = nxt_queue_first(&router->sockets);
2767          qlk != nxt_queue_tail(&router->sockets);
2768          qlk = nxt_queue_next(qlk))
2769     {
2770         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2771 
2772         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2773             nskcf->listen = skcf->listen;
2774 
2775             nxt_queue_remove(qlk);
2776             nxt_queue_insert_tail(&keeping_sockets, qlk);
2777 
2778             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2779 
2780             return NXT_OK;
2781         }
2782     }
2783 
2784     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2785 
2786     return NXT_DECLINED;
2787 }
2788 
2789 
2790 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2791 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2792     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2793 {
2794     size_t            size;
2795     uint32_t          stream;
2796     nxt_int_t         ret;
2797     nxt_buf_t         *b;
2798     nxt_port_t        *main_port, *router_port;
2799     nxt_runtime_t     *rt;
2800     nxt_socket_rpc_t  *rpc;
2801 
2802     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2803     if (rpc == NULL) {
2804         goto fail;
2805     }
2806 
2807     rpc->socket_conf = skcf;
2808     rpc->temp_conf = tmcf;
2809 
2810     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2811 
2812     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2813     if (b == NULL) {
2814         goto fail;
2815     }
2816 
2817     b->completion_handler = nxt_buf_dummy_completion;
2818 
2819     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2820 
2821     rt = task->thread->runtime;
2822     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2823     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2824 
2825     stream = nxt_port_rpc_register_handler(task, router_port,
2826                                            nxt_router_listen_socket_ready,
2827                                            nxt_router_listen_socket_error,
2828                                            main_port->pid, rpc);
2829     if (nxt_slow_path(stream == 0)) {
2830         goto fail;
2831     }
2832 
2833     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2834                                 stream, router_port->id, b);
2835 
2836     if (nxt_slow_path(ret != NXT_OK)) {
2837         nxt_port_rpc_cancel(task, router_port, stream);
2838         goto fail;
2839     }
2840 
2841     return;
2842 
2843 fail:
2844 
2845     nxt_router_conf_error(task, tmcf);
2846 }
2847 
2848 
2849 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2850 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2851     void *data)
2852 {
2853     nxt_int_t         ret;
2854     nxt_socket_t      s;
2855     nxt_socket_rpc_t  *rpc;
2856 
2857     rpc = data;
2858 
2859     s = msg->fd[0];
2860 
2861     ret = nxt_socket_nonblocking(task, s);
2862     if (nxt_slow_path(ret != NXT_OK)) {
2863         goto fail;
2864     }
2865 
2866     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2867 
2868     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2869     if (nxt_slow_path(ret != NXT_OK)) {
2870         goto fail;
2871     }
2872 
2873     rpc->socket_conf->listen->socket = s;
2874 
2875     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2876                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2877 
2878     return;
2879 
2880 fail:
2881 
2882     nxt_socket_close(task, s);
2883 
2884     nxt_router_conf_error(task, rpc->temp_conf);
2885 }
2886 
2887 
2888 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2889 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2890     void *data)
2891 {
2892     nxt_socket_rpc_t        *rpc;
2893     nxt_router_temp_conf_t  *tmcf;
2894 
2895     rpc = data;
2896     tmcf = rpc->temp_conf;
2897 
2898 #if 0
2899     u_char                  *p;
2900     size_t                  size;
2901     uint8_t                 error;
2902     nxt_buf_t               *in, *out;
2903     nxt_sockaddr_t          *sa;
2904 
2905     static nxt_str_t  socket_errors[] = {
2906         nxt_string("ListenerSystem"),
2907         nxt_string("ListenerNoIPv6"),
2908         nxt_string("ListenerPort"),
2909         nxt_string("ListenerInUse"),
2910         nxt_string("ListenerNoAddress"),
2911         nxt_string("ListenerNoAccess"),
2912         nxt_string("ListenerPath"),
2913     };
2914 
2915     sa = rpc->socket_conf->listen->sockaddr;
2916 
2917     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2918 
2919     if (nxt_slow_path(in == NULL)) {
2920         return;
2921     }
2922 
2923     p = in->mem.pos;
2924 
2925     error = *p++;
2926 
2927     size = nxt_length("listen socket error: ")
2928            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2929            + sa->length + socket_errors[error].length + (in->mem.free - p);
2930 
2931     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2932     if (nxt_slow_path(out == NULL)) {
2933         return;
2934     }
2935 
2936     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2937                         "listen socket error: "
2938                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2939                         (size_t) sa->length, nxt_sockaddr_start(sa),
2940                         &socket_errors[error], in->mem.free - p, p);
2941 
2942     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2943 #endif
2944 
2945     nxt_router_conf_error(task, tmcf);
2946 }
2947 
2948 
2949 #if (NXT_TLS)
2950 
2951 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2952 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2953     void *data)
2954 {
2955     nxt_mp_t                *mp;
2956     nxt_int_t               ret;
2957     nxt_tls_conf_t          *tlscf;
2958     nxt_router_tlssock_t    *tls;
2959     nxt_tls_bundle_conf_t   *bundle;
2960     nxt_router_temp_conf_t  *tmcf;
2961 
2962     nxt_debug(task, "tls rpc handler");
2963 
2964     tls = data;
2965     tmcf = tls->temp_conf;
2966 
2967     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2968         goto fail;
2969     }
2970 
2971     mp = tmcf->router_conf->mem_pool;
2972 
2973     if (tls->socket_conf->tls == NULL){
2974         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2975         if (nxt_slow_path(tlscf == NULL)) {
2976             goto fail;
2977         }
2978 
2979         tlscf->no_wait_shutdown = 1;
2980         tls->socket_conf->tls = tlscf;
2981 
2982     } else {
2983         tlscf = tls->socket_conf->tls;
2984     }
2985 
2986     tls->tls_init->conf = tlscf;
2987 
2988     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2989     if (nxt_slow_path(bundle == NULL)) {
2990         goto fail;
2991     }
2992 
2993     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2994         goto fail;
2995     }
2996 
2997     bundle->chain_file = msg->fd[0];
2998     bundle->next = tlscf->bundle;
2999     tlscf->bundle = bundle;
3000 
3001     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
3002                                                   tls->last);
3003     if (nxt_slow_path(ret != NXT_OK)) {
3004         goto fail;
3005     }
3006 
3007     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3008                        nxt_router_conf_apply, task, tmcf, NULL);
3009     return;
3010 
3011 fail:
3012 
3013     nxt_router_conf_error(task, tmcf);
3014 }
3015 
3016 #endif
3017 
3018 
3019 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)3020 nxt_router_app_rpc_create(nxt_task_t *task,
3021     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
3022 {
3023     size_t         size;
3024     uint32_t       stream;
3025     nxt_fd_t       port_fd, queue_fd;
3026     nxt_int_t      ret;
3027     nxt_buf_t      *b;
3028     nxt_port_t     *router_port, *dport;
3029     nxt_runtime_t  *rt;
3030     nxt_app_rpc_t  *rpc;
3031 
3032     rt = task->thread->runtime;
3033 
3034     dport = app->proto_port;
3035 
3036     if (dport == NULL) {
3037         nxt_debug(task, "app '%V' prototype prefork", &app->name);
3038 
3039         size = app->name.length + 1 + app->conf.length;
3040 
3041         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
3042         if (nxt_slow_path(b == NULL)) {
3043             goto fail;
3044         }
3045 
3046         b->completion_handler = nxt_buf_dummy_completion;
3047 
3048         nxt_buf_cpystr(b, &app->name);
3049         *b->mem.free++ = '\0';
3050         nxt_buf_cpystr(b, &app->conf);
3051 
3052         dport = rt->port_by_type[NXT_PROCESS_MAIN];
3053 
3054         port_fd = app->shared_port->pair[0];
3055         queue_fd = app->shared_port->queue_fd;
3056 
3057     } else {
3058         nxt_debug(task, "app '%V' prefork", &app->name);
3059 
3060         b = NULL;
3061         port_fd = -1;
3062         queue_fd = -1;
3063     }
3064 
3065     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3066 
3067     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
3068                                            nxt_router_app_prefork_ready,
3069                                            nxt_router_app_prefork_error,
3070                                            sizeof(nxt_app_rpc_t));
3071     if (nxt_slow_path(rpc == NULL)) {
3072         goto fail;
3073     }
3074 
3075     rpc->app = app;
3076     rpc->temp_conf = tmcf;
3077     rpc->proto = (b != NULL);
3078 
3079     stream = nxt_port_rpc_ex_stream(rpc);
3080 
3081     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
3082                                  port_fd, queue_fd, stream, router_port->id, b);
3083     if (nxt_slow_path(ret != NXT_OK)) {
3084         nxt_port_rpc_cancel(task, router_port, stream);
3085         goto fail;
3086     }
3087 
3088     if (b == NULL) {
3089         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
3090 
3091         app->pending_processes++;
3092     }
3093 
3094     return;
3095 
3096 fail:
3097 
3098     nxt_router_conf_error(task, tmcf);
3099 }
3100 
3101 
3102 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3103 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3104     void *data)
3105 {
3106     nxt_app_t           *app;
3107     nxt_port_t          *port;
3108     nxt_app_rpc_t       *rpc;
3109     nxt_event_engine_t  *engine;
3110 
3111     rpc = data;
3112     app = rpc->app;
3113 
3114     port = msg->u.new_port;
3115 
3116     nxt_assert(port != NULL);
3117     nxt_assert(port->id == 0);
3118 
3119     if (rpc->proto) {
3120         nxt_assert(app->proto_port == NULL);
3121         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
3122 
3123         nxt_port_inc_use(port);
3124 
3125         app->proto_port = port;
3126         port->app = app;
3127 
3128         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
3129 
3130         return;
3131     }
3132 
3133     nxt_assert(port->type == NXT_PROCESS_APP);
3134 
3135     port->app = app;
3136     port->main_app_port = port;
3137 
3138     app->pending_processes--;
3139     app->processes++;
3140     app->idle_processes++;
3141 
3142     engine = task->thread->engine;
3143 
3144     nxt_queue_insert_tail(&app->ports, &port->app_link);
3145     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3146 
3147     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
3148               &app->name, port->pid, port->id);
3149 
3150     nxt_port_hash_add(&app->port_hash, port);
3151     app->port_hash_count++;
3152 
3153     port->idle_start = 0;
3154 
3155     nxt_port_inc_use(port);
3156 
3157     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
3158 
3159     nxt_work_queue_add(&engine->fast_work_queue,
3160                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
3161 }
3162 
3163 
3164 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3165 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3166     void *