xref: /unit/src/nxt_router.c (revision 2165:556348458f34)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 
96 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
97 static void nxt_router_conf_ready(nxt_task_t *task,
98     nxt_router_temp_conf_t *tmcf);
99 static void nxt_router_conf_send(nxt_task_t *task,
100     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
101 
102 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
104 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
105     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
106 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
107     nxt_mp_t *mp, nxt_conf_value_t *conf);
108 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
109     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
110 
111 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
112 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
113 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
114     nxt_app_t *app);
115 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
116     nxt_str_t *name);
117 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
118     int i);
119 
120 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
121     nxt_port_t *port);
122 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
123     nxt_port_t *port);
124 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
125     nxt_port_t *port, nxt_fd_t fd);
126 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
127     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
128 static void nxt_router_listen_socket_ready(nxt_task_t *task,
129     nxt_port_recv_msg_t *msg, void *data);
130 static void nxt_router_listen_socket_error(nxt_task_t *task,
131     nxt_port_recv_msg_t *msg, void *data);
132 #if (NXT_TLS)
133 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
136     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
137     nxt_bool_t last);
138 #endif
139 static void nxt_router_app_rpc_create(nxt_task_t *task,
140     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
141 static void nxt_router_app_prefork_ready(nxt_task_t *task,
142     nxt_port_recv_msg_t *msg, void *data);
143 static void nxt_router_app_prefork_error(nxt_task_t *task,
144     nxt_port_recv_msg_t *msg, void *data);
145 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
146     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
147 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
148     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
149 
150 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
151     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
152     const nxt_event_interface_t *interface);
153 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
154     nxt_router_engine_conf_t *recf);
155 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
156     nxt_router_engine_conf_t *recf);
157 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
161     nxt_work_handler_t handler);
162 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
163     nxt_router_engine_conf_t *recf);
164 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
165     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
166 
167 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
168     nxt_router_temp_conf_t *tmcf);
169 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
170     nxt_event_engine_t *engine);
171 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
172     nxt_router_temp_conf_t *tmcf);
173 
174 static void nxt_router_engines_post(nxt_router_t *router,
175     nxt_router_temp_conf_t *tmcf);
176 static void nxt_router_engine_post(nxt_event_engine_t *engine,
177     nxt_work_t *jobs);
178 
179 static void nxt_router_thread_start(void *data);
180 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
181     void *data);
182 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
183     void *data);
184 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
195     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
196 static void nxt_router_listen_socket_release(nxt_task_t *task,
197     nxt_socket_conf_t *skcf);
198 
199 static void nxt_router_app_port_ready(nxt_task_t *task,
200     nxt_port_recv_msg_t *msg, void *data);
201 static void nxt_router_app_port_error(nxt_task_t *task,
202     nxt_port_recv_msg_t *msg, void *data);
203 
204 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
205 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
206 
207 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
208     nxt_port_t *port, nxt_apr_action_t action);
209 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
210     nxt_request_rpc_data_t *req_rpc_data);
211 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
212     void *data);
213 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
214     void *data);
215 
216 static void nxt_router_app_prepare_request(nxt_task_t *task,
217     nxt_request_rpc_data_t *req_rpc_data);
218 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
219     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
220 
221 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
222 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
223     void *data);
224 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
225     void *data);
226 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
227     void *data);
228 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
229 
230 static const nxt_http_request_state_t  nxt_http_request_send_state;
231 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
232 
233 static void nxt_router_app_joint_use(nxt_task_t *task,
234     nxt_app_joint_t *app_joint, int i);
235 
236 static void nxt_router_http_request_release_post(nxt_task_t *task,
237     nxt_http_request_t *r);
238 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
239     void *data);
240 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
241 static void nxt_router_get_port_handler(nxt_task_t *task,
242     nxt_port_recv_msg_t *msg);
243 static void nxt_router_get_mmap_handler(nxt_task_t *task,
244     nxt_port_recv_msg_t *msg);
245 
246 extern const nxt_http_request_state_t  nxt_http_websocket;
247 
248 nxt_router_t  *nxt_router;
249 
250 static const nxt_str_t http_prefix = nxt_string("HTTP_");
251 static const nxt_str_t empty_prefix = nxt_string("");
252 
253 static const nxt_str_t  *nxt_app_msg_prefix[] = {
254     &empty_prefix,
255     &empty_prefix,
256     &http_prefix,
257     &http_prefix,
258     &http_prefix,
259     &empty_prefix,
260 };
261 
262 
263 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
264     .quit         = nxt_signal_quit_handler,
265     .new_port     = nxt_router_new_port_handler,
266     .get_port     = nxt_router_get_port_handler,
267     .change_file  = nxt_port_change_log_file_handler,
268     .mmap         = nxt_port_mmap_handler,
269     .get_mmap     = nxt_router_get_mmap_handler,
270     .data         = nxt_router_conf_data_handler,
271     .app_restart  = nxt_router_app_restart_handler,
272     .remove_pid   = nxt_router_remove_pid_handler,
273     .access_log   = nxt_router_access_log_reopen_handler,
274     .rpc_ready    = nxt_port_rpc_handler,
275     .rpc_error    = nxt_port_rpc_handler,
276     .oosm         = nxt_router_oosm_handler,
277 };
278 
279 
280 const nxt_process_init_t  nxt_router_process = {
281     .name           = "router",
282     .type           = NXT_PROCESS_ROUTER,
283     .prefork        = nxt_router_prefork,
284     .restart        = 1,
285     .setup          = nxt_process_core_setup,
286     .start          = nxt_router_start,
287     .port_handlers  = &nxt_router_process_port_handlers,
288     .signals        = nxt_process_signals,
289 };
290 
291 
292 /* Queues of nxt_socket_conf_t */
293 nxt_queue_t  creating_sockets;
294 nxt_queue_t  pending_sockets;
295 nxt_queue_t  updating_sockets;
296 nxt_queue_t  keeping_sockets;
297 nxt_queue_t  deleting_sockets;
298 
299 
300 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)301 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
302 {
303     nxt_runtime_stop_app_processes(task, task->thread->runtime);
304 
305     return NXT_OK;
306 }
307 
308 
309 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)310 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
311 {
312     nxt_int_t      ret;
313     nxt_port_t     *controller_port;
314     nxt_router_t   *router;
315     nxt_runtime_t  *rt;
316 
317     rt = task->thread->runtime;
318 
319     nxt_log(task, NXT_LOG_INFO, "router started");
320 
321 #if (NXT_TLS)
322     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
323     if (nxt_slow_path(rt->tls == NULL)) {
324         return NXT_ERROR;
325     }
326 
327     ret = rt->tls->library_init(task);
328     if (nxt_slow_path(ret != NXT_OK)) {
329         return ret;
330     }
331 #endif
332 
333     ret = nxt_http_init(task);
334     if (nxt_slow_path(ret != NXT_OK)) {
335         return ret;
336     }
337 
338     router = nxt_zalloc(sizeof(nxt_router_t));
339     if (nxt_slow_path(router == NULL)) {
340         return NXT_ERROR;
341     }
342 
343     nxt_queue_init(&router->engines);
344     nxt_queue_init(&router->sockets);
345     nxt_queue_init(&router->apps);
346 
347     nxt_router = router;
348 
349     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
350     if (controller_port != NULL) {
351         nxt_router_greet_controller(task, controller_port);
352     }
353 
354     return NXT_OK;
355 }
356 
357 
358 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)359 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
360 {
361     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
362                           -1, 0, 0, NULL);
363 }
364 
365 
366 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)367 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
368     void *data)
369 {
370     size_t               size;
371     uint32_t             stream;
372     nxt_fd_t             port_fd, queue_fd;
373     nxt_int_t            ret;
374     nxt_app_t            *app;
375     nxt_buf_t            *b;
376     nxt_port_t           *dport;
377     nxt_runtime_t        *rt;
378     nxt_app_joint_rpc_t  *app_joint_rpc;
379 
380     app = data;
381 
382     nxt_thread_mutex_lock(&app->mutex);
383 
384     dport = app->proto_port;
385 
386     nxt_thread_mutex_unlock(&app->mutex);
387 
388     if (dport != NULL) {
389         nxt_debug(task, "app '%V' %p start process", &app->name, app);
390 
391         b = NULL;
392         port_fd = -1;
393         queue_fd = -1;
394 
395     } else {
396         if (app->proto_port_requests > 0) {
397             nxt_debug(task, "app '%V' %p wait for prototype process",
398                       &app->name, app);
399 
400             app->proto_port_requests++;
401 
402             goto skip;
403         }
404 
405         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
406 
407         rt = task->thread->runtime;
408         dport = rt->port_by_type[NXT_PROCESS_MAIN];
409 
410         size = app->name.length + 1 + app->conf.length;
411 
412         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
413         if (nxt_slow_path(b == NULL)) {
414             goto failed;
415         }
416 
417         nxt_buf_cpystr(b, &app->name);
418         *b->mem.free++ = '\0';
419         nxt_buf_cpystr(b, &app->conf);
420 
421         port_fd = app->shared_port->pair[0];
422         queue_fd = app->shared_port->queue_fd;
423     }
424 
425     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
426                                                      nxt_router_app_port_ready,
427                                                      nxt_router_app_port_error,
428                                                    sizeof(nxt_app_joint_rpc_t));
429     if (nxt_slow_path(app_joint_rpc == NULL)) {
430         goto failed;
431     }
432 
433     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
434 
435     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
436                                  port_fd, queue_fd, stream, port->id, b);
437     if (nxt_slow_path(ret != NXT_OK)) {
438         nxt_port_rpc_cancel(task, port, stream);
439 
440         goto failed;
441     }
442 
443     app_joint_rpc->app_joint = app->joint;
444     app_joint_rpc->generation = app->generation;
445     app_joint_rpc->proto = (b != NULL);
446 
447     if (b != NULL) {
448         app->proto_port_requests++;
449 
450         b = NULL;
451     }
452 
453     nxt_router_app_joint_use(task, app->joint, 1);
454 
455 failed:
456 
457     if (b != NULL) {
458         nxt_mp_free(b->data, b);
459     }
460 
461 skip:
462 
463     nxt_router_app_use(task, app, -1);
464 }
465 
466 
467 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)468 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
469 {
470     app_joint->use_count += i;
471 
472     if (app_joint->use_count == 0) {
473         nxt_assert(app_joint->app == NULL);
474 
475         nxt_free(app_joint);
476     }
477 }
478 
479 
480 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)481 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
482 {
483     nxt_int_t      res;
484     nxt_port_t     *router_port;
485     nxt_runtime_t  *rt;
486 
487     nxt_debug(task, "app '%V' start process", &app->name);
488 
489     rt = task->thread->runtime;
490     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
491 
492     nxt_router_app_use(task, app, 1);
493 
494     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
495                         app);
496 
497     if (res == NXT_OK) {
498         return res;
499     }
500 
501     nxt_thread_mutex_lock(&app->mutex);
502 
503     app->pending_processes--;
504 
505     nxt_thread_mutex_unlock(&app->mutex);
506 
507     nxt_router_app_use(task, app, -1);
508 
509     return NXT_ERROR;
510 }
511 
512 
513 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)514 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
515 {
516     nxt_buf_t       *b, *next;
517     nxt_bool_t      cancelled;
518     nxt_port_t      *app_port;
519     nxt_msg_info_t  *msg_info;
520 
521     msg_info = &req_rpc_data->msg_info;
522 
523     if (msg_info->buf == NULL) {
524         return 0;
525     }
526 
527     app_port = req_rpc_data->app_port;
528 
529     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
530         cancelled = nxt_app_queue_cancel(app_port->queue,
531                                          msg_info->tracking_cookie,
532                                          req_rpc_data->stream);
533 
534         if (cancelled) {
535             nxt_debug(task, "stream #%uD: cancelled by router",
536                       req_rpc_data->stream);
537         }
538 
539     } else {
540         cancelled = 0;
541     }
542 
543     for (b = msg_info->buf; b != NULL; b = next) {
544         next = b->next;
545         b->next = NULL;
546 
547         if (b->is_port_mmap_sent) {
548             b->is_port_mmap_sent = cancelled == 0;
549         }
550 
551         b->completion_handler(task, b, b->parent);
552     }
553 
554     msg_info->buf = NULL;
555 
556     return cancelled;
557 }
558 
559 
560 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)561 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
562 {
563     if (lnk->next != NULL) {
564         nxt_queue_remove(lnk);
565 
566         lnk->next = NULL;
567 
568         return 1;
569     }
570 
571     return 0;
572 }
573 
574 
575 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)576 nxt_request_rpc_data_unlink(nxt_task_t *task,
577     nxt_request_rpc_data_t *req_rpc_data)
578 {
579     nxt_app_t           *app;
580     nxt_bool_t          unlinked;
581     nxt_http_request_t  *r;
582 
583     nxt_router_msg_cancel(task, req_rpc_data);
584 
585     app = req_rpc_data->app;
586 
587     if (req_rpc_data->app_port != NULL) {
588         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
589                                     req_rpc_data->apr_action);
590 
591         req_rpc_data->app_port = NULL;
592     }
593 
594     r = req_rpc_data->request;
595 
596     if (r != NULL) {
597         r->timer_data = NULL;
598 
599         nxt_router_http_request_release_post(task, r);
600 
601         r->req_rpc_data = NULL;
602         req_rpc_data->request = NULL;
603 
604         if (app != NULL) {
605             unlinked = 0;
606 
607             nxt_thread_mutex_lock(&app->mutex);
608 
609             if (r->app_link.next != NULL) {
610                 nxt_queue_remove(&r->app_link);
611                 r->app_link.next = NULL;
612 
613                 unlinked = 1;
614             }
615 
616             nxt_thread_mutex_unlock(&app->mutex);
617 
618             if (unlinked) {
619                 nxt_mp_release(r->mem_pool);
620             }
621         }
622     }
623 
624     if (app != NULL) {
625         nxt_router_app_use(task, app, -1);
626 
627         req_rpc_data->app = NULL;
628     }
629 
630     if (req_rpc_data->msg_info.body_fd != -1) {
631         nxt_fd_close(req_rpc_data->msg_info.body_fd);
632 
633         req_rpc_data->msg_info.body_fd = -1;
634     }
635 
636     if (req_rpc_data->rpc_cancel) {
637         req_rpc_data->rpc_cancel = 0;
638 
639         nxt_port_rpc_cancel(task, task->thread->engine->port,
640                             req_rpc_data->stream);
641     }
642 }
643 
644 
645 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
647 {
648     nxt_int_t      res;
649     nxt_app_t      *app;
650     nxt_port_t     *port, *main_app_port;
651     nxt_runtime_t  *rt;
652 
653     nxt_port_new_port_handler(task, msg);
654 
655     port = msg->u.new_port;
656 
657     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
658         nxt_router_greet_controller(task, msg->u.new_port);
659     }
660 
661     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
662         nxt_port_rpc_handler(task, msg);
663 
664         return;
665     }
666 
667     if (port == NULL || port->type != NXT_PROCESS_APP) {
668 
669         if (msg->port_msg.stream == 0) {
670             return;
671         }
672 
673         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
674 
675     } else {
676         if (msg->fd[1] != -1) {
677             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
678             if (nxt_slow_path(res != NXT_OK)) {
679                 return;
680             }
681 
682             nxt_fd_close(msg->fd[1]);
683             msg->fd[1] = -1;
684         }
685     }
686 
687     if (msg->port_msg.stream != 0) {
688         nxt_port_rpc_handler(task, msg);
689         return;
690     }
691 
692     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
693 
694     /*
695      * Port with "id == 0" is application 'main' port and it always
696      * should come with non-zero stream.
697      */
698     nxt_assert(port->id != 0);
699 
700     /* Find 'main' app port and get app reference. */
701     rt = task->thread->runtime;
702 
703     /*
704      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
705      * sent to main port (with id == 0) and processed in main thread.
706      */
707     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
708     nxt_assert(main_app_port != NULL);
709 
710     app = main_app_port->app;
711 
712     if (nxt_fast_path(app != NULL)) {
713         nxt_thread_mutex_lock(&app->mutex);
714 
715         /* TODO here should be find-and-add code because there can be
716            port waiters in port_hash */
717         nxt_port_hash_add(&app->port_hash, port);
718         app->port_hash_count++;
719 
720         nxt_thread_mutex_unlock(&app->mutex);
721 
722         port->app = app;
723     }
724 
725     port->main_app_port = main_app_port;
726 
727     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
728 }
729 
730 
731 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)732 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
733 {
734     void                    *p;
735     size_t                  size;
736     nxt_int_t               ret;
737     nxt_port_t              *port;
738     nxt_router_temp_conf_t  *tmcf;
739 
740     port = nxt_runtime_port_find(task->thread->runtime,
741                                  msg->port_msg.pid,
742                                  msg->port_msg.reply_port);
743     if (nxt_slow_path(port == NULL)) {
744         nxt_alert(task, "conf_data_handler: reply port not found");
745         return;
746     }
747 
748     p = MAP_FAILED;
749 
750     /*
751      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
752      * initialized in 'cleanup' section.
753      */
754     size = 0;
755 
756     tmcf = nxt_router_temp_conf(task);
757     if (nxt_slow_path(tmcf == NULL)) {
758         goto fail;
759     }
760 
761     if (nxt_slow_path(msg->fd[0] == -1)) {
762         nxt_alert(task, "conf_data_handler: invalid shm fd");
763         goto fail;
764     }
765 
766     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
767         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
768                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
769         goto fail;
770     }
771 
772     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
773 
774     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
775 
776     nxt_fd_close(msg->fd[0]);
777     msg->fd[0] = -1;
778 
779     if (nxt_slow_path(p == MAP_FAILED)) {
780         goto fail;
781     }
782 
783     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
784 
785     tmcf->router_conf->router = nxt_router;
786     tmcf->stream = msg->port_msg.stream;
787     tmcf->port = port;
788 
789     nxt_port_use(task, tmcf->port, 1);
790 
791     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
792 
793     if (nxt_fast_path(ret == NXT_OK)) {
794         nxt_router_conf_apply(task, tmcf, NULL);
795 
796     } else {
797         nxt_router_conf_error(task, tmcf);
798     }
799 
800     goto cleanup;
801 
802 fail:
803 
804     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
805                           msg->port_msg.stream, 0, NULL);
806 
807     if (tmcf != NULL) {
808         nxt_mp_release(tmcf->mem_pool);
809     }
810 
811 cleanup:
812 
813     if (p != MAP_FAILED) {
814         nxt_mem_munmap(p, size);
815     }
816 
817     if (msg->fd[0] != -1) {
818         nxt_fd_close(msg->fd[0]);
819         msg->fd[0] = -1;
820     }
821 }
822 
823 
824 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)825 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
826 {
827     nxt_app_t            *app;
828     nxt_int_t            ret;
829     nxt_str_t            app_name;
830     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
831     nxt_port_t           *proto_port;
832     nxt_port_msg_type_t  reply;
833 
834     reply_port = nxt_runtime_port_find(task->thread->runtime,
835                                        msg->port_msg.pid,
836                                        msg->port_msg.reply_port);
837     if (nxt_slow_path(reply_port == NULL)) {
838         nxt_alert(task, "app_restart_handler: reply port not found");
839         return;
840     }
841 
842     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
843     app_name.start = msg->buf->mem.pos;
844 
845     nxt_debug(task, "app_restart_handler: %V", &app_name);
846 
847     app = nxt_router_app_find(&nxt_router->apps, &app_name);
848 
849     if (nxt_fast_path(app != NULL)) {
850         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
851                                    NXT_PROCESS_APP);
852         if (nxt_slow_path(shared_port == NULL)) {
853             goto fail;
854         }
855 
856         ret = nxt_port_socket_init(task, shared_port, 0);
857         if (nxt_slow_path(ret != NXT_OK)) {
858             nxt_port_use(task, shared_port, -1);
859             goto fail;
860         }
861 
862         ret = nxt_router_app_queue_init(task, shared_port);
863         if (nxt_slow_path(ret != NXT_OK)) {
864             nxt_port_write_close(shared_port);
865             nxt_port_read_close(shared_port);
866             nxt_port_use(task, shared_port, -1);
867             goto fail;
868         }
869 
870         nxt_port_write_enable(task, shared_port);
871 
872         nxt_thread_mutex_lock(&app->mutex);
873 
874         proto_port = app->proto_port;
875 
876         if (proto_port != NULL) {
877             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
878                       proto_port->pid);
879 
880             app->proto_port = NULL;
881             proto_port->app = NULL;
882         }
883 
884         app->generation++;
885 
886         shared_port->app = app;
887 
888         old_shared_port = app->shared_port;
889         old_shared_port->app = NULL;
890 
891         app->shared_port = shared_port;
892 
893         nxt_thread_mutex_unlock(&app->mutex);
894 
895         nxt_port_close(task, old_shared_port);
896         nxt_port_use(task, old_shared_port, -1);
897 
898         if (proto_port != NULL) {
899             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
900                                          -1, 0, 0, NULL);
901 
902             nxt_port_close(task, proto_port);
903 
904             nxt_port_use(task, proto_port, -1);
905         }
906 
907         reply = NXT_PORT_MSG_RPC_READY_LAST;
908 
909     } else {
910 
911 fail:
912 
913         reply = NXT_PORT_MSG_RPC_ERROR;
914     }
915 
916     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
917                           0, NULL);
918 }
919 
920 
921 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)922 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
923     void *data)
924 {
925     union {
926         nxt_pid_t  removed_pid;
927         void       *data;
928     } u;
929 
930     u.data = data;
931 
932     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
933 }
934 
935 
936 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)937 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
938 {
939     nxt_event_engine_t  *engine;
940 
941     nxt_port_remove_pid_handler(task, msg);
942 
943     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
944     {
945         if (nxt_fast_path(engine->port != NULL)) {
946             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
947                           msg->u.data);
948         }
949     }
950     nxt_queue_loop;
951 
952     if (msg->port_msg.stream == 0) {
953         return;
954     }
955 
956     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
957 
958     nxt_port_rpc_handler(task, msg);
959 }
960 
961 
962 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)963 nxt_router_temp_conf(nxt_task_t *task)
964 {
965     nxt_mp_t                *mp, *tmp;
966     nxt_router_conf_t       *rtcf;
967     nxt_router_temp_conf_t  *tmcf;
968 
969     mp = nxt_mp_create(1024, 128, 256, 32);
970     if (nxt_slow_path(mp == NULL)) {
971         return NULL;
972     }
973 
974     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
975     if (nxt_slow_path(rtcf == NULL)) {
976         goto fail;
977     }
978 
979     rtcf->mem_pool = mp;
980 
981     rtcf->var_fields = nxt_array_create(mp, 4, sizeof(nxt_var_field_t));
982     if (nxt_slow_path(rtcf->var_fields == NULL)) {
983         goto fail;
984     }
985 
986     tmp = nxt_mp_create(1024, 128, 256, 32);
987     if (nxt_slow_path(tmp == NULL)) {
988         goto fail;
989     }
990 
991     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
992     if (nxt_slow_path(tmcf == NULL)) {
993         goto temp_fail;
994     }
995 
996     tmcf->mem_pool = tmp;
997     tmcf->router_conf = rtcf;
998     tmcf->count = 1;
999     tmcf->engine = task->thread->engine;
1000 
1001     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1002                                      sizeof(nxt_router_engine_conf_t));
1003     if (nxt_slow_path(tmcf->engines == NULL)) {
1004         goto temp_fail;
1005     }
1006 
1007     nxt_queue_init(&creating_sockets);
1008     nxt_queue_init(&pending_sockets);
1009     nxt_queue_init(&updating_sockets);
1010     nxt_queue_init(&keeping_sockets);
1011     nxt_queue_init(&deleting_sockets);
1012 
1013 #if (NXT_TLS)
1014     nxt_queue_init(&tmcf->tls);
1015 #endif
1016 
1017     nxt_queue_init(&tmcf->apps);
1018     nxt_queue_init(&tmcf->previous);
1019 
1020     return tmcf;
1021 
1022 temp_fail:
1023 
1024     nxt_mp_destroy(tmp);
1025 
1026 fail:
1027 
1028     nxt_mp_destroy(mp);
1029 
1030     return NULL;
1031 }
1032 
1033 
1034 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1035 nxt_router_app_can_start(nxt_app_t *app)
1036 {
1037     return app->processes + app->pending_processes < app->max_processes
1038             && app->pending_processes < app->max_pending_processes;
1039 }
1040 
1041 
1042 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1043 nxt_router_app_need_start(nxt_app_t *app)
1044 {
1045     return (app->active_requests
1046               > app->port_hash_count + app->pending_processes)
1047            || (app->spare_processes
1048                 > app->idle_processes + app->pending_processes);
1049 }
1050 
1051 
1052 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1053 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1054 {
1055     nxt_int_t                    ret;
1056     nxt_app_t                    *app;
1057     nxt_router_t                 *router;
1058     nxt_runtime_t                *rt;
1059     nxt_queue_link_t             *qlk;
1060     nxt_socket_conf_t            *skcf;
1061     nxt_router_conf_t            *rtcf;
1062     nxt_router_temp_conf_t       *tmcf;
1063     const nxt_event_interface_t  *interface;
1064 #if (NXT_TLS)
1065     nxt_router_tlssock_t         *tls;
1066 #endif
1067 
1068     tmcf = obj;
1069 
1070     qlk = nxt_queue_first(&pending_sockets);
1071 
1072     if (qlk != nxt_queue_tail(&pending_sockets)) {
1073         nxt_queue_remove(qlk);
1074         nxt_queue_insert_tail(&creating_sockets, qlk);
1075 
1076         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1077 
1078         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1079 
1080         return;
1081     }
1082 
1083 #if (NXT_TLS)
1084     qlk = nxt_queue_last(&tmcf->tls);
1085 
1086     if (qlk != nxt_queue_head(&tmcf->tls)) {
1087         nxt_queue_remove(qlk);
1088 
1089         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1090 
1091         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1092                            nxt_router_tls_rpc_handler, tls);
1093         return;
1094     }
1095 #endif
1096 
1097     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1098 
1099         if (nxt_router_app_need_start(app)) {
1100             nxt_router_app_rpc_create(task, tmcf, app);
1101             return;
1102         }
1103 
1104     } nxt_queue_loop;
1105 
1106     rtcf = tmcf->router_conf;
1107 
1108     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1109         nxt_router_access_log_open(task, tmcf);
1110         return;
1111     }
1112 
1113     rt = task->thread->runtime;
1114 
1115     interface = nxt_service_get(rt->services, "engine", NULL);
1116 
1117     router = rtcf->router;
1118 
1119     ret = nxt_router_engines_create(task, router, tmcf, interface);
1120     if (nxt_slow_path(ret != NXT_OK)) {
1121         goto fail;
1122     }
1123 
1124     ret = nxt_router_threads_create(task, rt, tmcf);
1125     if (nxt_slow_path(ret != NXT_OK)) {
1126         goto fail;
1127     }
1128 
1129     nxt_router_apps_sort(task, router, tmcf);
1130 
1131     nxt_router_apps_hash_use(task, rtcf, 1);
1132 
1133     nxt_router_engines_post(router, tmcf);
1134 
1135     nxt_queue_add(&router->sockets, &updating_sockets);
1136     nxt_queue_add(&router->sockets, &creating_sockets);
1137 
1138     if (router->access_log != rtcf->access_log) {
1139         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1140 
1141         nxt_router_access_log_release(task, &router->lock, router->access_log);
1142 
1143         router->access_log = rtcf->access_log;
1144     }
1145 
1146     nxt_router_conf_ready(task, tmcf);
1147 
1148     return;
1149 
1150 fail:
1151 
1152     nxt_router_conf_error(task, tmcf);
1153 
1154     return;
1155 }
1156 
1157 
1158 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1159 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1160 {
1161     nxt_joint_job_t  *job;
1162 
1163     job = obj;
1164 
1165     nxt_router_conf_ready(task, job->tmcf);
1166 }
1167 
1168 
1169 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1170 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1171 {
1172     uint32_t               count;
1173     nxt_router_conf_t      *rtcf;
1174     nxt_thread_spinlock_t  *lock;
1175 
1176     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1177 
1178     if (--tmcf->count > 0) {
1179         return;
1180     }
1181 
1182     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1183 
1184     rtcf = tmcf->router_conf;
1185 
1186     lock = &rtcf->router->lock;
1187 
1188     nxt_thread_spin_lock(lock);
1189 
1190     count = rtcf->count;
1191 
1192     nxt_thread_spin_unlock(lock);
1193 
1194     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1195 
1196     if (count == 0) {
1197         nxt_router_apps_hash_use(task, rtcf, -1);
1198 
1199         nxt_router_access_log_release(task, lock, rtcf->access_log);
1200 
1201         nxt_mp_destroy(rtcf->mem_pool);
1202     }
1203 
1204     nxt_mp_release(tmcf->mem_pool);
1205 }
1206 
1207 
1208 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1209 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1210 {
1211     nxt_app_t          *app;
1212     nxt_socket_t       s;
1213     nxt_router_t       *router;
1214     nxt_queue_link_t   *qlk;
1215     nxt_socket_conf_t  *skcf;
1216     nxt_router_conf_t  *rtcf;
1217 
1218     nxt_alert(task, "failed to apply new conf");
1219 
1220     for (qlk = nxt_queue_first(&creating_sockets);
1221          qlk != nxt_queue_tail(&creating_sockets);
1222          qlk = nxt_queue_next(qlk))
1223     {
1224         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1225         s = skcf->listen->socket;
1226 
1227         if (s != -1) {
1228             nxt_socket_close(task, s);
1229         }
1230 
1231         nxt_free(skcf->listen);
1232     }
1233 
1234     rtcf = tmcf->router_conf;
1235 
1236     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1237 
1238         nxt_router_app_unlink(task, app);
1239 
1240     } nxt_queue_loop;
1241 
1242     router = rtcf->router;
1243 
1244     nxt_queue_add(&router->sockets, &keeping_sockets);
1245     nxt_queue_add(&router->sockets, &deleting_sockets);
1246 
1247     nxt_queue_add(&router->apps, &tmcf->previous);
1248 
1249     // TODO: new engines and threads
1250 
1251     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1252 
1253     nxt_mp_destroy(rtcf->mem_pool);
1254 
1255     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1256 
1257     nxt_mp_release(tmcf->mem_pool);
1258 }
1259 
1260 
1261 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1262 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1263     nxt_port_msg_type_t type)
1264 {
1265     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1266 
1267     nxt_port_use(task, tmcf->port, -1);
1268 
1269     tmcf->port = NULL;
1270 }
1271 
1272 
1273 static nxt_conf_map_t  nxt_router_conf[] = {
1274     {
1275         nxt_string("listeners_threads"),
1276         NXT_CONF_MAP_INT32,
1277         offsetof(nxt_router_conf_t, threads),
1278     },
1279 };
1280 
1281 
1282 static nxt_conf_map_t  nxt_router_app_conf[] = {
1283     {
1284         nxt_string("type"),
1285         NXT_CONF_MAP_STR,
1286         offsetof(nxt_router_app_conf_t, type),
1287     },
1288 
1289     {
1290         nxt_string("limits"),
1291         NXT_CONF_MAP_PTR,
1292         offsetof(nxt_router_app_conf_t, limits_value),
1293     },
1294 
1295     {
1296         nxt_string("processes"),
1297         NXT_CONF_MAP_INT32,
1298         offsetof(nxt_router_app_conf_t, processes),
1299     },
1300 
1301     {
1302         nxt_string("processes"),
1303         NXT_CONF_MAP_PTR,
1304         offsetof(nxt_router_app_conf_t, processes_value),
1305     },
1306 
1307     {
1308         nxt_string("targets"),
1309         NXT_CONF_MAP_PTR,
1310         offsetof(nxt_router_app_conf_t, targets_value),
1311     },
1312 };
1313 
1314 
1315 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1316     {
1317         nxt_string("timeout"),
1318         NXT_CONF_MAP_MSEC,
1319         offsetof(nxt_router_app_conf_t, timeout),
1320     },
1321 };
1322 
1323 
1324 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1325     {
1326         nxt_string("spare"),
1327         NXT_CONF_MAP_INT32,
1328         offsetof(nxt_router_app_conf_t, spare_processes),
1329     },
1330 
1331     {
1332         nxt_string("max"),
1333         NXT_CONF_MAP_INT32,
1334         offsetof(nxt_router_app_conf_t, max_processes),
1335     },
1336 
1337     {
1338         nxt_string("idle_timeout"),
1339         NXT_CONF_MAP_MSEC,
1340         offsetof(nxt_router_app_conf_t, idle_timeout),
1341     },
1342 };
1343 
1344 
1345 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1346     {
1347         nxt_string("pass"),
1348         NXT_CONF_MAP_STR_COPY,
1349         offsetof(nxt_router_listener_conf_t, pass),
1350     },
1351 
1352     {
1353         nxt_string("application"),
1354         NXT_CONF_MAP_STR_COPY,
1355         offsetof(nxt_router_listener_conf_t, application),
1356     },
1357 };
1358 
1359 
1360 static nxt_conf_map_t  nxt_router_http_conf[] = {
1361     {
1362         nxt_string("header_buffer_size"),
1363         NXT_CONF_MAP_SIZE,
1364         offsetof(nxt_socket_conf_t, header_buffer_size),
1365     },
1366 
1367     {
1368         nxt_string("large_header_buffer_size"),
1369         NXT_CONF_MAP_SIZE,
1370         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1371     },
1372 
1373     {
1374         nxt_string("large_header_buffers"),
1375         NXT_CONF_MAP_SIZE,
1376         offsetof(nxt_socket_conf_t, large_header_buffers),
1377     },
1378 
1379     {
1380         nxt_string("body_buffer_size"),
1381         NXT_CONF_MAP_SIZE,
1382         offsetof(nxt_socket_conf_t, body_buffer_size),
1383     },
1384 
1385     {
1386         nxt_string("max_body_size"),
1387         NXT_CONF_MAP_SIZE,
1388         offsetof(nxt_socket_conf_t, max_body_size),
1389     },
1390 
1391     {
1392         nxt_string("idle_timeout"),
1393         NXT_CONF_MAP_MSEC,
1394         offsetof(nxt_socket_conf_t, idle_timeout),
1395     },
1396 
1397     {
1398         nxt_string("header_read_timeout"),
1399         NXT_CONF_MAP_MSEC,
1400         offsetof(nxt_socket_conf_t, header_read_timeout),
1401     },
1402 
1403     {
1404         nxt_string("body_read_timeout"),
1405         NXT_CONF_MAP_MSEC,
1406         offsetof(nxt_socket_conf_t, body_read_timeout),
1407     },
1408 
1409     {
1410         nxt_string("send_timeout"),
1411         NXT_CONF_MAP_MSEC,
1412         offsetof(nxt_socket_conf_t, send_timeout),
1413     },
1414 
1415     {
1416         nxt_string("body_temp_path"),
1417         NXT_CONF_MAP_STR,
1418         offsetof(nxt_socket_conf_t, body_temp_path),
1419     },
1420 
1421     {
1422         nxt_string("discard_unsafe_fields"),
1423         NXT_CONF_MAP_INT8,
1424         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1425     },
1426 };
1427 
1428 
1429 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1430     {
1431         nxt_string("max_frame_size"),
1432         NXT_CONF_MAP_SIZE,
1433         offsetof(nxt_websocket_conf_t, max_frame_size),
1434     },
1435 
1436     {
1437         nxt_string("read_timeout"),
1438         NXT_CONF_MAP_MSEC,
1439         offsetof(nxt_websocket_conf_t, read_timeout),
1440     },
1441 
1442     {
1443         nxt_string("keepalive_interval"),
1444         NXT_CONF_MAP_MSEC,
1445         offsetof(nxt_websocket_conf_t, keepalive_interval),
1446     },
1447 
1448 };
1449 
1450 
1451 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1452 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1453     u_char *start, u_char *end)
1454 {
1455     u_char                      *p;
1456     size_t                      size;
1457     nxt_mp_t                    *mp, *app_mp;
1458     uint32_t                    next, next_target;
1459     nxt_int_t                   ret;
1460     nxt_str_t                   name, target;
1461     nxt_app_t                   *app, *prev;
1462     nxt_str_t                   *t, *s, *targets;
1463     nxt_uint_t                  n, i;
1464     nxt_port_t                  *port;
1465     nxt_router_t                *router;
1466     nxt_app_joint_t             *app_joint;
1467 #if (NXT_TLS)
1468     nxt_tls_init_t              *tls_init;
1469     nxt_conf_value_t            *certificate;
1470 #endif
1471     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1472     nxt_conf_value_t            *applications, *application;
1473     nxt_conf_value_t            *listeners, *listener;
1474     nxt_socket_conf_t           *skcf;
1475     nxt_router_conf_t           *rtcf;
1476     nxt_http_routes_t           *routes;
1477     nxt_event_engine_t          *engine;
1478     nxt_app_lang_module_t       *lang;
1479     nxt_router_app_conf_t       apcf;
1480     nxt_router_listener_conf_t  lscf;
1481 
1482     static nxt_str_t  http_path = nxt_string("/settings/http");
1483     static nxt_str_t  applications_path = nxt_string("/applications");
1484     static nxt_str_t  listeners_path = nxt_string("/listeners");
1485     static nxt_str_t  routes_path = nxt_string("/routes");
1486     static nxt_str_t  access_log_path = nxt_string("/access_log");
1487 #if (NXT_TLS)
1488     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1489     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1490     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1491     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1492     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1493 #endif
1494     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1495     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1496     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1497     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1498 
1499     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1500     if (root == NULL) {
1501         nxt_alert(task, "configuration parsing error");
1502         return NXT_ERROR;
1503     }
1504 
1505     rtcf = tmcf->router_conf;
1506     mp = rtcf->mem_pool;
1507 
1508     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1509                               nxt_nitems(nxt_router_conf), rtcf);
1510     if (ret != NXT_OK) {
1511         nxt_alert(task, "root map error");
1512         return NXT_ERROR;
1513     }
1514 
1515     if (rtcf->threads == 0) {
1516         rtcf->threads = nxt_ncpu;
1517     }
1518 
1519     conf = nxt_conf_get_path(root, &static_path);
1520 
1521     ret = nxt_router_conf_process_static(task, rtcf, conf);
1522     if (nxt_slow_path(ret != NXT_OK)) {
1523         return NXT_ERROR;
1524     }
1525 
1526     router = rtcf->router;
1527 
1528     applications = nxt_conf_get_path(root, &applications_path);
1529 
1530     if (applications != NULL) {
1531         next = 0;
1532 
1533         for ( ;; ) {
1534             application = nxt_conf_next_object_member(applications,
1535                                                       &name, &next);
1536             if (application == NULL) {
1537                 break;
1538             }
1539 
1540             nxt_debug(task, "application \"%V\"", &name);
1541 
1542             size = nxt_conf_json_length(application, NULL);
1543 
1544             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1545             if (nxt_slow_path(app_mp == NULL)) {
1546                 goto fail;
1547             }
1548 
1549             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1550             if (app == NULL) {
1551                 goto app_fail;
1552             }
1553 
1554             nxt_memzero(app, sizeof(nxt_app_t));
1555 
1556             app->mem_pool = app_mp;
1557 
1558             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1559             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1560                                                   + name.length);
1561 
1562             p = nxt_conf_json_print(app->conf.start, application, NULL);
1563             app->conf.length = p - app->conf.start;
1564 
1565             nxt_assert(app->conf.length <= size);
1566 
1567             nxt_debug(task, "application conf \"%V\"", &app->conf);
1568 
1569             prev = nxt_router_app_find(&router->apps, &name);
1570 
1571             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1572                 nxt_mp_destroy(app_mp);
1573 
1574                 nxt_queue_remove(&prev->link);
1575                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1576 
1577                 ret = nxt_router_apps_hash_add(rtcf, prev);
1578                 if (nxt_slow_path(ret != NXT_OK)) {
1579                     goto fail;
1580                 }
1581 
1582                 continue;
1583             }
1584 
1585             apcf.processes = 1;
1586             apcf.max_processes = 1;
1587             apcf.spare_processes = 0;
1588             apcf.timeout = 0;
1589             apcf.idle_timeout = 15000;
1590             apcf.limits_value = NULL;
1591             apcf.processes_value = NULL;
1592             apcf.targets_value = NULL;
1593 
1594             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1595             if (nxt_slow_path(app_joint == NULL)) {
1596                 goto app_fail;
1597             }
1598 
1599             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1600 
1601             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1602                                       nxt_nitems(nxt_router_app_conf), &apcf);
1603             if (ret != NXT_OK) {
1604                 nxt_alert(task, "application map error");
1605                 goto app_fail;
1606             }
1607 
1608             if (apcf.limits_value != NULL) {
1609 
1610                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1611                     nxt_alert(task, "application limits is not object");
1612                     goto app_fail;
1613                 }
1614 
1615                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1616                                         nxt_router_app_limits_conf,
1617                                         nxt_nitems(nxt_router_app_limits_conf),
1618                                         &apcf);
1619                 if (ret != NXT_OK) {
1620                     nxt_alert(task, "application limits map error");
1621                     goto app_fail;
1622                 }
1623             }
1624 
1625             if (apcf.processes_value != NULL
1626                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1627             {
1628                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1629                                      nxt_router_app_processes_conf,
1630                                      nxt_nitems(nxt_router_app_processes_conf),
1631                                      &apcf);
1632                 if (ret != NXT_OK) {
1633                     nxt_alert(task, "application processes map error");
1634                     goto app_fail;
1635                 }
1636 
1637             } else {
1638                 apcf.max_processes = apcf.processes;
1639                 apcf.spare_processes = apcf.processes;
1640             }
1641 
1642             if (apcf.targets_value != NULL) {
1643                 n = nxt_conf_object_members_count(apcf.targets_value);
1644 
1645                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1646                 if (nxt_slow_path(targets == NULL)) {
1647                     goto app_fail;
1648                 }
1649 
1650                 next_target = 0;
1651 
1652                 for (i = 0; i < n; i++) {
1653                     (void) nxt_conf_next_object_member(apcf.targets_value,
1654                                                        &target, &next_target);
1655 
1656                     s = nxt_str_dup(app_mp, &targets[i], &target);
1657                     if (nxt_slow_path(s == NULL)) {
1658                         goto app_fail;
1659                     }
1660                 }
1661 
1662             } else {
1663                 targets = NULL;
1664             }
1665 
1666             nxt_debug(task, "application type: %V", &apcf.type);
1667             nxt_debug(task, "application processes: %D", apcf.processes);
1668             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1669 
1670             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1671 
1672             if (lang == NULL) {
1673                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1674                 goto app_fail;
1675             }
1676 
1677             nxt_debug(task, "application language module: \"%s\"", lang->file);
1678 
1679             ret = nxt_thread_mutex_create(&app->mutex);
1680             if (ret != NXT_OK) {
1681                 goto app_fail;
1682             }
1683 
1684             nxt_queue_init(&app->ports);
1685             nxt_queue_init(&app->spare_ports);
1686             nxt_queue_init(&app->idle_ports);
1687             nxt_queue_init(&app->ack_waiting_req);
1688 
1689             app->name.length = name.length;
1690             nxt_memcpy(app->name.start, name.start, name.length);
1691 
1692             app->type = lang->type;
1693             app->max_processes = apcf.max_processes;
1694             app->spare_processes = apcf.spare_processes;
1695             app->max_pending_processes = apcf.spare_processes
1696                                          ? apcf.spare_processes : 1;
1697             app->timeout = apcf.timeout;
1698             app->idle_timeout = apcf.idle_timeout;
1699 
1700             app->targets = targets;
1701 
1702             engine = task->thread->engine;
1703 
1704             app->engine = engine;
1705 
1706             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1707             app->adjust_idle_work.task = &engine->task;
1708             app->adjust_idle_work.obj = app;
1709 
1710             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1711 
1712             ret = nxt_router_apps_hash_add(rtcf, app);
1713             if (nxt_slow_path(ret != NXT_OK)) {
1714                 goto app_fail;
1715             }
1716 
1717             nxt_router_app_use(task, app, 1);
1718 
1719             app->joint = app_joint;
1720 
1721             app_joint->use_count = 1;
1722             app_joint->app = app;
1723 
1724             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1725             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1726             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1727             app_joint->idle_timer.task = &engine->task;
1728             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1729 
1730             app_joint->free_app_work.handler = nxt_router_free_app;
1731             app_joint->free_app_work.task = &engine->task;
1732             app_joint->free_app_work.obj = app_joint;
1733 
1734             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1735                                 NXT_PROCESS_APP);
1736             if (nxt_slow_path(port == NULL)) {
1737                 return NXT_ERROR;
1738             }
1739 
1740             ret = nxt_port_socket_init(task, port, 0);
1741             if (nxt_slow_path(ret != NXT_OK)) {
1742                 nxt_port_use(task, port, -1);
1743                 return NXT_ERROR;
1744             }
1745 
1746             ret = nxt_router_app_queue_init(task, port);
1747             if (nxt_slow_path(ret != NXT_OK)) {
1748                 nxt_port_write_close(port);
1749                 nxt_port_read_close(port);
1750                 nxt_port_use(task, port, -1);
1751                 return NXT_ERROR;
1752             }
1753 
1754             nxt_port_write_enable(task, port);
1755             port->app = app;
1756 
1757             app->shared_port = port;
1758 
1759             nxt_thread_mutex_create(&app->outgoing.mutex);
1760         }
1761     }
1762 
1763     conf = nxt_conf_get_path(root, &routes_path);
1764     if (nxt_fast_path(conf != NULL)) {
1765         routes = nxt_http_routes_create(task, tmcf, conf);
1766         if (nxt_slow_path(routes == NULL)) {
1767             return NXT_ERROR;
1768         }
1769 
1770         rtcf->routes = routes;
1771     }
1772 
1773     ret = nxt_upstreams_create(task, tmcf, root);
1774     if (nxt_slow_path(ret != NXT_OK)) {
1775         return ret;
1776     }
1777 
1778     http = nxt_conf_get_path(root, &http_path);
1779 #if 0
1780     if (http == NULL) {
1781         nxt_alert(task, "no \"http\" block");
1782         return NXT_ERROR;
1783     }
1784 #endif
1785 
1786     websocket = nxt_conf_get_path(root, &websocket_path);
1787 
1788     listeners = nxt_conf_get_path(root, &listeners_path);
1789 
1790     if (listeners != NULL) {
1791         next = 0;
1792 
1793         for ( ;; ) {
1794             listener = nxt_conf_next_object_member(listeners, &name, &next);
1795             if (listener == NULL) {
1796                 break;
1797             }
1798 
1799             skcf = nxt_router_socket_conf(task, tmcf, &name);
1800             if (skcf == NULL) {
1801                 goto fail;
1802             }
1803 
1804             nxt_memzero(&lscf, sizeof(lscf));
1805 
1806             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1807                                       nxt_nitems(nxt_router_listener_conf),
1808                                       &lscf);
1809             if (ret != NXT_OK) {
1810                 nxt_alert(task, "listener map error");
1811                 goto fail;
1812             }
1813 
1814             nxt_debug(task, "application: %V", &lscf.application);
1815 
1816             // STUB, default values if http block is not defined.
1817             skcf->header_buffer_size = 2048;
1818             skcf->large_header_buffer_size = 8192;
1819             skcf->large_header_buffers = 4;
1820             skcf->discard_unsafe_fields = 1;
1821             skcf->body_buffer_size = 16 * 1024;
1822             skcf->max_body_size = 8 * 1024 * 1024;
1823             skcf->proxy_header_buffer_size = 64 * 1024;
1824             skcf->proxy_buffer_size = 4096;
1825             skcf->proxy_buffers = 256;
1826             skcf->idle_timeout = 180 * 1000;
1827             skcf->header_read_timeout = 30 * 1000;
1828             skcf->body_read_timeout = 30 * 1000;
1829             skcf->send_timeout = 30 * 1000;
1830             skcf->proxy_timeout = 60 * 1000;
1831             skcf->proxy_send_timeout = 30 * 1000;
1832             skcf->proxy_read_timeout = 30 * 1000;
1833 
1834             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1835             skcf->websocket_conf.read_timeout = 60 * 1000;
1836             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1837 
1838             nxt_str_null(&skcf->body_temp_path);
1839 
1840             if (http != NULL) {
1841                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1842                                           nxt_nitems(nxt_router_http_conf),
1843                                           skcf);
1844                 if (ret != NXT_OK) {
1845                     nxt_alert(task, "http map error");
1846                     goto fail;
1847                 }
1848             }
1849 
1850             if (websocket != NULL) {
1851                 ret = nxt_conf_map_object(mp, websocket,
1852                                           nxt_router_websocket_conf,
1853                                           nxt_nitems(nxt_router_websocket_conf),
1854                                           &skcf->websocket_conf);
1855                 if (ret != NXT_OK) {
1856                     nxt_alert(task, "websocket map error");
1857                     goto fail;
1858                 }
1859             }
1860 
1861             t = &skcf->body_temp_path;
1862 
1863             if (t->length == 0) {
1864                 t->start = (u_char *) task->thread->runtime->tmp;
1865                 t->length = nxt_strlen(t->start);
1866             }
1867 
1868             conf = nxt_conf_get_path(listener, &forwarded_path);
1869 
1870             if (conf != NULL) {
1871                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1872                 if (nxt_slow_path(skcf->forwarded == NULL)) {
1873                     return NXT_ERROR;
1874                 }
1875             }
1876 
1877             conf = nxt_conf_get_path(listener, &client_ip_path);
1878 
1879             if (conf != NULL) {
1880                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1881                 if (nxt_slow_path(skcf->client_ip == NULL)) {
1882                     return NXT_ERROR;
1883                 }
1884             }
1885 
1886 #if (NXT_TLS)
1887             certificate = nxt_conf_get_path(listener, &certificate_path);
1888 
1889             if (certificate != NULL) {
1890                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1891                 if (nxt_slow_path(tls_init == NULL)) {
1892                     return NXT_ERROR;
1893                 }
1894 
1895                 tls_init->cache_size = 0;
1896                 tls_init->timeout = 300;
1897 
1898                 value = nxt_conf_get_path(listener, &conf_cache_path);
1899                 if (value != NULL) {
1900                     tls_init->cache_size = nxt_conf_get_number(value);
1901                 }
1902 
1903                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1904                 if (value != NULL) {
1905                     tls_init->timeout = nxt_conf_get_number(value);
1906                 }
1907 
1908                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1909                                                         &conf_commands_path);
1910 
1911                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1912                                                            &conf_tickets);
1913 
1914                 n = nxt_conf_array_elements_count_or_1(certificate);
1915 
1916                 for (i = 0; i < n; i++) {
1917                     value = nxt_conf_get_array_element_or_itself(certificate,
1918                                                                  i);
1919                     nxt_assert(value != NULL);
1920 
1921                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1922                                                      tls_init, i == 0);
1923                     if (nxt_slow_path(ret != NXT_OK)) {
1924                         goto fail;
1925                     }
1926                 }
1927             }
1928 #endif
1929 
1930             skcf->listen->handler = nxt_http_conn_init;
1931             skcf->router_conf = rtcf;
1932             skcf->router_conf->count++;
1933 
1934             if (lscf.pass.length != 0) {
1935                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1936 
1937             /* COMPATIBILITY: listener application. */
1938             } else if (lscf.application.length > 0) {
1939                 skcf->action = nxt_http_pass_application(task, rtcf,
1940                                                          &lscf.application);
1941             }
1942 
1943             if (nxt_slow_path(skcf->action == NULL)) {
1944                 goto fail;
1945             }
1946         }
1947     }
1948 
1949     ret = nxt_http_routes_resolve(task, tmcf);
1950     if (nxt_slow_path(ret != NXT_OK)) {
1951         goto fail;
1952     }
1953 
1954     value = nxt_conf_get_path(root, &access_log_path);
1955 
1956     if (value != NULL) {
1957         ret = nxt_router_access_log_create(task, rtcf, value);
1958         if (nxt_slow_path(ret != NXT_OK)) {
1959             goto fail;
1960         }
1961     }
1962 
1963     nxt_queue_add(&deleting_sockets, &router->sockets);
1964     nxt_queue_init(&router->sockets);
1965 
1966     return NXT_OK;
1967 
1968 app_fail:
1969 
1970     nxt_mp_destroy(app_mp);
1971 
1972 fail:
1973 
1974     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1975 
1976         nxt_queue_remove(&app->link);
1977         nxt_thread_mutex_destroy(&app->mutex);
1978         nxt_mp_destroy(app->mem_pool);
1979 
1980     } nxt_queue_loop;
1981 
1982     return NXT_ERROR;
1983 }
1984 
1985 
1986 #if (NXT_TLS)
1987 
1988 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)1989 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1990     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1991     nxt_tls_init_t *tls_init, nxt_bool_t last)
1992 {
1993     nxt_router_tlssock_t  *tls;
1994 
1995     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1996     if (nxt_slow_path(tls == NULL)) {
1997         return NXT_ERROR;
1998     }
1999 
2000     tls->tls_init = tls_init;
2001     tls->socket_conf = skcf;
2002     tls->temp_conf = tmcf;
2003     tls->last = last;
2004     nxt_conf_get_string(value, &tls->name);
2005 
2006     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2007 
2008     return NXT_OK;
2009 }
2010 
2011 #endif
2012 
2013 
2014 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2015 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2016     nxt_conf_value_t *conf)
2017 {
2018     uint32_t          next, i;
2019     nxt_mp_t          *mp;
2020     nxt_str_t         *type, exten, str;
2021     nxt_int_t         ret;
2022     nxt_uint_t        exts;
2023     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2024 
2025     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2026 
2027     mp = rtcf->mem_pool;
2028 
2029     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2030     if (nxt_slow_path(ret != NXT_OK)) {
2031         return NXT_ERROR;
2032     }
2033 
2034     if (conf == NULL) {
2035         return NXT_OK;
2036     }
2037 
2038     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2039 
2040     if (mtypes_conf != NULL) {
2041         next = 0;
2042 
2043         for ( ;; ) {
2044             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2045 
2046             if (ext_conf == NULL) {
2047                 break;
2048             }
2049 
2050             type = nxt_str_dup(mp, NULL, &str);
2051             if (nxt_slow_path(type == NULL)) {
2052                 return NXT_ERROR;
2053             }
2054 
2055             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2056                 nxt_conf_get_string(ext_conf, &str);
2057 
2058                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2059                     return NXT_ERROR;
2060                 }
2061 
2062                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2063                                                       &exten, type);
2064                 if (nxt_slow_path(ret != NXT_OK)) {
2065                     return NXT_ERROR;
2066                 }
2067 
2068                 continue;
2069             }
2070 
2071             exts = nxt_conf_array_elements_count(ext_conf);
2072 
2073             for (i = 0; i < exts; i++) {
2074                 value = nxt_conf_get_array_element(ext_conf, i);
2075 
2076                 nxt_conf_get_string(value, &str);
2077 
2078                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2079                     return NXT_ERROR;
2080                 }
2081 
2082                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2083                                                       &exten, type);
2084                 if (nxt_slow_path(ret != NXT_OK)) {
2085                     return NXT_ERROR;
2086                 }
2087             }
2088         }
2089     }
2090 
2091     return NXT_OK;
2092 }
2093 
2094 
2095 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2096 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2097 {
2098     nxt_int_t                   ret;
2099     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2100     nxt_conf_value_t            *source_conf, *recursive_conf;
2101     nxt_http_forward_t          *forward;
2102     nxt_http_route_addr_rule_t  *source;
2103 
2104     static nxt_str_t  header_path = nxt_string("/header");
2105     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2106     static nxt_str_t  protocol_path = nxt_string("/protocol");
2107     static nxt_str_t  source_path = nxt_string("/source");
2108     static nxt_str_t  recursive_path = nxt_string("/recursive");
2109 
2110     header_conf = nxt_conf_get_path(conf, &header_path);
2111 
2112     if (header_conf != NULL) {
2113         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2114         protocol_conf = NULL;
2115 
2116     } else {
2117         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2118         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2119     }
2120 
2121     source_conf = nxt_conf_get_path(conf, &source_path);
2122     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2123 
2124     if (source_conf == NULL
2125         || (protocol_conf == NULL && client_ip_conf == NULL))
2126     {
2127         return NULL;
2128     }
2129 
2130     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2131     if (nxt_slow_path(forward == NULL)) {
2132         return NULL;
2133     }
2134 
2135     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2136     if (nxt_slow_path(source == NULL)) {
2137         return NULL;
2138     }
2139 
2140     forward->source = source;
2141 
2142     if (recursive_conf != NULL) {
2143         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2144     }
2145 
2146     if (client_ip_conf != NULL) {
2147         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2148                                              &forward->client_ip);
2149         if (nxt_slow_path(ret != NXT_OK)) {
2150             return NULL;
2151         }
2152     }
2153 
2154     if (protocol_conf != NULL) {
2155         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2156                                              &forward->protocol);
2157         if (nxt_slow_path(ret != NXT_OK)) {
2158             return NULL;
2159         }
2160     }
2161 
2162     return forward;
2163 }
2164 
2165 
2166 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2167 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2168     nxt_http_forward_header_t *fh)
2169 {
2170     char       c;
2171     size_t     i;
2172     uint32_t   hash;
2173     nxt_str_t  header;
2174 
2175     nxt_conf_get_string(conf, &header);
2176 
2177     fh->header = nxt_str_dup(mp, NULL, &header);
2178     if (nxt_slow_path(fh->header == NULL)) {
2179         return NXT_ERROR;
2180     }
2181 
2182     hash = NXT_HTTP_FIELD_HASH_INIT;
2183 
2184     for (i = 0; i < fh->header->length; i++) {
2185         c = fh->header->start[i];
2186         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2187     }
2188 
2189     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2190 
2191     fh->header_hash = hash;
2192 
2193     return NXT_OK;
2194 }
2195 
2196 
2197 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2198 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2199 {
2200     nxt_app_t  *app;
2201 
2202     nxt_queue_each(app, queue, nxt_app_t, link) {
2203 
2204         if (nxt_strstr_eq(name, &app->name)) {
2205             return app;
2206         }
2207 
2208     } nxt_queue_loop;
2209 
2210     return NULL;
2211 }
2212 
2213 
2214 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2215 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2216 {
2217     void       *mem;
2218     nxt_int_t  fd;
2219 
2220     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2221     if (nxt_slow_path(fd == -1)) {
2222         return NXT_ERROR;
2223     }
2224 
2225     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2226                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2227     if (nxt_slow_path(mem == MAP_FAILED)) {
2228         nxt_fd_close(fd);
2229 
2230         return NXT_ERROR;
2231     }
2232 
2233     nxt_app_queue_init(mem);
2234 
2235     port->queue_fd = fd;
2236     port->queue = mem;
2237 
2238     return NXT_OK;
2239 }
2240 
2241 
2242 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2243 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2244 {
2245     void       *mem;
2246     nxt_int_t  fd;
2247 
2248     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2249     if (nxt_slow_path(fd == -1)) {
2250         return NXT_ERROR;
2251     }
2252 
2253     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2254                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2255     if (nxt_slow_path(mem == MAP_FAILED)) {
2256         nxt_fd_close(fd);
2257 
2258         return NXT_ERROR;
2259     }
2260 
2261     nxt_port_queue_init(mem);
2262 
2263     port->queue_fd = fd;
2264     port->queue = mem;
2265 
2266     return NXT_OK;
2267 }
2268 
2269 
2270 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2271 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2272 {
2273     void  *mem;
2274 
2275     nxt_assert(fd != -1);
2276 
2277     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2278                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2279     if (nxt_slow_path(mem == MAP_FAILED)) {
2280 
2281         return NXT_ERROR;
2282     }
2283 
2284     port->queue = mem;
2285 
2286     return NXT_OK;
2287 }
2288 
2289 
2290 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2291     NXT_LVLHSH_DEFAULT,
2292     nxt_router_apps_hash_test,
2293     nxt_mp_lvlhsh_alloc,
2294     nxt_mp_lvlhsh_free,
2295 };
2296 
2297 
2298 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2299 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2300 {
2301     nxt_app_t  *app;
2302 
2303     app = data;
2304 
2305     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2306 }
2307 
2308 
2309 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2310 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2311 {
2312     nxt_lvlhsh_query_t  lhq;
2313 
2314     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2315     lhq.replace = 0;
2316     lhq.key = app->name;
2317     lhq.value = app;
2318     lhq.proto = &nxt_router_apps_hash_proto;
2319     lhq.pool = rtcf->mem_pool;
2320 
2321     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2322 
2323     case NXT_OK:
2324         return NXT_OK;
2325 
2326     case NXT_DECLINED:
2327         nxt_thread_log_alert("router app hash adding failed: "
2328                              "\"%V\" is already in hash", &lhq.key);
2329         /* Fall through. */
2330     default:
2331         return NXT_ERROR;
2332     }
2333 }
2334 
2335 
2336 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2337 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2338 {
2339     nxt_lvlhsh_query_t  lhq;
2340 
2341     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2342     lhq.key = *name;
2343     lhq.proto = &nxt_router_apps_hash_proto;
2344 
2345     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2346         return NULL;
2347     }
2348 
2349     return lhq.value;
2350 }
2351 
2352 
2353 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2354 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2355 {
2356     nxt_app_t          *app;
2357     nxt_lvlhsh_each_t  lhe;
2358 
2359     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2360 
2361     for ( ;; ) {
2362         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2363 
2364         if (app == NULL) {
2365             break;
2366         }
2367 
2368         nxt_router_app_use(task, app, i);
2369     }
2370 }
2371 
2372 
2373 typedef struct {
2374     nxt_app_t  *app;
2375     nxt_int_t  target;
2376 } nxt_http_app_conf_t;
2377 
2378 
2379 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2380 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2381     nxt_str_t *target, nxt_http_action_t *action)
2382 {
2383     nxt_app_t            *app;
2384     nxt_str_t            *targets;
2385     nxt_uint_t           i;
2386     nxt_http_app_conf_t  *conf;
2387 
2388     app = nxt_router_apps_hash_get(rtcf, name);
2389     if (app == NULL) {
2390         return NXT_DECLINED;
2391     }
2392 
2393     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2394     if (nxt_slow_path(conf == NULL)) {
2395         return NXT_ERROR;
2396     }
2397 
2398     action->handler = nxt_http_application_handler;
2399     action->u.conf = conf;
2400 
2401     conf->app = app;
2402 
2403     if (target != NULL && target->length != 0) {
2404         targets = app->targets;
2405 
2406         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2407 
2408         conf->target = i;
2409 
2410     } else {
2411         conf->target = 0;
2412     }
2413 
2414     return NXT_OK;
2415 }
2416 
2417 
2418 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2419 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2420     nxt_str_t *name)
2421 {
2422     size_t               size;
2423     nxt_int_t            ret;
2424     nxt_bool_t           wildcard;
2425     nxt_sockaddr_t       *sa;
2426     nxt_socket_conf_t    *skcf;
2427     nxt_listen_socket_t  *ls;
2428 
2429     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2430     if (nxt_slow_path(sa == NULL)) {
2431         nxt_alert(task, "invalid listener \"%V\"", name);
2432         return NULL;
2433     }
2434 
2435     sa->type = SOCK_STREAM;
2436 
2437     nxt_debug(task, "router listener: \"%*s\"",
2438               (size_t) sa->length, nxt_sockaddr_start(sa));
2439 
2440     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2441     if (nxt_slow_path(skcf == NULL)) {
2442         return NULL;
2443     }
2444 
2445     size = nxt_sockaddr_size(sa);
2446 
2447     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2448 
2449     if (ret != NXT_OK) {
2450 
2451         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2452         if (nxt_slow_path(ls == NULL)) {
2453             return NULL;
2454         }
2455 
2456         skcf->listen = ls;
2457 
2458         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2459         nxt_memcpy(ls->sockaddr, sa, size);
2460 
2461         nxt_listen_socket_remote_size(ls);
2462 
2463         ls->socket = -1;
2464         ls->backlog = NXT_LISTEN_BACKLOG;
2465         ls->flags = NXT_NONBLOCK;
2466         ls->read_after_accept = 1;
2467     }
2468 
2469     switch (sa->u.sockaddr.sa_family) {
2470 #if (NXT_HAVE_UNIX_DOMAIN)
2471     case AF_UNIX:
2472         wildcard = 0;
2473         break;
2474 #endif
2475 #if (NXT_INET6)
2476     case AF_INET6:
2477         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2478         break;
2479 #endif
2480     case AF_INET:
2481     default:
2482         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2483         break;
2484     }
2485 
2486     if (!wildcard) {
2487         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2488         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2489             return NULL;
2490         }
2491 
2492         nxt_memcpy(skcf->sockaddr, sa, size);
2493     }
2494 
2495     return skcf;
2496 }
2497 
2498 
2499 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2500 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2501     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2502 {
2503     nxt_router_t       *router;
2504     nxt_queue_link_t   *qlk;
2505     nxt_socket_conf_t  *skcf;
2506 
2507     router = tmcf->router_conf->router;
2508 
2509     for (qlk = nxt_queue_first(&router->sockets);
2510          qlk != nxt_queue_tail(&router->sockets);
2511          qlk = nxt_queue_next(qlk))
2512     {
2513         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2514 
2515         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2516             nskcf->listen = skcf->listen;
2517 
2518             nxt_queue_remove(qlk);
2519             nxt_queue_insert_tail(&keeping_sockets, qlk);
2520 
2521             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2522 
2523             return NXT_OK;
2524         }
2525     }
2526 
2527     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2528 
2529     return NXT_DECLINED;
2530 }
2531 
2532 
2533 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2534 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2535     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2536 {
2537     size_t            size;
2538     uint32_t          stream;
2539     nxt_int_t         ret;
2540     nxt_buf_t         *b;
2541     nxt_port_t        *main_port, *router_port;
2542     nxt_runtime_t     *rt;
2543     nxt_socket_rpc_t  *rpc;
2544 
2545     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2546     if (rpc == NULL) {
2547         goto fail;
2548     }
2549 
2550     rpc->socket_conf = skcf;
2551     rpc->temp_conf = tmcf;
2552 
2553     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2554 
2555     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2556     if (b == NULL) {
2557         goto fail;
2558     }
2559 
2560     b->completion_handler = nxt_buf_dummy_completion;
2561 
2562     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2563 
2564     rt = task->thread->runtime;
2565     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2566     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2567 
2568     stream = nxt_port_rpc_register_handler(task, router_port,
2569                                            nxt_router_listen_socket_ready,
2570                                            nxt_router_listen_socket_error,
2571                                            main_port->pid, rpc);
2572     if (nxt_slow_path(stream == 0)) {
2573         goto fail;
2574     }
2575 
2576     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2577                                 stream, router_port->id, b);
2578 
2579     if (nxt_slow_path(ret != NXT_OK)) {
2580         nxt_port_rpc_cancel(task, router_port, stream);
2581         goto fail;
2582     }
2583 
2584     return;
2585 
2586 fail:
2587 
2588     nxt_router_conf_error(task, tmcf);
2589 }
2590 
2591 
2592 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2593 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2594     void *data)
2595 {
2596     nxt_int_t         ret;
2597     nxt_socket_t      s;
2598     nxt_socket_rpc_t  *rpc;
2599 
2600     rpc = data;
2601 
2602     s = msg->fd[0];
2603 
2604     ret = nxt_socket_nonblocking(task, s);
2605     if (nxt_slow_path(ret != NXT_OK)) {
2606         goto fail;
2607     }
2608 
2609     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2610 
2611     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2612     if (nxt_slow_path(ret != NXT_OK)) {
2613         goto fail;
2614     }
2615 
2616     rpc->socket_conf->listen->socket = s;
2617 
2618     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2619                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2620 
2621     return;
2622 
2623 fail:
2624 
2625     nxt_socket_close(task, s);
2626 
2627     nxt_router_conf_error(task, rpc->temp_conf);
2628 }
2629 
2630 
2631 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2632 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2633     void *data)
2634 {
2635     nxt_socket_rpc_t        *rpc;
2636     nxt_router_temp_conf_t  *tmcf;
2637 
2638     rpc = data;
2639     tmcf = rpc->temp_conf;
2640 
2641 #if 0
2642     u_char                  *p;
2643     size_t                  size;
2644     uint8_t                 error;
2645     nxt_buf_t               *in, *out;
2646     nxt_sockaddr_t          *sa;
2647 
2648     static nxt_str_t  socket_errors[] = {
2649         nxt_string("ListenerSystem"),
2650         nxt_string("ListenerNoIPv6"),
2651         nxt_string("ListenerPort"),
2652         nxt_string("ListenerInUse"),
2653         nxt_string("ListenerNoAddress"),
2654         nxt_string("ListenerNoAccess"),
2655         nxt_string("ListenerPath"),
2656     };
2657 
2658     sa = rpc->socket_conf->listen->sockaddr;
2659 
2660     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2661 
2662     if (nxt_slow_path(in == NULL)) {
2663         return;
2664     }
2665 
2666     p = in->mem.pos;
2667 
2668     error = *p++;
2669 
2670     size = nxt_length("listen socket error: ")
2671            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2672            + sa->length + socket_errors[error].length + (in->mem.free - p);
2673 
2674     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2675     if (nxt_slow_path(out == NULL)) {
2676         return;
2677     }
2678 
2679     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2680                         "listen socket error: "
2681                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2682                         (size_t) sa->length, nxt_sockaddr_start(sa),
2683                         &socket_errors[error], in->mem.free - p, p);
2684 
2685     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2686 #endif
2687 
2688     nxt_router_conf_error(task, tmcf);
2689 }
2690 
2691 
2692 #if (NXT_TLS)
2693 
2694 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2695 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2696     void *data)
2697 {
2698     nxt_mp_t                *mp;
2699     nxt_int_t               ret;
2700     nxt_tls_conf_t          *tlscf;
2701     nxt_router_tlssock_t    *tls;
2702     nxt_tls_bundle_conf_t   *bundle;
2703     nxt_router_temp_conf_t  *tmcf;
2704 
2705     nxt_debug(task, "tls rpc handler");
2706 
2707     tls = data;
2708     tmcf = tls->temp_conf;
2709 
2710     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2711         goto fail;
2712     }
2713 
2714     mp = tmcf->router_conf->mem_pool;
2715 
2716     if (tls->socket_conf->tls == NULL){
2717         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2718         if (nxt_slow_path(tlscf == NULL)) {
2719             goto fail;
2720         }
2721 
2722         tlscf->no_wait_shutdown = 1;
2723         tls->socket_conf->tls = tlscf;
2724 
2725     } else {
2726         tlscf = tls->socket_conf->tls;
2727     }
2728 
2729     tls->tls_init->conf = tlscf;
2730 
2731     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2732     if (nxt_slow_path(bundle == NULL)) {
2733         goto fail;
2734     }
2735 
2736     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2737         goto fail;
2738     }
2739 
2740     bundle->chain_file = msg->fd[0];
2741     bundle->next = tlscf->bundle;
2742     tlscf->bundle = bundle;
2743 
2744     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2745                                                   tls->last);
2746     if (nxt_slow_path(ret != NXT_OK)) {
2747         goto fail;
2748     }
2749 
2750     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2751                        nxt_router_conf_apply, task, tmcf, NULL);
2752     return;
2753 
2754 fail:
2755 
2756     nxt_router_conf_error(task, tmcf);
2757 }
2758 
2759 #endif
2760 
2761 
2762 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2763 nxt_router_app_rpc_create(nxt_task_t *task,
2764     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2765 {
2766     size_t         size;
2767     uint32_t       stream;
2768     nxt_fd_t       port_fd, queue_fd;
2769     nxt_int_t      ret;
2770     nxt_buf_t      *b;
2771     nxt_port_t     *router_port, *dport;
2772     nxt_runtime_t  *rt;
2773     nxt_app_rpc_t  *rpc;
2774 
2775     rt = task->thread->runtime;
2776 
2777     dport = app->proto_port;
2778 
2779     if (dport == NULL) {
2780         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2781 
2782         size = app->name.length + 1 + app->conf.length;
2783 
2784         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2785         if (nxt_slow_path(b == NULL)) {
2786             goto fail;
2787         }
2788 
2789         b->completion_handler = nxt_buf_dummy_completion;
2790 
2791         nxt_buf_cpystr(b, &app->name);
2792         *b->mem.free++ = '\0';
2793         nxt_buf_cpystr(b, &app->conf);
2794 
2795         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2796 
2797         port_fd = app->shared_port->pair[0];
2798         queue_fd = app->shared_port->queue_fd;
2799 
2800     } else {
2801         nxt_debug(task, "app '%V' prefork", &app->name);
2802 
2803         b = NULL;
2804         port_fd = -1;
2805         queue_fd = -1;
2806     }
2807 
2808     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2809 
2810     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2811                                            nxt_router_app_prefork_ready,
2812                                            nxt_router_app_prefork_error,
2813                                            sizeof(nxt_app_rpc_t));
2814     if (nxt_slow_path(rpc == NULL)) {
2815         goto fail;
2816     }
2817 
2818     rpc->app = app;
2819     rpc->temp_conf = tmcf;
2820     rpc->proto = (b != NULL);
2821 
2822     stream = nxt_port_rpc_ex_stream(rpc);
2823 
2824     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2825                                  port_fd, queue_fd, stream, router_port->id, b);
2826     if (nxt_slow_path(ret != NXT_OK)) {
2827         nxt_port_rpc_cancel(task, router_port, stream);
2828         goto fail;
2829     }
2830 
2831     if (b == NULL) {
2832         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2833 
2834         app->pending_processes++;
2835     }
2836 
2837     return;
2838 
2839 fail:
2840 
2841     nxt_router_conf_error(task, tmcf);
2842 }
2843 
2844 
2845 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2846 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2847     void *data)
2848 {
2849     nxt_app_t           *app;
2850     nxt_port_t          *port;
2851     nxt_app_rpc_t       *rpc;
2852     nxt_event_engine_t  *engine;
2853 
2854     rpc = data;
2855     app = rpc->app;
2856 
2857     port = msg->u.new_port;
2858 
2859     nxt_assert(port != NULL);
2860     nxt_assert(port->id == 0);
2861 
2862     if (rpc->proto) {
2863         nxt_assert(app->proto_port == NULL);
2864         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2865 
2866         nxt_port_inc_use(port);
2867 
2868         app->proto_port = port;
2869         port->app = app;
2870 
2871         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2872 
2873         return;
2874     }
2875 
2876     nxt_assert(port->type == NXT_PROCESS_APP);
2877 
2878     port->app = app;
2879     port->main_app_port = port;
2880 
2881     app->pending_processes--;
2882     app->processes++;
2883     app->idle_processes++;
2884 
2885     engine = task->thread->engine;
2886 
2887     nxt_queue_insert_tail(&app->ports, &port->app_link);
2888     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2889 
2890     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2891               &app->name, port->pid, port->id);
2892 
2893     nxt_port_hash_add(&app->port_hash, port);
2894     app->port_hash_count++;
2895 
2896     port->idle_start = 0;
2897 
2898     nxt_port_inc_use(port);
2899 
2900     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2901 
2902     nxt_work_queue_add(&engine->fast_work_queue,
2903                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2904 }
2905 
2906 
2907 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2908 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2909     void *data)
2910 {
2911     nxt_app_t               *app;
2912     nxt_app_rpc_t           *rpc;
2913     nxt_router_temp_conf_t  *tmcf;
2914 
2915     rpc = data;
2916     app = rpc->app;
2917     tmcf = rpc->temp_conf;
2918 
2919     if (rpc->proto) {
2920         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2921                 &app->name);
2922 
2923     } else {
2924         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2925                 &app->name);
2926 
2927         app->pending_processes--;
2928     }
2929 
2930     nxt_router_conf_error(task, tmcf);
2931 }
2932 
2933 
2934 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2935 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2936     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2937 {
2938     nxt_int_t                 ret;
2939     nxt_uint_t                n, threads;
2940     nxt_queue_link_t          *qlk;
2941     nxt_router_engine_conf_t  *recf;
2942 
2943     threads = tmcf->router_conf->threads;
2944 
2945     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2946                                      sizeof(nxt_router_engine_conf_t));
2947     if (nxt_slow_path(tmcf->engines == NULL)) {
2948         return NXT_ERROR;
2949     }
2950 
2951     n = 0;
2952 
2953     for (qlk = nxt_queue_first(&router->engines);
2954          qlk != nxt_queue_tail(&router->engines);
2955          qlk = nxt_queue_next(qlk))
2956     {
2957         recf = nxt_array_zero_add(tmcf->engines);
2958         if (nxt_slow_path(recf == NULL)) {
2959             return NXT_ERROR;
2960         }
2961 
2962         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2963 
2964         if (n < threads) {
2965             recf->action = NXT_ROUTER_ENGINE_KEEP;
2966             ret = nxt_router_engine_conf_update(tmcf, recf);
2967 
2968         } else {
2969             recf->action = NXT_ROUTER_ENGINE_DELETE;
2970             ret = nxt_router_engine_conf_delete(tmcf, recf);
2971         }
2972 
2973         if (nxt_slow_path(ret != NXT_OK)) {
2974             return ret;
2975         }
2976 
2977         n++;
2978     }
2979 
2980     tmcf->new_threads = n;
2981 
2982     while (n < threads) {
2983         recf = nxt_array_zero_add(tmcf->engines);
2984         if (nxt_slow_path(recf == NULL)) {
2985             return NXT_ERROR;
2986         }
2987 
2988         recf->action = NXT_ROUTER_ENGINE_ADD;
2989 
2990         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2991         if (nxt_slow_path(recf->engine == NULL)) {
2992             return NXT_ERROR;
2993         }
2994 
2995         ret = nxt_router_engine_conf_create(tmcf, recf);
2996         if (nxt_slow_path(ret != NXT_OK)) {
2997             return ret;
2998         }
2999 
3000         n++;
3001     }
3002 
3003     return NXT_OK;
3004 }
3005 
3006 
3007 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3008 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3009     nxt_router_engine_conf_t *recf)
3010 {
3011     nxt_int_t  ret;
3012 
3013     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3014                                           nxt_router_listen_socket_create);
3015     if (nxt_slow_path(ret != NXT_OK)) {
3016         return ret;
3017     }
3018 
3019     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3020                                           nxt_router_listen_socket_create);
3021     if (nxt_slow_path(ret != NXT_OK)) {
3022         return ret;
3023     }
3024 
3025     return ret;
3026 }
3027 
3028 
3029 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3030 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3031     nxt_router_engine_conf_t *recf)
3032 {
3033     nxt_int_t  ret;
3034 
3035     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3036                                           nxt_router_listen_socket_create);
3037     if (nxt_slow_path(ret != NXT_OK)) {
3038         return ret;
3039     }
3040 
3041     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3042                                           nxt_router_listen_socket_update);
3043     if (nxt_slow_path(ret != NXT_OK)) {
3044         return ret;
3045     }
3046 
3047     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3048     if (nxt_slow_path(ret != NXT_OK)) {
3049         return ret;
3050     }
3051 
3052     return ret;
3053 }
3054 
3055 
3056 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3057 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3058     nxt_router_engine_conf_t *recf)
3059 {
3060     nxt_int_t  ret;
3061 
3062     ret = nxt_router_engine_quit(tmcf, recf);
3063     if (nxt_slow_path(ret != NXT_OK)) {
3064         return ret;
3065     }
3066 
3067     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3068     if (nxt_slow_path(ret != NXT_OK)) {
3069         return ret;
3070     }
3071 
3072     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3073 }
3074 
3075 
3076 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3077 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3078     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3079     nxt_work_handler_t handler)
3080 {
3081     nxt_int_t                ret;
3082     nxt_joint_job_t          *job;
3083     nxt_queue_link_t         *qlk;
3084     nxt_socket_conf_t        *skcf;
3085     nxt_socket_conf_joint_t  *joint;
3086 
3087     for (qlk = nxt_queue_first(sockets);
3088          qlk != nxt_queue_tail(sockets);
3089          qlk = nxt_queue_next(qlk))
3090     {
3091         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3092         if (nxt_slow_path(job == NULL)) {
3093             return NXT_ERROR;
3094         }
3095 
3096         job->work.next = recf->jobs;
3097         recf->jobs = &job->work;
3098 
3099         job->task = tmcf->engine->task;
3100         job->work.handler = handler;
3101         job->work.task = &job->task;
3102         job->work.obj = job;
3103         job->tmcf = tmcf;
3104 
3105         tmcf->count++;
3106 
3107         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3108                              sizeof(nxt_socket_conf_joint_t));
3109         if (nxt_slow_path(joint == NULL)) {
3110             return NXT_ERROR;
3111         }
3112 
3113         job->work.data = joint;
3114 
3115         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3116         if (nxt_slow_path(ret != NXT_OK)) {
3117             return ret;
3118         }
3119 
3120         joint->count = 1;
3121 
3122         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3123         skcf->count++;
3124         joint->socket_conf = skcf;
3125 
3126         joint->engine = recf->engine;
3127     }
3128 
3129     return NXT_OK;
3130 }
3131 
3132 
3133 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3134 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3135     nxt_router_engine_conf_t *recf)
3136 {
3137     nxt_joint_job_t  *job;
3138 
3139     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3140     if (nxt_slow_path(job == NULL)) {
3141         return NXT_ERROR;
3142     }
3143 
3144     job->work.next = recf->jobs;
3145     recf->jobs = &job->work;
3146 
3147     job->task = tmcf->engine->task;
3148     job->work.handler = nxt_router_worker_thread_quit;
3149     job->work.task = &job->task;
3150     job->work.obj = NULL;
3151     job->work.data = NULL;
3152     job->tmcf = NULL;
3153 
3154     return NXT_OK;
3155 }
3156 
3157 
3158 static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets)3159