xref: /unit/src/nxt_router.c (revision 2133:46433e3cef45)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
112     nxt_mp_t *mp, nxt_conf_value_t *conf);
113 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
114     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
115 
116 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
117 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
118 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
119     nxt_app_t *app);
120 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
121     nxt_str_t *name);
122 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
123     int i);
124 
125 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
126     nxt_port_t *port);
127 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
128     nxt_port_t *port);
129 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
130     nxt_port_t *port, nxt_fd_t fd);
131 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
132     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
133 static void nxt_router_listen_socket_ready(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 static void nxt_router_listen_socket_error(nxt_task_t *task,
136     nxt_port_recv_msg_t *msg, void *data);
137 #if (NXT_TLS)
138 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
139     nxt_port_recv_msg_t *msg, void *data);
140 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
141     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
142     nxt_bool_t last);
143 #endif
144 static void nxt_router_app_rpc_create(nxt_task_t *task,
145     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
146 static void nxt_router_app_prefork_ready(nxt_task_t *task,
147     nxt_port_recv_msg_t *msg, void *data);
148 static void nxt_router_app_prefork_error(nxt_task_t *task,
149     nxt_port_recv_msg_t *msg, void *data);
150 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
151     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
152 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
153     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
154 
155 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
156     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
157     const nxt_event_interface_t *interface);
158 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
159     nxt_router_engine_conf_t *recf);
160 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
161     nxt_router_engine_conf_t *recf);
162 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
163     nxt_router_engine_conf_t *recf);
164 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
165     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
166     nxt_work_handler_t handler);
167 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
168     nxt_router_engine_conf_t *recf);
169 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
170     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
171 
172 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
173     nxt_router_temp_conf_t *tmcf);
174 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
175     nxt_event_engine_t *engine);
176 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
177     nxt_router_temp_conf_t *tmcf);
178 
179 static void nxt_router_engines_post(nxt_router_t *router,
180     nxt_router_temp_conf_t *tmcf);
181 static void nxt_router_engine_post(nxt_event_engine_t *engine,
182     nxt_work_t *jobs);
183 
184 static void nxt_router_thread_start(void *data);
185 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
186     void *data);
187 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
188     void *data);
189 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
190     void *data);
191 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
192     void *data);
193 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
194     void *data);
195 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
196     void *data);
197 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
198     void *data);
199 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
200     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
201 static void nxt_router_listen_socket_release(nxt_task_t *task,
202     nxt_socket_conf_t *skcf);
203 
204 static void nxt_router_access_log_writer(nxt_task_t *task,
205     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
206 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
207     struct tm *tm, size_t size, const char *format);
208 static void nxt_router_access_log_open(nxt_task_t *task,
209     nxt_router_temp_conf_t *tmcf);
210 static void nxt_router_access_log_ready(nxt_task_t *task,
211     nxt_port_recv_msg_t *msg, void *data);
212 static void nxt_router_access_log_error(nxt_task_t *task,
213     nxt_port_recv_msg_t *msg, void *data);
214 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
215     nxt_router_access_log_t *access_log);
216 static void nxt_router_access_log_release(nxt_task_t *task,
217     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
218 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
219     void *data);
220 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
221     nxt_port_recv_msg_t *msg, void *data);
222 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 
225 static void nxt_router_app_port_ready(nxt_task_t *task,
226     nxt_port_recv_msg_t *msg, void *data);
227 static void nxt_router_app_port_error(nxt_task_t *task,
228     nxt_port_recv_msg_t *msg, void *data);
229 
230 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
231 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
232 
233 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
234     nxt_port_t *port, nxt_apr_action_t action);
235 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
236     nxt_request_rpc_data_t *req_rpc_data);
237 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
238     void *data);
239 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
240     void *data);
241 
242 static void nxt_router_app_prepare_request(nxt_task_t *task,
243     nxt_request_rpc_data_t *req_rpc_data);
244 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
245     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
246 
247 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
248 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
249     void *data);
250 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
251     void *data);
252 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
253     void *data);
254 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
255 
256 static const nxt_http_request_state_t  nxt_http_request_send_state;
257 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
258 
259 static void nxt_router_app_joint_use(nxt_task_t *task,
260     nxt_app_joint_t *app_joint, int i);
261 
262 static void nxt_router_http_request_release_post(nxt_task_t *task,
263     nxt_http_request_t *r);
264 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
265     void *data);
266 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
267 static void nxt_router_get_port_handler(nxt_task_t *task,
268     nxt_port_recv_msg_t *msg);
269 static void nxt_router_get_mmap_handler(nxt_task_t *task,
270     nxt_port_recv_msg_t *msg);
271 
272 extern const nxt_http_request_state_t  nxt_http_websocket;
273 
274 static nxt_router_t  *nxt_router;
275 
276 static const nxt_str_t http_prefix = nxt_string("HTTP_");
277 static const nxt_str_t empty_prefix = nxt_string("");
278 
279 static const nxt_str_t  *nxt_app_msg_prefix[] = {
280     &empty_prefix,
281     &empty_prefix,
282     &http_prefix,
283     &http_prefix,
284     &http_prefix,
285     &empty_prefix,
286 };
287 
288 
289 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
290     .quit         = nxt_signal_quit_handler,
291     .new_port     = nxt_router_new_port_handler,
292     .get_port     = nxt_router_get_port_handler,
293     .change_file  = nxt_port_change_log_file_handler,
294     .mmap         = nxt_port_mmap_handler,
295     .get_mmap     = nxt_router_get_mmap_handler,
296     .data         = nxt_router_conf_data_handler,
297     .app_restart  = nxt_router_app_restart_handler,
298     .remove_pid   = nxt_router_remove_pid_handler,
299     .access_log   = nxt_router_access_log_reopen_handler,
300     .rpc_ready    = nxt_port_rpc_handler,
301     .rpc_error    = nxt_port_rpc_handler,
302     .oosm         = nxt_router_oosm_handler,
303 };
304 
305 
306 const nxt_process_init_t  nxt_router_process = {
307     .name           = "router",
308     .type           = NXT_PROCESS_ROUTER,
309     .prefork        = nxt_router_prefork,
310     .restart        = 1,
311     .setup          = nxt_process_core_setup,
312     .start          = nxt_router_start,
313     .port_handlers  = &nxt_router_process_port_handlers,
314     .signals        = nxt_process_signals,
315 };
316 
317 
318 /* Queues of nxt_socket_conf_t */
319 nxt_queue_t  creating_sockets;
320 nxt_queue_t  pending_sockets;
321 nxt_queue_t  updating_sockets;
322 nxt_queue_t  keeping_sockets;
323 nxt_queue_t  deleting_sockets;
324 
325 
326 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
328 {
329     nxt_runtime_stop_app_processes(task, task->thread->runtime);
330 
331     return NXT_OK;
332 }
333 
334 
335 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
337 {
338     nxt_int_t      ret;
339     nxt_port_t     *controller_port;
340     nxt_router_t   *router;
341     nxt_runtime_t  *rt;
342 
343     rt = task->thread->runtime;
344 
345     nxt_log(task, NXT_LOG_INFO, "router started");
346 
347 #if (NXT_TLS)
348     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
349     if (nxt_slow_path(rt->tls == NULL)) {
350         return NXT_ERROR;
351     }
352 
353     ret = rt->tls->library_init(task);
354     if (nxt_slow_path(ret != NXT_OK)) {
355         return ret;
356     }
357 #endif
358 
359     ret = nxt_http_init(task);
360     if (nxt_slow_path(ret != NXT_OK)) {
361         return ret;
362     }
363 
364     router = nxt_zalloc(sizeof(nxt_router_t));
365     if (nxt_slow_path(router == NULL)) {
366         return NXT_ERROR;
367     }
368 
369     nxt_queue_init(&router->engines);
370     nxt_queue_init(&router->sockets);
371     nxt_queue_init(&router->apps);
372 
373     nxt_router = router;
374 
375     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
376     if (controller_port != NULL) {
377         nxt_router_greet_controller(task, controller_port);
378     }
379 
380     return NXT_OK;
381 }
382 
383 
384 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
386 {
387     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
388                           -1, 0, 0, NULL);
389 }
390 
391 
392 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
394     void *data)
395 {
396     size_t               size;
397     uint32_t             stream;
398     nxt_fd_t             port_fd, queue_fd;
399     nxt_int_t            ret;
400     nxt_app_t            *app;
401     nxt_buf_t            *b;
402     nxt_port_t           *dport;
403     nxt_runtime_t        *rt;
404     nxt_app_joint_rpc_t  *app_joint_rpc;
405 
406     app = data;
407 
408     nxt_thread_mutex_lock(&app->mutex);
409 
410     dport = app->proto_port;
411 
412     nxt_thread_mutex_unlock(&app->mutex);
413 
414     if (dport != NULL) {
415         nxt_debug(task, "app '%V' %p start process", &app->name, app);
416 
417         b = NULL;
418         port_fd = -1;
419         queue_fd = -1;
420 
421     } else {
422         if (app->proto_port_requests > 0) {
423             nxt_debug(task, "app '%V' %p wait for prototype process",
424                       &app->name, app);
425 
426             app->proto_port_requests++;
427 
428             goto skip;
429         }
430 
431         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
432 
433         rt = task->thread->runtime;
434         dport = rt->port_by_type[NXT_PROCESS_MAIN];
435 
436         size = app->name.length + 1 + app->conf.length;
437 
438         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
439         if (nxt_slow_path(b == NULL)) {
440             goto failed;
441         }
442 
443         nxt_buf_cpystr(b, &app->name);
444         *b->mem.free++ = '\0';
445         nxt_buf_cpystr(b, &app->conf);
446 
447         port_fd = app->shared_port->pair[0];
448         queue_fd = app->shared_port->queue_fd;
449     }
450 
451     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
452                                                      nxt_router_app_port_ready,
453                                                      nxt_router_app_port_error,
454                                                    sizeof(nxt_app_joint_rpc_t));
455     if (nxt_slow_path(app_joint_rpc == NULL)) {
456         goto failed;
457     }
458 
459     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
460 
461     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
462                                  port_fd, queue_fd, stream, port->id, b);
463     if (nxt_slow_path(ret != NXT_OK)) {
464         nxt_port_rpc_cancel(task, port, stream);
465 
466         goto failed;
467     }
468 
469     app_joint_rpc->app_joint = app->joint;
470     app_joint_rpc->generation = app->generation;
471     app_joint_rpc->proto = (b != NULL);
472 
473     if (b != NULL) {
474         app->proto_port_requests++;
475 
476         b = NULL;
477     }
478 
479     nxt_router_app_joint_use(task, app->joint, 1);
480 
481 failed:
482 
483     if (b != NULL) {
484         nxt_mp_free(b->data, b);
485     }
486 
487 skip:
488 
489     nxt_router_app_use(task, app, -1);
490 }
491 
492 
493 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)494 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
495 {
496     app_joint->use_count += i;
497 
498     if (app_joint->use_count == 0) {
499         nxt_assert(app_joint->app == NULL);
500 
501         nxt_free(app_joint);
502     }
503 }
504 
505 
506 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)507 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
508 {
509     nxt_int_t      res;
510     nxt_port_t     *router_port;
511     nxt_runtime_t  *rt;
512 
513     nxt_debug(task, "app '%V' start process", &app->name);
514 
515     rt = task->thread->runtime;
516     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
517 
518     nxt_router_app_use(task, app, 1);
519 
520     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
521                         app);
522 
523     if (res == NXT_OK) {
524         return res;
525     }
526 
527     nxt_thread_mutex_lock(&app->mutex);
528 
529     app->pending_processes--;
530 
531     nxt_thread_mutex_unlock(&app->mutex);
532 
533     nxt_router_app_use(task, app, -1);
534 
535     return NXT_ERROR;
536 }
537 
538 
539 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)540 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
541 {
542     nxt_buf_t       *b, *next;
543     nxt_bool_t      cancelled;
544     nxt_port_t      *app_port;
545     nxt_msg_info_t  *msg_info;
546 
547     msg_info = &req_rpc_data->msg_info;
548 
549     if (msg_info->buf == NULL) {
550         return 0;
551     }
552 
553     app_port = req_rpc_data->app_port;
554 
555     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
556         cancelled = nxt_app_queue_cancel(app_port->queue,
557                                          msg_info->tracking_cookie,
558                                          req_rpc_data->stream);
559 
560         if (cancelled) {
561             nxt_debug(task, "stream #%uD: cancelled by router",
562                       req_rpc_data->stream);
563         }
564 
565     } else {
566         cancelled = 0;
567     }
568 
569     for (b = msg_info->buf; b != NULL; b = next) {
570         next = b->next;
571         b->next = NULL;
572 
573         if (b->is_port_mmap_sent) {
574             b->is_port_mmap_sent = cancelled == 0;
575         }
576 
577         b->completion_handler(task, b, b->parent);
578     }
579 
580     msg_info->buf = NULL;
581 
582     return cancelled;
583 }
584 
585 
586 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)587 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
588 {
589     if (lnk->next != NULL) {
590         nxt_queue_remove(lnk);
591 
592         lnk->next = NULL;
593 
594         return 1;
595     }
596 
597     return 0;
598 }
599 
600 
601 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)602 nxt_request_rpc_data_unlink(nxt_task_t *task,
603     nxt_request_rpc_data_t *req_rpc_data)
604 {
605     nxt_app_t           *app;
606     nxt_bool_t          unlinked;
607     nxt_http_request_t  *r;
608 
609     nxt_router_msg_cancel(task, req_rpc_data);
610 
611     app = req_rpc_data->app;
612 
613     if (req_rpc_data->app_port != NULL) {
614         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
615                                     req_rpc_data->apr_action);
616 
617         req_rpc_data->app_port = NULL;
618     }
619 
620     r = req_rpc_data->request;
621 
622     if (r != NULL) {
623         r->timer_data = NULL;
624 
625         nxt_router_http_request_release_post(task, r);
626 
627         r->req_rpc_data = NULL;
628         req_rpc_data->request = NULL;
629 
630         if (app != NULL) {
631             unlinked = 0;
632 
633             nxt_thread_mutex_lock(&app->mutex);
634 
635             if (r->app_link.next != NULL) {
636                 nxt_queue_remove(&r->app_link);
637                 r->app_link.next = NULL;
638 
639                 unlinked = 1;
640             }
641 
642             nxt_thread_mutex_unlock(&app->mutex);
643 
644             if (unlinked) {
645                 nxt_mp_release(r->mem_pool);
646             }
647         }
648     }
649 
650     if (app != NULL) {
651         nxt_router_app_use(task, app, -1);
652 
653         req_rpc_data->app = NULL;
654     }
655 
656     if (req_rpc_data->msg_info.body_fd != -1) {
657         nxt_fd_close(req_rpc_data->msg_info.body_fd);
658 
659         req_rpc_data->msg_info.body_fd = -1;
660     }
661 
662     if (req_rpc_data->rpc_cancel) {
663         req_rpc_data->rpc_cancel = 0;
664 
665         nxt_port_rpc_cancel(task, task->thread->engine->port,
666                             req_rpc_data->stream);
667     }
668 }
669 
670 
671 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674     nxt_int_t      res;
675     nxt_app_t      *app;
676     nxt_port_t     *port, *main_app_port;
677     nxt_runtime_t  *rt;
678 
679     nxt_port_new_port_handler(task, msg);
680 
681     port = msg->u.new_port;
682 
683     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
684         nxt_router_greet_controller(task, msg->u.new_port);
685     }
686 
687     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
688         nxt_port_rpc_handler(task, msg);
689 
690         return;
691     }
692 
693     if (port == NULL || port->type != NXT_PROCESS_APP) {
694 
695         if (msg->port_msg.stream == 0) {
696             return;
697         }
698 
699         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
700 
701     } else {
702         if (msg->fd[1] != -1) {
703             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
704             if (nxt_slow_path(res != NXT_OK)) {
705                 return;
706             }
707 
708             nxt_fd_close(msg->fd[1]);
709             msg->fd[1] = -1;
710         }
711     }
712 
713     if (msg->port_msg.stream != 0) {
714         nxt_port_rpc_handler(task, msg);
715         return;
716     }
717 
718     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
719 
720     /*
721      * Port with "id == 0" is application 'main' port and it always
722      * should come with non-zero stream.
723      */
724     nxt_assert(port->id != 0);
725 
726     /* Find 'main' app port and get app reference. */
727     rt = task->thread->runtime;
728 
729     /*
730      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
731      * sent to main port (with id == 0) and processed in main thread.
732      */
733     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
734     nxt_assert(main_app_port != NULL);
735 
736     app = main_app_port->app;
737 
738     if (nxt_fast_path(app != NULL)) {
739         nxt_thread_mutex_lock(&app->mutex);
740 
741         /* TODO here should be find-and-add code because there can be
742            port waiters in port_hash */
743         nxt_port_hash_add(&app->port_hash, port);
744         app->port_hash_count++;
745 
746         nxt_thread_mutex_unlock(&app->mutex);
747 
748         port->app = app;
749     }
750 
751     port->main_app_port = main_app_port;
752 
753     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
754 }
755 
756 
757 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)758 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
759 {
760     void                    *p;
761     size_t                  size;
762     nxt_int_t               ret;
763     nxt_port_t              *port;
764     nxt_router_temp_conf_t  *tmcf;
765 
766     port = nxt_runtime_port_find(task->thread->runtime,
767                                  msg->port_msg.pid,
768                                  msg->port_msg.reply_port);
769     if (nxt_slow_path(port == NULL)) {
770         nxt_alert(task, "conf_data_handler: reply port not found");
771         return;
772     }
773 
774     p = MAP_FAILED;
775 
776     /*
777      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
778      * initialized in 'cleanup' section.
779      */
780     size = 0;
781 
782     tmcf = nxt_router_temp_conf(task);
783     if (nxt_slow_path(tmcf == NULL)) {
784         goto fail;
785     }
786 
787     if (nxt_slow_path(msg->fd[0] == -1)) {
788         nxt_alert(task, "conf_data_handler: invalid shm fd");
789         goto fail;
790     }
791 
792     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
793         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
794                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
795         goto fail;
796     }
797 
798     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
799 
800     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
801 
802     nxt_fd_close(msg->fd[0]);
803     msg->fd[0] = -1;
804 
805     if (nxt_slow_path(p == MAP_FAILED)) {
806         goto fail;
807     }
808 
809     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
810 
811     tmcf->router_conf->router = nxt_router;
812     tmcf->stream = msg->port_msg.stream;
813     tmcf->port = port;
814 
815     nxt_port_use(task, tmcf->port, 1);
816 
817     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
818 
819     if (nxt_fast_path(ret == NXT_OK)) {
820         nxt_router_conf_apply(task, tmcf, NULL);
821 
822     } else {
823         nxt_router_conf_error(task, tmcf);
824     }
825 
826     goto cleanup;
827 
828 fail:
829 
830     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
831                           msg->port_msg.stream, 0, NULL);
832 
833     if (tmcf != NULL) {
834         nxt_mp_release(tmcf->mem_pool);
835     }
836 
837 cleanup:
838 
839     if (p != MAP_FAILED) {
840         nxt_mem_munmap(p, size);
841     }
842 
843     if (msg->fd[0] != -1) {
844         nxt_fd_close(msg->fd[0]);
845         msg->fd[0] = -1;
846     }
847 }
848 
849 
850 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)851 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
852 {
853     nxt_app_t            *app;
854     nxt_int_t            ret;
855     nxt_str_t            app_name;
856     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
857     nxt_port_t           *proto_port;
858     nxt_port_msg_type_t  reply;
859 
860     reply_port = nxt_runtime_port_find(task->thread->runtime,
861                                        msg->port_msg.pid,
862                                        msg->port_msg.reply_port);
863     if (nxt_slow_path(reply_port == NULL)) {
864         nxt_alert(task, "app_restart_handler: reply port not found");
865         return;
866     }
867 
868     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
869     app_name.start = msg->buf->mem.pos;
870 
871     nxt_debug(task, "app_restart_handler: %V", &app_name);
872 
873     app = nxt_router_app_find(&nxt_router->apps, &app_name);
874 
875     if (nxt_fast_path(app != NULL)) {
876         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
877                                    NXT_PROCESS_APP);
878         if (nxt_slow_path(shared_port == NULL)) {
879             goto fail;
880         }
881 
882         ret = nxt_port_socket_init(task, shared_port, 0);
883         if (nxt_slow_path(ret != NXT_OK)) {
884             nxt_port_use(task, shared_port, -1);
885             goto fail;
886         }
887 
888         ret = nxt_router_app_queue_init(task, shared_port);
889         if (nxt_slow_path(ret != NXT_OK)) {
890             nxt_port_write_close(shared_port);
891             nxt_port_read_close(shared_port);
892             nxt_port_use(task, shared_port, -1);
893             goto fail;
894         }
895 
896         nxt_port_write_enable(task, shared_port);
897 
898         nxt_thread_mutex_lock(&app->mutex);
899 
900         proto_port = app->proto_port;
901 
902         if (proto_port != NULL) {
903             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
904                       proto_port->pid);
905 
906             app->proto_port = NULL;
907             proto_port->app = NULL;
908         }
909 
910         app->generation++;
911 
912         shared_port->app = app;
913 
914         old_shared_port = app->shared_port;
915         old_shared_port->app = NULL;
916 
917         app->shared_port = shared_port;
918 
919         nxt_thread_mutex_unlock(&app->mutex);
920 
921         nxt_port_close(task, old_shared_port);
922         nxt_port_use(task, old_shared_port, -1);
923 
924         if (proto_port != NULL) {
925             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
926                                          -1, 0, 0, NULL);
927 
928             nxt_port_close(task, proto_port);
929 
930             nxt_port_use(task, proto_port, -1);
931         }
932 
933         reply = NXT_PORT_MSG_RPC_READY_LAST;
934 
935     } else {
936 
937 fail:
938 
939         reply = NXT_PORT_MSG_RPC_ERROR;
940     }
941 
942     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
943                           0, NULL);
944 }
945 
946 
947 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)948 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
949     void *data)
950 {
951     union {
952         nxt_pid_t  removed_pid;
953         void       *data;
954     } u;
955 
956     u.data = data;
957 
958     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
959 }
960 
961 
962 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)963 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
964 {
965     nxt_event_engine_t  *engine;
966 
967     nxt_port_remove_pid_handler(task, msg);
968 
969     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
970     {
971         if (nxt_fast_path(engine->port != NULL)) {
972             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
973                           msg->u.data);
974         }
975     }
976     nxt_queue_loop;
977 
978     if (msg->port_msg.stream == 0) {
979         return;
980     }
981 
982     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
983 
984     nxt_port_rpc_handler(task, msg);
985 }
986 
987 
988 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)989 nxt_router_temp_conf(nxt_task_t *task)
990 {
991     nxt_mp_t                *mp, *tmp;
992     nxt_router_conf_t       *rtcf;
993     nxt_router_temp_conf_t  *tmcf;
994 
995     mp = nxt_mp_create(1024, 128, 256, 32);
996     if (nxt_slow_path(mp == NULL)) {
997         return NULL;
998     }
999 
1000     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1001     if (nxt_slow_path(rtcf == NULL)) {
1002         goto fail;
1003     }
1004 
1005     rtcf->mem_pool = mp;
1006 
1007     tmp = nxt_mp_create(1024, 128, 256, 32);
1008     if (nxt_slow_path(tmp == NULL)) {
1009         goto fail;
1010     }
1011 
1012     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1013     if (nxt_slow_path(tmcf == NULL)) {
1014         goto temp_fail;
1015     }
1016 
1017     tmcf->mem_pool = tmp;
1018     tmcf->router_conf = rtcf;
1019     tmcf->count = 1;
1020     tmcf->engine = task->thread->engine;
1021 
1022     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1023                                      sizeof(nxt_router_engine_conf_t));
1024     if (nxt_slow_path(tmcf->engines == NULL)) {
1025         goto temp_fail;
1026     }
1027 
1028     nxt_queue_init(&creating_sockets);
1029     nxt_queue_init(&pending_sockets);
1030     nxt_queue_init(&updating_sockets);
1031     nxt_queue_init(&keeping_sockets);
1032     nxt_queue_init(&deleting_sockets);
1033 
1034 #if (NXT_TLS)
1035     nxt_queue_init(&tmcf->tls);
1036 #endif
1037 
1038     nxt_queue_init(&tmcf->apps);
1039     nxt_queue_init(&tmcf->previous);
1040 
1041     return tmcf;
1042 
1043 temp_fail:
1044 
1045     nxt_mp_destroy(tmp);
1046 
1047 fail:
1048 
1049     nxt_mp_destroy(mp);
1050 
1051     return NULL;
1052 }
1053 
1054 
1055 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1056 nxt_router_app_can_start(nxt_app_t *app)
1057 {
1058     return app->processes + app->pending_processes < app->max_processes
1059             && app->pending_processes < app->max_pending_processes;
1060 }
1061 
1062 
1063 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1064 nxt_router_app_need_start(nxt_app_t *app)
1065 {
1066     return (app->active_requests
1067               > app->port_hash_count + app->pending_processes)
1068            || (app->spare_processes
1069                 > app->idle_processes + app->pending_processes);
1070 }
1071 
1072 
1073 static void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1074 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1075 {
1076     nxt_int_t                    ret;
1077     nxt_app_t                    *app;
1078     nxt_router_t                 *router;
1079     nxt_runtime_t                *rt;
1080     nxt_queue_link_t             *qlk;
1081     nxt_socket_conf_t            *skcf;
1082     nxt_router_conf_t            *rtcf;
1083     nxt_router_temp_conf_t       *tmcf;
1084     const nxt_event_interface_t  *interface;
1085 #if (NXT_TLS)
1086     nxt_router_tlssock_t         *tls;
1087 #endif
1088 
1089     tmcf = obj;
1090 
1091     qlk = nxt_queue_first(&pending_sockets);
1092 
1093     if (qlk != nxt_queue_tail(&pending_sockets)) {
1094         nxt_queue_remove(qlk);
1095         nxt_queue_insert_tail(&creating_sockets, qlk);
1096 
1097         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1098 
1099         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1100 
1101         return;
1102     }
1103 
1104 #if (NXT_TLS)
1105     qlk = nxt_queue_last(&tmcf->tls);
1106 
1107     if (qlk != nxt_queue_head(&tmcf->tls)) {
1108         nxt_queue_remove(qlk);
1109 
1110         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1111 
1112         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1113                            nxt_router_tls_rpc_handler, tls);
1114         return;
1115     }
1116 #endif
1117 
1118     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1119 
1120         if (nxt_router_app_need_start(app)) {
1121             nxt_router_app_rpc_create(task, tmcf, app);
1122             return;
1123         }
1124 
1125     } nxt_queue_loop;
1126 
1127     rtcf = tmcf->router_conf;
1128 
1129     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1130         nxt_router_access_log_open(task, tmcf);
1131         return;
1132     }
1133 
1134     rt = task->thread->runtime;
1135 
1136     interface = nxt_service_get(rt->services, "engine", NULL);
1137 
1138     router = rtcf->router;
1139 
1140     ret = nxt_router_engines_create(task, router, tmcf, interface);
1141     if (nxt_slow_path(ret != NXT_OK)) {
1142         goto fail;
1143     }
1144 
1145     ret = nxt_router_threads_create(task, rt, tmcf);
1146     if (nxt_slow_path(ret != NXT_OK)) {
1147         goto fail;
1148     }
1149 
1150     nxt_router_apps_sort(task, router, tmcf);
1151 
1152     nxt_router_apps_hash_use(task, rtcf, 1);
1153 
1154     nxt_router_engines_post(router, tmcf);
1155 
1156     nxt_queue_add(&router->sockets, &updating_sockets);
1157     nxt_queue_add(&router->sockets, &creating_sockets);
1158 
1159     if (router->access_log != rtcf->access_log) {
1160         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1161 
1162         nxt_router_access_log_release(task, &router->lock, router->access_log);
1163 
1164         router->access_log = rtcf->access_log;
1165     }
1166 
1167     nxt_router_conf_ready(task, tmcf);
1168 
1169     return;
1170 
1171 fail:
1172 
1173     nxt_router_conf_error(task, tmcf);
1174 
1175     return;
1176 }
1177 
1178 
1179 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1180 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1181 {
1182     nxt_joint_job_t  *job;
1183 
1184     job = obj;
1185 
1186     nxt_router_conf_ready(task, job->tmcf);
1187 }
1188 
1189 
1190 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1191 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1192 {
1193     uint32_t               count;
1194     nxt_router_conf_t      *rtcf;
1195     nxt_thread_spinlock_t  *lock;
1196 
1197     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1198 
1199     if (--tmcf->count > 0) {
1200         return;
1201     }
1202 
1203     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1204 
1205     rtcf = tmcf->router_conf;
1206 
1207     lock = &rtcf->router->lock;
1208 
1209     nxt_thread_spin_lock(lock);
1210 
1211     count = rtcf->count;
1212 
1213     nxt_thread_spin_unlock(lock);
1214 
1215     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1216 
1217     if (count == 0) {
1218         nxt_router_apps_hash_use(task, rtcf, -1);
1219 
1220         nxt_router_access_log_release(task, lock, rtcf->access_log);
1221 
1222         nxt_mp_destroy(rtcf->mem_pool);
1223     }
1224 
1225     nxt_mp_release(tmcf->mem_pool);
1226 }
1227 
1228 
1229 static void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1230 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1231 {
1232     nxt_app_t          *app;
1233     nxt_socket_t       s;
1234     nxt_router_t       *router;
1235     nxt_queue_link_t   *qlk;
1236     nxt_socket_conf_t  *skcf;
1237     nxt_router_conf_t  *rtcf;
1238 
1239     nxt_alert(task, "failed to apply new conf");
1240 
1241     for (qlk = nxt_queue_first(&creating_sockets);
1242          qlk != nxt_queue_tail(&creating_sockets);
1243          qlk = nxt_queue_next(qlk))
1244     {
1245         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246         s = skcf->listen->socket;
1247 
1248         if (s != -1) {
1249             nxt_socket_close(task, s);
1250         }
1251 
1252         nxt_free(skcf->listen);
1253     }
1254 
1255     rtcf = tmcf->router_conf;
1256 
1257     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1258 
1259         nxt_router_app_unlink(task, app);
1260 
1261     } nxt_queue_loop;
1262 
1263     router = rtcf->router;
1264 
1265     nxt_queue_add(&router->sockets, &keeping_sockets);
1266     nxt_queue_add(&router->sockets, &deleting_sockets);
1267 
1268     nxt_queue_add(&router->apps, &tmcf->previous);
1269 
1270     // TODO: new engines and threads
1271 
1272     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1273 
1274     nxt_mp_destroy(rtcf->mem_pool);
1275 
1276     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1277 
1278     nxt_mp_release(tmcf->mem_pool);
1279 }
1280 
1281 
1282 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1283 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1284     nxt_port_msg_type_t type)
1285 {
1286     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1287 
1288     nxt_port_use(task, tmcf->port, -1);
1289 
1290     tmcf->port = NULL;
1291 }
1292 
1293 
1294 static nxt_conf_map_t  nxt_router_conf[] = {
1295     {
1296         nxt_string("listeners_threads"),
1297         NXT_CONF_MAP_INT32,
1298         offsetof(nxt_router_conf_t, threads),
1299     },
1300 };
1301 
1302 
1303 static nxt_conf_map_t  nxt_router_app_conf[] = {
1304     {
1305         nxt_string("type"),
1306         NXT_CONF_MAP_STR,
1307         offsetof(nxt_router_app_conf_t, type),
1308     },
1309 
1310     {
1311         nxt_string("limits"),
1312         NXT_CONF_MAP_PTR,
1313         offsetof(nxt_router_app_conf_t, limits_value),
1314     },
1315 
1316     {
1317         nxt_string("processes"),
1318         NXT_CONF_MAP_INT32,
1319         offsetof(nxt_router_app_conf_t, processes),
1320     },
1321 
1322     {
1323         nxt_string("processes"),
1324         NXT_CONF_MAP_PTR,
1325         offsetof(nxt_router_app_conf_t, processes_value),
1326     },
1327 
1328     {
1329         nxt_string("targets"),
1330         NXT_CONF_MAP_PTR,
1331         offsetof(nxt_router_app_conf_t, targets_value),
1332     },
1333 };
1334 
1335 
1336 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1337     {
1338         nxt_string("timeout"),
1339         NXT_CONF_MAP_MSEC,
1340         offsetof(nxt_router_app_conf_t, timeout),
1341     },
1342 };
1343 
1344 
1345 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1346     {
1347         nxt_string("spare"),
1348         NXT_CONF_MAP_INT32,
1349         offsetof(nxt_router_app_conf_t, spare_processes),
1350     },
1351 
1352     {
1353         nxt_string("max"),
1354         NXT_CONF_MAP_INT32,
1355         offsetof(nxt_router_app_conf_t, max_processes),
1356     },
1357 
1358     {
1359         nxt_string("idle_timeout"),
1360         NXT_CONF_MAP_MSEC,
1361         offsetof(nxt_router_app_conf_t, idle_timeout),
1362     },
1363 };
1364 
1365 
1366 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1367     {
1368         nxt_string("pass"),
1369         NXT_CONF_MAP_STR_COPY,
1370         offsetof(nxt_router_listener_conf_t, pass),
1371     },
1372 
1373     {
1374         nxt_string("application"),
1375         NXT_CONF_MAP_STR_COPY,
1376         offsetof(nxt_router_listener_conf_t, application),
1377     },
1378 };
1379 
1380 
1381 static nxt_conf_map_t  nxt_router_http_conf[] = {
1382     {
1383         nxt_string("header_buffer_size"),
1384         NXT_CONF_MAP_SIZE,
1385         offsetof(nxt_socket_conf_t, header_buffer_size),
1386     },
1387 
1388     {
1389         nxt_string("large_header_buffer_size"),
1390         NXT_CONF_MAP_SIZE,
1391         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1392     },
1393 
1394     {
1395         nxt_string("large_header_buffers"),
1396         NXT_CONF_MAP_SIZE,
1397         offsetof(nxt_socket_conf_t, large_header_buffers),
1398     },
1399 
1400     {
1401         nxt_string("body_buffer_size"),
1402         NXT_CONF_MAP_SIZE,
1403         offsetof(nxt_socket_conf_t, body_buffer_size),
1404     },
1405 
1406     {
1407         nxt_string("max_body_size"),
1408         NXT_CONF_MAP_SIZE,
1409         offsetof(nxt_socket_conf_t, max_body_size),
1410     },
1411 
1412     {
1413         nxt_string("idle_timeout"),
1414         NXT_CONF_MAP_MSEC,
1415         offsetof(nxt_socket_conf_t, idle_timeout),
1416     },
1417 
1418     {
1419         nxt_string("header_read_timeout"),
1420         NXT_CONF_MAP_MSEC,
1421         offsetof(nxt_socket_conf_t, header_read_timeout),
1422     },
1423 
1424     {
1425         nxt_string("body_read_timeout"),
1426         NXT_CONF_MAP_MSEC,
1427         offsetof(nxt_socket_conf_t, body_read_timeout),
1428     },
1429 
1430     {
1431         nxt_string("send_timeout"),
1432         NXT_CONF_MAP_MSEC,
1433         offsetof(nxt_socket_conf_t, send_timeout),
1434     },
1435 
1436     {
1437         nxt_string("body_temp_path"),
1438         NXT_CONF_MAP_STR,
1439         offsetof(nxt_socket_conf_t, body_temp_path),
1440     },
1441 
1442     {
1443         nxt_string("discard_unsafe_fields"),
1444         NXT_CONF_MAP_INT8,
1445         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1446     },
1447 };
1448 
1449 
1450 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1451     {
1452         nxt_string("max_frame_size"),
1453         NXT_CONF_MAP_SIZE,
1454         offsetof(nxt_websocket_conf_t, max_frame_size),
1455     },
1456 
1457     {
1458         nxt_string("read_timeout"),
1459         NXT_CONF_MAP_MSEC,
1460         offsetof(nxt_websocket_conf_t, read_timeout),
1461     },
1462 
1463     {
1464         nxt_string("keepalive_interval"),
1465         NXT_CONF_MAP_MSEC,
1466         offsetof(nxt_websocket_conf_t, keepalive_interval),
1467     },
1468 
1469 };
1470 
1471 
1472 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1473 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1474     u_char *start, u_char *end)
1475 {
1476     u_char                      *p;
1477     size_t                      size;
1478     nxt_mp_t                    *mp, *app_mp;
1479     uint32_t                    next, next_target;
1480     nxt_int_t                   ret;
1481     nxt_str_t                   name, path, target;
1482     nxt_app_t                   *app, *prev;
1483     nxt_str_t                   *t, *s, *targets;
1484     nxt_uint_t                  n, i;
1485     nxt_port_t                  *port;
1486     nxt_router_t                *router;
1487     nxt_app_joint_t             *app_joint;
1488 #if (NXT_TLS)
1489     nxt_tls_init_t              *tls_init;
1490     nxt_conf_value_t            *certificate;
1491 #endif
1492     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1493     nxt_conf_value_t            *applications, *application;
1494     nxt_conf_value_t            *listeners, *listener;
1495     nxt_socket_conf_t           *skcf;
1496     nxt_router_conf_t           *rtcf;
1497     nxt_http_routes_t           *routes;
1498     nxt_event_engine_t          *engine;
1499     nxt_app_lang_module_t       *lang;
1500     nxt_router_app_conf_t       apcf;
1501     nxt_router_access_log_t     *access_log;
1502     nxt_router_listener_conf_t  lscf;
1503 
1504     static nxt_str_t  http_path = nxt_string("/settings/http");
1505     static nxt_str_t  applications_path = nxt_string("/applications");
1506     static nxt_str_t  listeners_path = nxt_string("/listeners");
1507     static nxt_str_t  routes_path = nxt_string("/routes");
1508     static nxt_str_t  access_log_path = nxt_string("/access_log");
1509 #if (NXT_TLS)
1510     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1511     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1512     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1513     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1514     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1515 #endif
1516     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1517     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1518     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1519     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1520 
1521     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1522     if (root == NULL) {
1523         nxt_alert(task, "configuration parsing error");
1524         return NXT_ERROR;
1525     }
1526 
1527     rtcf = tmcf->router_conf;
1528     mp = rtcf->mem_pool;
1529 
1530     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1531                               nxt_nitems(nxt_router_conf), rtcf);
1532     if (ret != NXT_OK) {
1533         nxt_alert(task, "root map error");
1534         return NXT_ERROR;
1535     }
1536 
1537     if (rtcf->threads == 0) {
1538         rtcf->threads = nxt_ncpu;
1539     }
1540 
1541     conf = nxt_conf_get_path(root, &static_path);
1542 
1543     ret = nxt_router_conf_process_static(task, rtcf, conf);
1544     if (nxt_slow_path(ret != NXT_OK)) {
1545         return NXT_ERROR;
1546     }
1547 
1548     router = rtcf->router;
1549 
1550     applications = nxt_conf_get_path(root, &applications_path);
1551 
1552     if (applications != NULL) {
1553         next = 0;
1554 
1555         for ( ;; ) {
1556             application = nxt_conf_next_object_member(applications,
1557                                                       &name, &next);
1558             if (application == NULL) {
1559                 break;
1560             }
1561 
1562             nxt_debug(task, "application \"%V\"", &name);
1563 
1564             size = nxt_conf_json_length(application, NULL);
1565 
1566             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1567             if (nxt_slow_path(app_mp == NULL)) {
1568                 goto fail;
1569             }
1570 
1571             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1572             if (app == NULL) {
1573                 goto app_fail;
1574             }
1575 
1576             nxt_memzero(app, sizeof(nxt_app_t));
1577 
1578             app->mem_pool = app_mp;
1579 
1580             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1581             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1582                                                   + name.length);
1583 
1584             p = nxt_conf_json_print(app->conf.start, application, NULL);
1585             app->conf.length = p - app->conf.start;
1586 
1587             nxt_assert(app->conf.length <= size);
1588 
1589             nxt_debug(task, "application conf \"%V\"", &app->conf);
1590 
1591             prev = nxt_router_app_find(&router->apps, &name);
1592 
1593             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1594                 nxt_mp_destroy(app_mp);
1595 
1596                 nxt_queue_remove(&prev->link);
1597                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1598 
1599                 ret = nxt_router_apps_hash_add(rtcf, prev);
1600                 if (nxt_slow_path(ret != NXT_OK)) {
1601                     goto fail;
1602                 }
1603 
1604                 continue;
1605             }
1606 
1607             apcf.processes = 1;
1608             apcf.max_processes = 1;
1609             apcf.spare_processes = 0;
1610             apcf.timeout = 0;
1611             apcf.idle_timeout = 15000;
1612             apcf.limits_value = NULL;
1613             apcf.processes_value = NULL;
1614             apcf.targets_value = NULL;
1615 
1616             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1617             if (nxt_slow_path(app_joint == NULL)) {
1618                 goto app_fail;
1619             }
1620 
1621             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1622 
1623             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1624                                       nxt_nitems(nxt_router_app_conf), &apcf);
1625             if (ret != NXT_OK) {
1626                 nxt_alert(task, "application map error");
1627                 goto app_fail;
1628             }
1629 
1630             if (apcf.limits_value != NULL) {
1631 
1632                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1633                     nxt_alert(task, "application limits is not object");
1634                     goto app_fail;
1635                 }
1636 
1637                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1638                                         nxt_router_app_limits_conf,
1639                                         nxt_nitems(nxt_router_app_limits_conf),
1640                                         &apcf);
1641                 if (ret != NXT_OK) {
1642                     nxt_alert(task, "application limits map error");
1643                     goto app_fail;
1644                 }
1645             }
1646 
1647             if (apcf.processes_value != NULL
1648                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1649             {
1650                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1651                                      nxt_router_app_processes_conf,
1652                                      nxt_nitems(nxt_router_app_processes_conf),
1653                                      &apcf);
1654                 if (ret != NXT_OK) {
1655                     nxt_alert(task, "application processes map error");
1656                     goto app_fail;
1657                 }
1658 
1659             } else {
1660                 apcf.max_processes = apcf.processes;
1661                 apcf.spare_processes = apcf.processes;
1662             }
1663 
1664             if (apcf.targets_value != NULL) {
1665                 n = nxt_conf_object_members_count(apcf.targets_value);
1666 
1667                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1668                 if (nxt_slow_path(targets == NULL)) {
1669                     goto app_fail;
1670                 }
1671 
1672                 next_target = 0;
1673 
1674                 for (i = 0; i < n; i++) {
1675                     (void) nxt_conf_next_object_member(apcf.targets_value,
1676                                                        &target, &next_target);
1677 
1678                     s = nxt_str_dup(app_mp, &targets[i], &target);
1679                     if (nxt_slow_path(s == NULL)) {
1680                         goto app_fail;
1681                     }
1682                 }
1683 
1684             } else {
1685                 targets = NULL;
1686             }
1687 
1688             nxt_debug(task, "application type: %V", &apcf.type);
1689             nxt_debug(task, "application processes: %D", apcf.processes);
1690             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1691 
1692             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1693 
1694             if (lang == NULL) {
1695                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1696                 goto app_fail;
1697             }
1698 
1699             nxt_debug(task, "application language module: \"%s\"", lang->file);
1700 
1701             ret = nxt_thread_mutex_create(&app->mutex);
1702             if (ret != NXT_OK) {
1703                 goto app_fail;
1704             }
1705 
1706             nxt_queue_init(&app->ports);
1707             nxt_queue_init(&app->spare_ports);
1708             nxt_queue_init(&app->idle_ports);
1709             nxt_queue_init(&app->ack_waiting_req);
1710 
1711             app->name.length = name.length;
1712             nxt_memcpy(app->name.start, name.start, name.length);
1713 
1714             app->type = lang->type;
1715             app->max_processes = apcf.max_processes;
1716             app->spare_processes = apcf.spare_processes;
1717             app->max_pending_processes = apcf.spare_processes
1718                                          ? apcf.spare_processes : 1;
1719             app->timeout = apcf.timeout;
1720             app->idle_timeout = apcf.idle_timeout;
1721 
1722             app->targets = targets;
1723 
1724             engine = task->thread->engine;
1725 
1726             app->engine = engine;
1727 
1728             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1729             app->adjust_idle_work.task = &engine->task;
1730             app->adjust_idle_work.obj = app;
1731 
1732             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1733 
1734             ret = nxt_router_apps_hash_add(rtcf, app);
1735             if (nxt_slow_path(ret != NXT_OK)) {
1736                 goto app_fail;
1737             }
1738 
1739             nxt_router_app_use(task, app, 1);
1740 
1741             app->joint = app_joint;
1742 
1743             app_joint->use_count = 1;
1744             app_joint->app = app;
1745 
1746             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1747             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1748             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1749             app_joint->idle_timer.task = &engine->task;
1750             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1751 
1752             app_joint->free_app_work.handler = nxt_router_free_app;
1753             app_joint->free_app_work.task = &engine->task;
1754             app_joint->free_app_work.obj = app_joint;
1755 
1756             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1757                                 NXT_PROCESS_APP);
1758             if (nxt_slow_path(port == NULL)) {
1759                 return NXT_ERROR;
1760             }
1761 
1762             ret = nxt_port_socket_init(task, port, 0);
1763             if (nxt_slow_path(ret != NXT_OK)) {
1764                 nxt_port_use(task, port, -1);
1765                 return NXT_ERROR;
1766             }
1767 
1768             ret = nxt_router_app_queue_init(task, port);
1769             if (nxt_slow_path(ret != NXT_OK)) {
1770                 nxt_port_write_close(port);
1771                 nxt_port_read_close(port);
1772                 nxt_port_use(task, port, -1);
1773                 return NXT_ERROR;
1774             }
1775 
1776             nxt_port_write_enable(task, port);
1777             port->app = app;
1778 
1779             app->shared_port = port;
1780 
1781             nxt_thread_mutex_create(&app->outgoing.mutex);
1782         }
1783     }
1784 
1785     conf = nxt_conf_get_path(root, &routes_path);
1786     if (nxt_fast_path(conf != NULL)) {
1787         routes = nxt_http_routes_create(task, tmcf, conf);
1788         if (nxt_slow_path(routes == NULL)) {
1789             return NXT_ERROR;
1790         }
1791         rtcf->routes = routes;
1792     }
1793 
1794     ret = nxt_upstreams_create(task, tmcf, root);
1795     if (nxt_slow_path(ret != NXT_OK)) {
1796         return ret;
1797     }
1798 
1799     http = nxt_conf_get_path(root, &http_path);
1800 #if 0
1801     if (http == NULL) {
1802         nxt_alert(task, "no \"http\" block");
1803         return NXT_ERROR;
1804     }
1805 #endif
1806 
1807     websocket = nxt_conf_get_path(root, &websocket_path);
1808 
1809     listeners = nxt_conf_get_path(root, &listeners_path);
1810 
1811     if (listeners != NULL) {
1812         next = 0;
1813 
1814         for ( ;; ) {
1815             listener = nxt_conf_next_object_member(listeners, &name, &next);
1816             if (listener == NULL) {
1817                 break;
1818             }
1819 
1820             skcf = nxt_router_socket_conf(task, tmcf, &name);
1821             if (skcf == NULL) {
1822                 goto fail;
1823             }
1824 
1825             nxt_memzero(&lscf, sizeof(lscf));
1826 
1827             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1828                                       nxt_nitems(nxt_router_listener_conf),
1829                                       &lscf);
1830             if (ret != NXT_OK) {
1831                 nxt_alert(task, "listener map error");
1832                 goto fail;
1833             }
1834 
1835             nxt_debug(task, "application: %V", &lscf.application);
1836 
1837             // STUB, default values if http block is not defined.
1838             skcf->header_buffer_size = 2048;
1839             skcf->large_header_buffer_size = 8192;
1840             skcf->large_header_buffers = 4;
1841             skcf->discard_unsafe_fields = 1;
1842             skcf->body_buffer_size = 16 * 1024;
1843             skcf->max_body_size = 8 * 1024 * 1024;
1844             skcf->proxy_header_buffer_size = 64 * 1024;
1845             skcf->proxy_buffer_size = 4096;
1846             skcf->proxy_buffers = 256;
1847             skcf->idle_timeout = 180 * 1000;
1848             skcf->header_read_timeout = 30 * 1000;
1849             skcf->body_read_timeout = 30 * 1000;
1850             skcf->send_timeout = 30 * 1000;
1851             skcf->proxy_timeout = 60 * 1000;
1852             skcf->proxy_send_timeout = 30 * 1000;
1853             skcf->proxy_read_timeout = 30 * 1000;
1854 
1855             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1856             skcf->websocket_conf.read_timeout = 60 * 1000;
1857             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1858 
1859             nxt_str_null(&skcf->body_temp_path);
1860 
1861             if (http != NULL) {
1862                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1863                                           nxt_nitems(nxt_router_http_conf),
1864                                           skcf);
1865                 if (ret != NXT_OK) {
1866                     nxt_alert(task, "http map error");
1867                     goto fail;
1868                 }
1869             }
1870 
1871             if (websocket != NULL) {
1872                 ret = nxt_conf_map_object(mp, websocket,
1873                                           nxt_router_websocket_conf,
1874                                           nxt_nitems(nxt_router_websocket_conf),
1875                                           &skcf->websocket_conf);
1876                 if (ret != NXT_OK) {
1877                     nxt_alert(task, "websocket map error");
1878                     goto fail;
1879                 }
1880             }
1881 
1882             t = &skcf->body_temp_path;
1883 
1884             if (t->length == 0) {
1885                 t->start = (u_char *) task->thread->runtime->tmp;
1886                 t->length = nxt_strlen(t->start);
1887             }
1888 
1889             conf = nxt_conf_get_path(listener, &forwarded_path);
1890 
1891             if (conf != NULL) {
1892                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1893                 if (nxt_slow_path(skcf->forwarded == NULL)) {
1894                     return NXT_ERROR;
1895                 }
1896             }
1897 
1898             conf = nxt_conf_get_path(listener, &client_ip_path);
1899 
1900             if (conf != NULL) {
1901                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1902                 if (nxt_slow_path(skcf->client_ip == NULL)) {
1903                     return NXT_ERROR;
1904                 }
1905             }
1906 
1907 #if (NXT_TLS)
1908             certificate = nxt_conf_get_path(listener, &certificate_path);
1909 
1910             if (certificate != NULL) {
1911                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1912                 if (nxt_slow_path(tls_init == NULL)) {
1913                     return NXT_ERROR;
1914                 }
1915 
1916                 tls_init->cache_size = 0;
1917                 tls_init->timeout = 300;
1918 
1919                 value = nxt_conf_get_path(listener, &conf_cache_path);
1920                 if (value != NULL) {
1921                     tls_init->cache_size = nxt_conf_get_number(value);
1922                 }
1923 
1924                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1925                 if (value != NULL) {
1926                     tls_init->timeout = nxt_conf_get_number(value);
1927                 }
1928 
1929                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1930                                                         &conf_commands_path);
1931 
1932                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1933                                                            &conf_tickets);
1934 
1935                 n = nxt_conf_array_elements_count_or_1(certificate);
1936 
1937                 for (i = 0; i < n; i++) {
1938                     value = nxt_conf_get_array_element_or_itself(certificate,
1939                                                                  i);
1940                     nxt_assert(value != NULL);
1941 
1942                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1943                                                      tls_init, i == 0);
1944                     if (nxt_slow_path(ret != NXT_OK)) {
1945                         goto fail;
1946                     }
1947                 }
1948             }
1949 #endif
1950 
1951             skcf->listen->handler = nxt_http_conn_init;
1952             skcf->router_conf = rtcf;
1953             skcf->router_conf->count++;
1954 
1955             if (lscf.pass.length != 0) {
1956                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1957 
1958             /* COMPATIBILITY: listener application. */
1959             } else if (lscf.application.length > 0) {
1960                 skcf->action = nxt_http_pass_application(task, rtcf,
1961                                                          &lscf.application);
1962             }
1963 
1964             if (nxt_slow_path(skcf->action == NULL)) {
1965                 goto fail;
1966             }
1967         }
1968     }
1969 
1970     ret = nxt_http_routes_resolve(task, tmcf);
1971     if (nxt_slow_path(ret != NXT_OK)) {
1972         goto fail;
1973     }
1974 
1975     value = nxt_conf_get_path(root, &access_log_path);
1976 
1977     if (value != NULL) {
1978         nxt_conf_get_string(value, &path);
1979 
1980         access_log = router->access_log;
1981 
1982         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1983             nxt_router_access_log_use(&router->lock, access_log);
1984 
1985         } else {
1986             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1987                                     + path.length);
1988             if (access_log == NULL) {
1989                 nxt_alert(task, "failed to allocate access log structure");
1990                 goto fail;
1991             }
1992 
1993             access_log->fd = -1;
1994             access_log->handler = &nxt_router_access_log_writer;
1995             access_log->count = 1;
1996 
1997             access_log->path.length = path.length;
1998             access_log->path.start = (u_char *) access_log
1999                                      + sizeof(nxt_router_access_log_t);
2000 
2001             nxt_memcpy(access_log->path.start, path.start, path.length);
2002         }
2003 
2004         rtcf->access_log = access_log;
2005     }
2006 
2007     nxt_queue_add(&deleting_sockets, &router->sockets);
2008     nxt_queue_init(&router->sockets);
2009 
2010     return NXT_OK;
2011 
2012 app_fail:
2013 
2014     nxt_mp_destroy(app_mp);
2015 
2016 fail:
2017 
2018     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2019 
2020         nxt_queue_remove(&app->link);
2021         nxt_thread_mutex_destroy(&app->mutex);
2022         nxt_mp_destroy(app->mem_pool);
2023 
2024     } nxt_queue_loop;
2025 
2026     return NXT_ERROR;
2027 }
2028 
2029 
2030 #if (NXT_TLS)
2031 
2032 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2033 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2034     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2035     nxt_tls_init_t *tls_init, nxt_bool_t last)
2036 {
2037     nxt_router_tlssock_t  *tls;
2038 
2039     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2040     if (nxt_slow_path(tls == NULL)) {
2041         return NXT_ERROR;
2042     }
2043 
2044     tls->tls_init = tls_init;
2045     tls->socket_conf = skcf;
2046     tls->temp_conf = tmcf;
2047     tls->last = last;
2048     nxt_conf_get_string(value, &tls->name);
2049 
2050     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2051 
2052     return NXT_OK;
2053 }
2054 
2055 #endif
2056 
2057 
2058 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2059 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2060     nxt_conf_value_t *conf)
2061 {
2062     uint32_t          next, i;
2063     nxt_mp_t          *mp;
2064     nxt_str_t         *type, exten, str;
2065     nxt_int_t         ret;
2066     nxt_uint_t        exts;
2067     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2068 
2069     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2070 
2071     mp = rtcf->mem_pool;
2072 
2073     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2074     if (nxt_slow_path(ret != NXT_OK)) {
2075         return NXT_ERROR;
2076     }
2077 
2078     if (conf == NULL) {
2079         return NXT_OK;
2080     }
2081 
2082     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2083 
2084     if (mtypes_conf != NULL) {
2085         next = 0;
2086 
2087         for ( ;; ) {
2088             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2089 
2090             if (ext_conf == NULL) {
2091                 break;
2092             }
2093 
2094             type = nxt_str_dup(mp, NULL, &str);
2095             if (nxt_slow_path(type == NULL)) {
2096                 return NXT_ERROR;
2097             }
2098 
2099             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2100                 nxt_conf_get_string(ext_conf, &str);
2101 
2102                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2103                     return NXT_ERROR;
2104                 }
2105 
2106                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2107                                                       &exten, type);
2108                 if (nxt_slow_path(ret != NXT_OK)) {
2109                     return NXT_ERROR;
2110                 }
2111 
2112                 continue;
2113             }
2114 
2115             exts = nxt_conf_array_elements_count(ext_conf);
2116 
2117             for (i = 0; i < exts; i++) {
2118                 value = nxt_conf_get_array_element(ext_conf, i);
2119 
2120                 nxt_conf_get_string(value, &str);
2121 
2122                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2123                     return NXT_ERROR;
2124                 }
2125 
2126                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2127                                                       &exten, type);
2128                 if (nxt_slow_path(ret != NXT_OK)) {
2129                     return NXT_ERROR;
2130                 }
2131             }
2132         }
2133     }
2134 
2135     return NXT_OK;
2136 }
2137 
2138 
2139 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2140 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2141 {
2142     nxt_int_t                   ret;
2143     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2144     nxt_conf_value_t            *source_conf, *recursive_conf;
2145     nxt_http_forward_t          *forward;
2146     nxt_http_route_addr_rule_t  *source;
2147 
2148     static nxt_str_t  header_path = nxt_string("/header");
2149     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2150     static nxt_str_t  protocol_path = nxt_string("/protocol");
2151     static nxt_str_t  source_path = nxt_string("/source");
2152     static nxt_str_t  recursive_path = nxt_string("/recursive");
2153 
2154     header_conf = nxt_conf_get_path(conf, &header_path);
2155 
2156     if (header_conf != NULL) {
2157         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2158         protocol_conf = NULL;
2159 
2160     } else {
2161         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2162         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2163     }
2164 
2165     source_conf = nxt_conf_get_path(conf, &source_path);
2166     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2167 
2168     if (source_conf == NULL
2169         || (protocol_conf == NULL && client_ip_conf == NULL))
2170     {
2171         return NULL;
2172     }
2173 
2174     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2175     if (nxt_slow_path(forward == NULL)) {
2176         return NULL;
2177     }
2178 
2179     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2180     if (nxt_slow_path(source == NULL)) {
2181         return NULL;
2182     }
2183 
2184     forward->source = source;
2185 
2186     if (recursive_conf != NULL) {
2187         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2188     }
2189 
2190     if (client_ip_conf != NULL) {
2191         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2192                                              &forward->client_ip);
2193         if (nxt_slow_path(ret != NXT_OK)) {
2194             return NULL;
2195         }
2196     }
2197 
2198     if (protocol_conf != NULL) {
2199         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2200                                              &forward->protocol);
2201         if (nxt_slow_path(ret != NXT_OK)) {
2202             return NULL;
2203         }
2204     }
2205 
2206     return forward;
2207 }
2208 
2209 
2210 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2211 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2212     nxt_http_forward_header_t *fh)
2213 {
2214     char       c;
2215     size_t     i;
2216     uint32_t   hash;
2217     nxt_str_t  header;
2218 
2219     nxt_conf_get_string(conf, &header);
2220 
2221     fh->header = nxt_str_dup(mp, NULL, &header);
2222     if (nxt_slow_path(fh->header == NULL)) {
2223         return NXT_ERROR;
2224     }
2225 
2226     hash = NXT_HTTP_FIELD_HASH_INIT;
2227 
2228     for (i = 0; i < fh->header->length; i++) {
2229         c = fh->header->start[i];
2230         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2231     }
2232 
2233     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2234 
2235     fh->header_hash = hash;
2236 
2237     return NXT_OK;
2238 }
2239 
2240 
2241 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2242 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2243 {
2244     nxt_app_t  *app;
2245 
2246     nxt_queue_each(app, queue, nxt_app_t, link) {
2247 
2248         if (nxt_strstr_eq(name, &app->name)) {
2249             return app;
2250         }
2251 
2252     } nxt_queue_loop;
2253 
2254     return NULL;
2255 }
2256 
2257 
2258 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2259 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2260 {
2261     void       *mem;
2262     nxt_int_t  fd;
2263 
2264     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2265     if (nxt_slow_path(fd == -1)) {
2266         return NXT_ERROR;
2267     }
2268 
2269     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2270                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2271     if (nxt_slow_path(mem == MAP_FAILED)) {
2272         nxt_fd_close(fd);
2273 
2274         return NXT_ERROR;
2275     }
2276 
2277     nxt_app_queue_init(mem);
2278 
2279     port->queue_fd = fd;
2280     port->queue = mem;
2281 
2282     return NXT_OK;
2283 }
2284 
2285 
2286 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2287 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2288 {
2289     void       *mem;
2290     nxt_int_t  fd;
2291 
2292     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2293     if (nxt_slow_path(fd == -1)) {
2294         return NXT_ERROR;
2295     }
2296 
2297     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2298                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2299     if (nxt_slow_path(mem == MAP_FAILED)) {
2300         nxt_fd_close(fd);
2301 
2302         return NXT_ERROR;
2303     }
2304 
2305     nxt_port_queue_init(mem);
2306 
2307     port->queue_fd = fd;
2308     port->queue = mem;
2309 
2310     return NXT_OK;
2311 }
2312 
2313 
2314 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2315 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2316 {
2317     void  *mem;
2318 
2319     nxt_assert(fd != -1);
2320 
2321     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2322                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2323     if (nxt_slow_path(mem == MAP_FAILED)) {
2324 
2325         return NXT_ERROR;
2326     }
2327 
2328     port->queue = mem;
2329 
2330     return NXT_OK;
2331 }
2332 
2333 
2334 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2335     NXT_LVLHSH_DEFAULT,
2336     nxt_router_apps_hash_test,
2337     nxt_mp_lvlhsh_alloc,
2338     nxt_mp_lvlhsh_free,
2339 };
2340 
2341 
2342 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2343 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2344 {
2345     nxt_app_t  *app;
2346 
2347     app = data;
2348 
2349     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2350 }
2351 
2352 
2353 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2354 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2355 {
2356     nxt_lvlhsh_query_t  lhq;
2357 
2358     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2359     lhq.replace = 0;
2360     lhq.key = app->name;
2361     lhq.value = app;
2362     lhq.proto = &nxt_router_apps_hash_proto;
2363     lhq.pool = rtcf->mem_pool;
2364 
2365     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2366 
2367     case NXT_OK:
2368         return NXT_OK;
2369 
2370     case NXT_DECLINED:
2371         nxt_thread_log_alert("router app hash adding failed: "
2372                              "\"%V\" is already in hash", &lhq.key);
2373         /* Fall through. */
2374     default:
2375         return NXT_ERROR;
2376     }
2377 }
2378 
2379 
2380 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2381 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2382 {
2383     nxt_lvlhsh_query_t  lhq;
2384 
2385     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2386     lhq.key = *name;
2387     lhq.proto = &nxt_router_apps_hash_proto;
2388 
2389     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2390         return NULL;
2391     }
2392 
2393     return lhq.value;
2394 }
2395 
2396 
2397 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2398 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2399 {
2400     nxt_app_t          *app;
2401     nxt_lvlhsh_each_t  lhe;
2402 
2403     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2404 
2405     for ( ;; ) {
2406         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2407 
2408         if (app == NULL) {
2409             break;
2410         }
2411 
2412         nxt_router_app_use(task, app, i);
2413     }
2414 }
2415 
2416 
2417 typedef struct {
2418     nxt_app_t  *app;
2419     nxt_int_t  target;
2420 } nxt_http_app_conf_t;
2421 
2422 
2423 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2424 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2425     nxt_str_t *target, nxt_http_action_t *action)
2426 {
2427     nxt_app_t            *app;
2428     nxt_str_t            *targets;
2429     nxt_uint_t           i;
2430     nxt_http_app_conf_t  *conf;
2431 
2432     app = nxt_router_apps_hash_get(rtcf, name);
2433     if (app == NULL) {
2434         return NXT_DECLINED;
2435     }
2436 
2437     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2438     if (nxt_slow_path(conf == NULL)) {
2439         return NXT_ERROR;
2440     }
2441 
2442     action->handler = nxt_http_application_handler;
2443     action->u.conf = conf;
2444 
2445     conf->app = app;
2446 
2447     if (target != NULL && target->length != 0) {
2448         targets = app->targets;
2449 
2450         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2451 
2452         conf->target = i;
2453 
2454     } else {
2455         conf->target = 0;
2456     }
2457 
2458     return NXT_OK;
2459 }
2460 
2461 
2462 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2463 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2464     nxt_str_t *name)
2465 {
2466     size_t               size;
2467     nxt_int_t            ret;
2468     nxt_bool_t           wildcard;
2469     nxt_sockaddr_t       *sa;
2470     nxt_socket_conf_t    *skcf;
2471     nxt_listen_socket_t  *ls;
2472 
2473     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2474     if (nxt_slow_path(sa == NULL)) {
2475         nxt_alert(task, "invalid listener \"%V\"", name);
2476         return NULL;
2477     }
2478 
2479     sa->type = SOCK_STREAM;
2480 
2481     nxt_debug(task, "router listener: \"%*s\"",
2482               (size_t) sa->length, nxt_sockaddr_start(sa));
2483 
2484     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2485     if (nxt_slow_path(skcf == NULL)) {
2486         return NULL;
2487     }
2488 
2489     size = nxt_sockaddr_size(sa);
2490 
2491     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2492 
2493     if (ret != NXT_OK) {
2494 
2495         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2496         if (nxt_slow_path(ls == NULL)) {
2497             return NULL;
2498         }
2499 
2500         skcf->listen = ls;
2501 
2502         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2503         nxt_memcpy(ls->sockaddr, sa, size);
2504 
2505         nxt_listen_socket_remote_size(ls);
2506 
2507         ls->socket = -1;
2508         ls->backlog = NXT_LISTEN_BACKLOG;
2509         ls->flags = NXT_NONBLOCK;
2510         ls->read_after_accept = 1;
2511     }
2512 
2513     switch (sa->u.sockaddr.sa_family) {
2514 #if (NXT_HAVE_UNIX_DOMAIN)
2515     case AF_UNIX:
2516         wildcard = 0;
2517         break;
2518 #endif
2519 #if (NXT_INET6)
2520     case AF_INET6:
2521         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2522         break;
2523 #endif
2524     case AF_INET:
2525     default:
2526         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2527         break;
2528     }
2529 
2530     if (!wildcard) {
2531         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2532         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2533             return NULL;
2534         }
2535 
2536         nxt_memcpy(skcf->sockaddr, sa, size);
2537     }
2538 
2539     return skcf;
2540 }
2541 
2542 
2543 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2544 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2545     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2546 {
2547     nxt_router_t       *router;
2548     nxt_queue_link_t   *qlk;
2549     nxt_socket_conf_t  *skcf;
2550 
2551     router = tmcf->router_conf->router;
2552 
2553     for (qlk = nxt_queue_first(&router->sockets);
2554          qlk != nxt_queue_tail(&router->sockets);
2555          qlk = nxt_queue_next(qlk))
2556     {
2557         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2558 
2559         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2560             nskcf->listen = skcf->listen;
2561 
2562             nxt_queue_remove(qlk);
2563             nxt_queue_insert_tail(&keeping_sockets, qlk);
2564 
2565             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2566 
2567             return NXT_OK;
2568         }
2569     }
2570 
2571     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2572 
2573     return NXT_DECLINED;
2574 }
2575 
2576 
2577 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2578 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2579     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2580 {
2581     size_t            size;
2582     uint32_t          stream;
2583     nxt_int_t         ret;
2584     nxt_buf_t         *b;
2585     nxt_port_t        *main_port, *router_port;
2586     nxt_runtime_t     *rt;
2587     nxt_socket_rpc_t  *rpc;
2588 
2589     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2590     if (rpc == NULL) {
2591         goto fail;
2592     }
2593 
2594     rpc->socket_conf = skcf;
2595     rpc->temp_conf = tmcf;
2596 
2597     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2598 
2599     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2600     if (b == NULL) {
2601         goto fail;
2602     }
2603 
2604     b->completion_handler = nxt_buf_dummy_completion;
2605 
2606     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2607 
2608     rt = task->thread->runtime;
2609     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2610     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2611 
2612     stream = nxt_port_rpc_register_handler(task, router_port,
2613                                            nxt_router_listen_socket_ready,
2614                                            nxt_router_listen_socket_error,
2615                                            main_port->pid, rpc);
2616     if (nxt_slow_path(stream == 0)) {
2617         goto fail;
2618     }
2619 
2620     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2621                                 stream, router_port->id, b);
2622 
2623     if (nxt_slow_path(ret != NXT_OK)) {
2624         nxt_port_rpc_cancel(task, router_port, stream);
2625         goto fail;
2626     }
2627 
2628     return;
2629 
2630 fail:
2631 
2632     nxt_router_conf_error(task, tmcf);
2633 }
2634 
2635 
2636 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2637 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2638     void *data)
2639 {
2640     nxt_int_t         ret;
2641     nxt_socket_t      s;
2642     nxt_socket_rpc_t  *rpc;
2643 
2644     rpc = data;
2645 
2646     s = msg->fd[0];
2647 
2648     ret = nxt_socket_nonblocking(task, s);
2649     if (nxt_slow_path(ret != NXT_OK)) {
2650         goto fail;
2651     }
2652 
2653     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2654 
2655     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2656     if (nxt_slow_path(ret != NXT_OK)) {
2657         goto fail;
2658     }
2659 
2660     rpc->socket_conf->listen->socket = s;
2661 
2662     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2663                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2664 
2665     return;
2666 
2667 fail:
2668 
2669     nxt_socket_close(task, s);
2670 
2671     nxt_router_conf_error(task, rpc->temp_conf);
2672 }
2673 
2674 
2675 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2676 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2677     void *data)
2678 {
2679     nxt_socket_rpc_t        *rpc;
2680     nxt_router_temp_conf_t  *tmcf;
2681 
2682     rpc = data;
2683     tmcf = rpc->temp_conf;
2684 
2685 #if 0
2686     u_char                  *p;
2687     size_t                  size;
2688     uint8_t                 error;
2689     nxt_buf_t               *in, *out;
2690     nxt_sockaddr_t          *sa;
2691 
2692     static nxt_str_t  socket_errors[] = {
2693         nxt_string("ListenerSystem"),
2694         nxt_string("ListenerNoIPv6"),
2695         nxt_string("ListenerPort"),
2696         nxt_string("ListenerInUse"),
2697         nxt_string("ListenerNoAddress"),
2698         nxt_string("ListenerNoAccess"),
2699         nxt_string("ListenerPath"),
2700     };
2701 
2702     sa = rpc->socket_conf->listen->sockaddr;
2703 
2704     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2705 
2706     if (nxt_slow_path(in == NULL)) {
2707         return;
2708     }
2709 
2710     p = in->mem.pos;
2711 
2712     error = *p++;
2713 
2714     size = nxt_length("listen socket error: ")
2715            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2716            + sa->length + socket_errors[error].length + (in->mem.free - p);
2717 
2718     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2719     if (nxt_slow_path(out == NULL)) {
2720         return;
2721     }
2722 
2723     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2724                         "listen socket error: "
2725                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2726                         (size_t) sa->length, nxt_sockaddr_start(sa),
2727                         &socket_errors[error], in->mem.free - p, p);
2728 
2729     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2730 #endif
2731 
2732     nxt_router_conf_error(task, tmcf);
2733 }
2734 
2735 
2736 #if (NXT_TLS)
2737 
2738 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2739 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2740     void *data)
2741 {
2742     nxt_mp_t                *mp;
2743     nxt_int_t               ret;
2744     nxt_tls_conf_t          *tlscf;
2745     nxt_router_tlssock_t    *tls;
2746     nxt_tls_bundle_conf_t   *bundle;
2747     nxt_router_temp_conf_t  *tmcf;
2748 
2749     nxt_debug(task, "tls rpc handler");
2750 
2751     tls = data;
2752     tmcf = tls->temp_conf;
2753 
2754     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2755         goto fail;
2756     }
2757 
2758     mp = tmcf->router_conf->mem_pool;
2759 
2760     if (tls->socket_conf->tls == NULL){
2761         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2762         if (nxt_slow_path(tlscf == NULL)) {
2763             goto fail;
2764         }
2765 
2766         tlscf->no_wait_shutdown = 1;
2767         tls->socket_conf->tls = tlscf;
2768 
2769     } else {
2770         tlscf = tls->socket_conf->tls;
2771     }
2772 
2773     tls->tls_init->conf = tlscf;
2774 
2775     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2776     if (nxt_slow_path(bundle == NULL)) {
2777         goto fail;
2778     }
2779 
2780     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2781         goto fail;
2782     }
2783 
2784     bundle->chain_file = msg->fd[0];
2785     bundle->next = tlscf->bundle;
2786     tlscf->bundle = bundle;
2787 
2788     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2789                                                   tls->last);
2790     if (nxt_slow_path(ret != NXT_OK)) {
2791         goto fail;
2792     }
2793 
2794     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2795                        nxt_router_conf_apply, task, tmcf, NULL);
2796     return;
2797 
2798 fail:
2799 
2800     nxt_router_conf_error(task, tmcf);
2801 }
2802 
2803 #endif
2804 
2805 
2806 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2807 nxt_router_app_rpc_create(nxt_task_t *task,
2808     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2809 {
2810     size_t         size;
2811     uint32_t       stream;
2812     nxt_fd_t       port_fd, queue_fd;
2813     nxt_int_t      ret;
2814     nxt_buf_t      *b;
2815     nxt_port_t     *router_port, *dport;
2816     nxt_runtime_t  *rt;
2817     nxt_app_rpc_t  *rpc;
2818 
2819     rt = task->thread->runtime;
2820 
2821     dport = app->proto_port;
2822 
2823     if (dport == NULL) {
2824         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2825 
2826         size = app->name.length + 1 + app->conf.length;
2827 
2828         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2829         if (nxt_slow_path(b == NULL)) {
2830             goto fail;
2831         }
2832 
2833         b->completion_handler = nxt_buf_dummy_completion;
2834 
2835         nxt_buf_cpystr(b, &app->name);
2836         *b->mem.free++ = '\0';
2837         nxt_buf_cpystr(b, &app->conf);
2838 
2839         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2840 
2841         port_fd = app->shared_port->pair[0];
2842         queue_fd = app->shared_port->queue_fd;
2843 
2844     } else {
2845         nxt_debug(task, "app '%V' prefork", &app->name);
2846 
2847         b = NULL;
2848         port_fd = -1;
2849         queue_fd = -1;
2850     }
2851 
2852     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2853 
2854     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2855                                            nxt_router_app_prefork_ready,
2856                                            nxt_router_app_prefork_error,
2857                                            sizeof(nxt_app_rpc_t));
2858     if (nxt_slow_path(rpc == NULL)) {
2859         goto fail;
2860     }
2861 
2862     rpc->app = app;
2863     rpc->temp_conf = tmcf;
2864     rpc->proto = (b != NULL);
2865 
2866     stream = nxt_port_rpc_ex_stream(rpc);
2867 
2868     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2869                                  port_fd, queue_fd, stream, router_port->id, b);
2870     if (nxt_slow_path(ret != NXT_OK)) {
2871         nxt_port_rpc_cancel(task, router_port, stream);
2872         goto fail;
2873     }
2874 
2875     if (b == NULL) {
2876         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2877 
2878         app->pending_processes++;
2879     }
2880 
2881     return;
2882 
2883 fail:
2884 
2885     nxt_router_conf_error(task, tmcf);
2886 }
2887 
2888 
2889 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2890 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2891     void *data)
2892 {
2893     nxt_app_t           *app;
2894     nxt_port_t          *port;
2895     nxt_app_rpc_t       *rpc;
2896     nxt_event_engine_t  *engine;
2897 
2898     rpc = data;
2899     app = rpc->app;
2900 
2901     port = msg->u.new_port;
2902 
2903     nxt_assert(port != NULL);
2904     nxt_assert(port->id == 0);
2905 
2906     if (rpc->proto) {
2907         nxt_assert(app->proto_port == NULL);
2908         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2909 
2910         nxt_port_inc_use(port);
2911 
2912         app->proto_port = port;
2913         port->app = app;
2914 
2915         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2916 
2917         return;
2918     }
2919 
2920     nxt_assert(port->type == NXT_PROCESS_APP);
2921 
2922     port->app = app;
2923     port->main_app_port = port;
2924 
2925     app->pending_processes--;
2926     app->processes++;
2927     app->idle_processes++;
2928 
2929     engine = task->thread->engine;
2930 
2931     nxt_queue_insert_tail(&app->ports, &port->app_link);
2932     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2933 
2934     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2935               &app->name, port->pid, port->id);
2936 
2937     nxt_port_hash_add(&app->port_hash, port);
2938     app->port_hash_count++;
2939 
2940     port->idle_start = 0;
2941 
2942     nxt_port_inc_use(port);
2943 
2944     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2945 
2946     nxt_work_queue_add(&engine->fast_work_queue,
2947                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2948 }
2949 
2950 
2951 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2952 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2953     void *data)
2954 {
2955     nxt_app_t               *app;
2956     nxt_app_rpc_t           *rpc;
2957     nxt_router_temp_conf_t  *tmcf;
2958 
2959     rpc = data;
2960     app = rpc->app;
2961     tmcf = rpc->temp_conf;
2962 
2963     if (rpc->proto) {
2964         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2965                 &app->name);
2966 
2967     } else {
2968         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2969                 &app->name);
2970 
2971         app->pending_processes--;
2972     }
2973 
2974     nxt_router_conf_error(task, tmcf);
2975 }
2976 
2977 
2978 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2979 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2980     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2981 {
2982     nxt_int_t                 ret;
2983     nxt_uint_t                n, threads;
2984     nxt_queue_link_t          *qlk;
2985     nxt_router_engine_conf_t  *recf;
2986 
2987     threads = tmcf->router_conf->threads;
2988 
2989     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2990                                      sizeof(nxt_router_engine_conf_t));
2991     if (nxt_slow_path(tmcf->engines == NULL)) {
2992         return NXT_ERROR;
2993     }
2994 
2995     n = 0;
2996 
2997     for (qlk = nxt_queue_first(&router->engines);
2998          qlk != nxt_queue_tail(&router->engines);
2999          qlk = nxt_queue_next(qlk))
3000     {
3001         recf = nxt_array_zero_add(tmcf->engines);
3002         if (nxt_slow_path(recf == NULL)) {
3003             return NXT_ERROR;
3004         }
3005 
3006         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3007 
3008         if (n < threads) {
3009             recf->action = NXT_ROUTER_ENGINE_KEEP;
3010             ret = nxt_router_engine_conf_update(tmcf, recf);
3011 
3012         } else {
3013             recf->action = NXT_ROUTER_ENGINE_DELETE;
3014             ret = nxt_router_engine_conf_delete(tmcf, recf);
3015         }
3016 
3017         if (nxt_slow_path(ret != NXT_OK)) {
3018             return ret;
3019         }
3020 
3021         n++;
3022     }
3023 
3024     tmcf->new_threads = n;
3025 
3026     while (n < threads) {
3027         recf = nxt_array_zero_add(tmcf->engines);
3028         if (nxt_slow_path(recf == NULL)) {
3029             return NXT_ERROR;
3030         }
3031 
3032         recf->action = NXT_ROUTER_ENGINE_ADD;
3033 
3034         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3035         if (nxt_slow_path(recf->engine == NULL)) {
3036             return NXT_ERROR;
3037         }
3038 
3039         ret = nxt_router_engine_conf_create(tmcf, recf);
3040         if (nxt_slow_path(ret != NXT_OK)) {
3041             return ret;
3042         }
3043 
3044         n++;
3045     }
3046 
3047     return NXT_OK;
3048 }
3049 
3050 
3051 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3052 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3053     nxt_router_engine_conf_t *recf)
3054 {
3055     nxt_int_t  ret;
3056 
3057     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3058                                           nxt_router_listen_socket_create);
3059     if (nxt_slow_path(ret != NXT_OK)) {
3060         return ret;
3061     }
3062 
3063     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3064                                           nxt_router_listen_socket_create);
3065     if (nxt_slow_path(ret != NXT_OK)) {
3066         return ret;
3067     }
3068 
3069     return ret;
3070 }
3071 
3072 
3073 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3074 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3075     nxt_router_engine_conf_t *recf)
3076 {
3077     nxt_int_t  ret;
3078 
3079     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3080                                           nxt_router_listen_socket_create);
3081     if (nxt_slow_path(ret != NXT_OK)) {
3082         return ret;
3083     }
3084 
3085     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3086                                           nxt_router_listen_socket_update);
3087     if (nxt_slow_path(ret != NXT_OK)) {
3088         return ret;
3089     }
3090 
3091     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3092     if (nxt_slow_path(ret != NXT_OK)) {
3093         return ret;
3094     }
3095 
3096     return ret;
3097 }
3098 
3099 
3100 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3101 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3102     nxt_router_engine_conf_t *recf)
3103 {
3104     nxt_int_t  ret;
3105 
3106     ret = nxt_router_engine_quit(tmcf, recf);
3107     if (nxt_slow_path(ret != NXT_OK)) {
3108         return ret;
3109     }
3110 
3111     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3112     if (nxt_slow_path(ret != NXT_OK)) {
3113         return ret;
3114     }
3115 
3116     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3117 }
3118 
3119 
3120 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3121 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3122     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3123     nxt_work_handler_t handler)
3124 {
3125     nxt_int_t                ret;
3126     nxt_joint_job_t          *job;
3127     nxt_queue_link_t         *qlk;
3128     nxt_socket_conf_t        *skcf;
3129     nxt_socket_conf_joint_t  *joint;
3130 
3131     for (qlk = nxt_queue_first(sockets);
3132          qlk != nxt_queue_tail(sockets);
3133          qlk = nxt_queue_next(qlk))
3134     {
3135         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3136         if (nxt_slow_path(job == NULL)) {
3137             return NXT_ERROR;
3138         }
3139 
3140         job->work.next = recf->jobs;
3141         recf->jobs = &job->work;
3142 
3143         job->task = tmcf->engine->task;
3144         job->work.handler = handler;
3145         job->work.task = &j