xref: /unit/src/nxt_router.c (revision 2248:67f848571b9f)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #include <nxt_http.h>
15 #include <nxt_port_memory_int.h>
16 #include <nxt_unit_request.h>
17 #include <nxt_unit_response.h>
18 #include <nxt_router_request.h>
19 #include <nxt_app_queue.h>
20 #include <nxt_port_queue.h>
21 
/*
 * Port id given to an application's shared port when it is created with
 * nxt_port_new(); nxt_router_msg_cancel() compares a port's id against this
 * value to decide whether a queued message may be cancelled.
 */
#define NXT_SHARED_PORT_ID  0xFFFFu
23 
24 typedef struct {
25     nxt_str_t         type;
26     uint32_t          processes;
27     uint32_t          max_processes;
28     uint32_t          spare_processes;
29     nxt_msec_t        timeout;
30     nxt_msec_t        idle_timeout;
31     nxt_conf_value_t  *limits_value;
32     nxt_conf_value_t  *processes_value;
33     nxt_conf_value_t  *targets_value;
34 } nxt_router_app_conf_t;
35 
36 
37 typedef struct {
38     nxt_str_t         pass;
39     nxt_str_t         application;
40 } nxt_router_listener_conf_t;
41 
42 
43 #if (NXT_TLS)
44 
45 typedef struct {
46     nxt_str_t               name;
47     nxt_socket_conf_t       *socket_conf;
48     nxt_router_temp_conf_t  *temp_conf;
49     nxt_tls_init_t          *tls_init;
50     nxt_bool_t              last;
51 
52     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
53 } nxt_router_tlssock_t;
54 
55 #endif
56 
57 
58 typedef struct {
59     nxt_str_t               *name;
60     nxt_socket_conf_t       *socket_conf;
61     nxt_router_temp_conf_t  *temp_conf;
62     nxt_bool_t              last;
63 } nxt_socket_rpc_t;
64 
65 
66 typedef struct {
67     nxt_app_t               *app;
68     nxt_router_temp_conf_t  *temp_conf;
69     uint8_t                 proto;  /* 1 bit */
70 } nxt_app_rpc_t;
71 
72 
73 typedef struct {
74     nxt_app_joint_t         *app_joint;
75     uint32_t                generation;
76     uint8_t                 proto;  /* 1 bit */
77 } nxt_app_joint_rpc_t;
78 
79 
80 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
81     nxt_mp_t *mp);
82 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
83 static void nxt_router_greet_controller(nxt_task_t *task,
84     nxt_port_t *controller_port);
85 
86 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
87 
88 static void nxt_router_new_port_handler(nxt_task_t *task,
89     nxt_port_recv_msg_t *msg);
90 static void nxt_router_conf_data_handler(nxt_task_t *task,
91     nxt_port_recv_msg_t *msg);
92 static void nxt_router_app_restart_handler(nxt_task_t *task,
93     nxt_port_recv_msg_t *msg);
94 static void nxt_router_status_handler(nxt_task_t *task,
95     nxt_port_recv_msg_t *msg);
96 static void nxt_router_remove_pid_handler(nxt_task_t *task,
97     nxt_port_recv_msg_t *msg);
98 
99 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_send(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
104 
105 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
106     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
107 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
108     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
109 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
110     nxt_mp_t *mp, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
112     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
113 
114 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
115 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
116 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
117     nxt_app_t *app);
118 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
119     nxt_str_t *name);
120 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
121     int i);
122 
123 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
124     nxt_port_t *port);
125 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
126     nxt_port_t *port);
127 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
128     nxt_port_t *port, nxt_fd_t fd);
129 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
130     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
131 static void nxt_router_listen_socket_ready(nxt_task_t *task,
132     nxt_port_recv_msg_t *msg, void *data);
133 static void nxt_router_listen_socket_error(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 #if (NXT_TLS)
136 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
137     nxt_port_recv_msg_t *msg, void *data);
138 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
139     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
140     nxt_bool_t last);
141 #endif
142 static void nxt_router_app_rpc_create(nxt_task_t *task,
143     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
144 static void nxt_router_app_prefork_ready(nxt_task_t *task,
145     nxt_port_recv_msg_t *msg, void *data);
146 static void nxt_router_app_prefork_error(nxt_task_t *task,
147     nxt_port_recv_msg_t *msg, void *data);
148 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
149     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
150 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
151     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
152 
153 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
154     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
155     const nxt_event_interface_t *interface);
156 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
157     nxt_router_engine_conf_t *recf);
158 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
159     nxt_router_engine_conf_t *recf);
160 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
161     nxt_router_engine_conf_t *recf);
162 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
163     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
164     nxt_work_handler_t handler);
165 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
166     nxt_router_engine_conf_t *recf);
167 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
168     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
169 
170 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
171     nxt_router_temp_conf_t *tmcf);
172 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
173     nxt_event_engine_t *engine);
174 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
175     nxt_router_temp_conf_t *tmcf);
176 
177 static void nxt_router_engines_post(nxt_router_t *router,
178     nxt_router_temp_conf_t *tmcf);
179 static void nxt_router_engine_post(nxt_event_engine_t *engine,
180     nxt_work_t *jobs);
181 
182 static void nxt_router_thread_start(void *data);
183 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
184     void *data);
185 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
186     void *data);
187 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
188     void *data);
189 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
190     void *data);
191 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
192     void *data);
193 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
194     void *data);
195 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
196     void *data);
197 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
198     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
199 static void nxt_router_listen_socket_release(nxt_task_t *task,
200     nxt_socket_conf_t *skcf);
201 
202 static void nxt_router_app_port_ready(nxt_task_t *task,
203     nxt_port_recv_msg_t *msg, void *data);
204 static void nxt_router_app_port_error(nxt_task_t *task,
205     nxt_port_recv_msg_t *msg, void *data);
206 
207 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
208 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
209 
210 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
211     nxt_port_t *port, nxt_apr_action_t action);
212 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
213     nxt_request_rpc_data_t *req_rpc_data);
214 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
215     void *data);
216 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
217     void *data);
218 
219 static void nxt_router_app_prepare_request(nxt_task_t *task,
220     nxt_request_rpc_data_t *req_rpc_data);
221 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
222     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
223 
224 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
225 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
226     void *data);
227 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
228     void *data);
229 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
230     void *data);
231 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
232 
233 static const nxt_http_request_state_t  nxt_http_request_send_state;
234 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
235 
236 static void nxt_router_app_joint_use(nxt_task_t *task,
237     nxt_app_joint_t *app_joint, int i);
238 
239 static void nxt_router_http_request_release_post(nxt_task_t *task,
240     nxt_http_request_t *r);
241 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
242     void *data);
243 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
244 static void nxt_router_get_port_handler(nxt_task_t *task,
245     nxt_port_recv_msg_t *msg);
246 static void nxt_router_get_mmap_handler(nxt_task_t *task,
247     nxt_port_recv_msg_t *msg);
248 
249 extern const nxt_http_request_state_t  nxt_http_websocket;
250 
251 nxt_router_t  *nxt_router;
252 
253 static const nxt_str_t http_prefix = nxt_string("HTTP_");
254 static const nxt_str_t empty_prefix = nxt_string("");
255 
256 static const nxt_str_t  *nxt_app_msg_prefix[] = {
257     &empty_prefix,
258     &empty_prefix,
259     &http_prefix,
260     &http_prefix,
261     &http_prefix,
262     &empty_prefix,
263 };
264 
265 
266 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
267     .quit         = nxt_signal_quit_handler,
268     .new_port     = nxt_router_new_port_handler,
269     .get_port     = nxt_router_get_port_handler,
270     .change_file  = nxt_port_change_log_file_handler,
271     .mmap         = nxt_port_mmap_handler,
272     .get_mmap     = nxt_router_get_mmap_handler,
273     .data         = nxt_router_conf_data_handler,
274     .app_restart  = nxt_router_app_restart_handler,
275     .status       = nxt_router_status_handler,
276     .remove_pid   = nxt_router_remove_pid_handler,
277     .access_log   = nxt_router_access_log_reopen_handler,
278     .rpc_ready    = nxt_port_rpc_handler,
279     .rpc_error    = nxt_port_rpc_handler,
280     .oosm         = nxt_router_oosm_handler,
281 };
282 
283 
284 const nxt_process_init_t  nxt_router_process = {
285     .name           = "router",
286     .type           = NXT_PROCESS_ROUTER,
287     .prefork        = nxt_router_prefork,
288     .restart        = 1,
289     .setup          = nxt_process_core_setup,
290     .start          = nxt_router_start,
291     .port_handlers  = &nxt_router_process_port_handlers,
292     .signals        = nxt_process_signals,
293 };
294 
295 
296 /* Queues of nxt_socket_conf_t */
297 nxt_queue_t  creating_sockets;
298 nxt_queue_t  pending_sockets;
299 nxt_queue_t  updating_sockets;
300 nxt_queue_t  keeping_sockets;
301 nxt_queue_t  deleting_sockets;
302 
303 
304 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)305 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
306 {
307     nxt_runtime_stop_app_processes(task, task->thread->runtime);
308 
309     return NXT_OK;
310 }
311 
312 
313 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)314 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
315 {
316     nxt_int_t      ret;
317     nxt_port_t     *controller_port;
318     nxt_router_t   *router;
319     nxt_runtime_t  *rt;
320 
321     rt = task->thread->runtime;
322 
323     nxt_log(task, NXT_LOG_INFO, "router started");
324 
325 #if (NXT_TLS)
326     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
327     if (nxt_slow_path(rt->tls == NULL)) {
328         return NXT_ERROR;
329     }
330 
331     ret = rt->tls->library_init(task);
332     if (nxt_slow_path(ret != NXT_OK)) {
333         return ret;
334     }
335 #endif
336 
337     ret = nxt_http_init(task);
338     if (nxt_slow_path(ret != NXT_OK)) {
339         return ret;
340     }
341 
342     router = nxt_zalloc(sizeof(nxt_router_t));
343     if (nxt_slow_path(router == NULL)) {
344         return NXT_ERROR;
345     }
346 
347     nxt_queue_init(&router->engines);
348     nxt_queue_init(&router->sockets);
349     nxt_queue_init(&router->apps);
350 
351     nxt_router = router;
352 
353     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
354     if (controller_port != NULL) {
355         nxt_router_greet_controller(task, controller_port);
356     }
357 
358     return NXT_OK;
359 }
360 
361 
362 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)363 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
364 {
365     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
366                           -1, 0, 0, NULL);
367 }
368 
369 
370 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)371 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
372     void *data)
373 {
374     size_t               size;
375     uint32_t             stream;
376     nxt_fd_t             port_fd, queue_fd;
377     nxt_int_t            ret;
378     nxt_app_t            *app;
379     nxt_buf_t            *b;
380     nxt_port_t           *dport;
381     nxt_runtime_t        *rt;
382     nxt_app_joint_rpc_t  *app_joint_rpc;
383 
384     app = data;
385 
386     nxt_thread_mutex_lock(&app->mutex);
387 
388     dport = app->proto_port;
389 
390     nxt_thread_mutex_unlock(&app->mutex);
391 
392     if (dport != NULL) {
393         nxt_debug(task, "app '%V' %p start process", &app->name, app);
394 
395         b = NULL;
396         port_fd = -1;
397         queue_fd = -1;
398 
399     } else {
400         if (app->proto_port_requests > 0) {
401             nxt_debug(task, "app '%V' %p wait for prototype process",
402                       &app->name, app);
403 
404             app->proto_port_requests++;
405 
406             goto skip;
407         }
408 
409         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
410 
411         rt = task->thread->runtime;
412         dport = rt->port_by_type[NXT_PROCESS_MAIN];
413 
414         size = app->name.length + 1 + app->conf.length;
415 
416         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
417         if (nxt_slow_path(b == NULL)) {
418             goto failed;
419         }
420 
421         nxt_buf_cpystr(b, &app->name);
422         *b->mem.free++ = '\0';
423         nxt_buf_cpystr(b, &app->conf);
424 
425         port_fd = app->shared_port->pair[0];
426         queue_fd = app->shared_port->queue_fd;
427     }
428 
429     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
430                                                      nxt_router_app_port_ready,
431                                                      nxt_router_app_port_error,
432                                                    sizeof(nxt_app_joint_rpc_t));
433     if (nxt_slow_path(app_joint_rpc == NULL)) {
434         goto failed;
435     }
436 
437     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
438 
439     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
440                                  port_fd, queue_fd, stream, port->id, b);
441     if (nxt_slow_path(ret != NXT_OK)) {
442         nxt_port_rpc_cancel(task, port, stream);
443 
444         goto failed;
445     }
446 
447     app_joint_rpc->app_joint = app->joint;
448     app_joint_rpc->generation = app->generation;
449     app_joint_rpc->proto = (b != NULL);
450 
451     if (b != NULL) {
452         app->proto_port_requests++;
453 
454         b = NULL;
455     }
456 
457     nxt_router_app_joint_use(task, app->joint, 1);
458 
459 failed:
460 
461     if (b != NULL) {
462         nxt_mp_free(b->data, b);
463     }
464 
465 skip:
466 
467     nxt_router_app_use(task, app, -1);
468 }
469 
470 
471 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)472 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
473 {
474     app_joint->use_count += i;
475 
476     if (app_joint->use_count == 0) {
477         nxt_assert(app_joint->app == NULL);
478 
479         nxt_free(app_joint);
480     }
481 }
482 
483 
484 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)485 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
486 {
487     nxt_int_t      res;
488     nxt_port_t     *router_port;
489     nxt_runtime_t  *rt;
490 
491     nxt_debug(task, "app '%V' start process", &app->name);
492 
493     rt = task->thread->runtime;
494     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
495 
496     nxt_router_app_use(task, app, 1);
497 
498     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
499                         app);
500 
501     if (res == NXT_OK) {
502         return res;
503     }
504 
505     nxt_thread_mutex_lock(&app->mutex);
506 
507     app->pending_processes--;
508 
509     nxt_thread_mutex_unlock(&app->mutex);
510 
511     nxt_router_app_use(task, app, -1);
512 
513     return NXT_ERROR;
514 }
515 
516 
517 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)518 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
519 {
520     nxt_buf_t       *b, *next;
521     nxt_bool_t      cancelled;
522     nxt_port_t      *app_port;
523     nxt_msg_info_t  *msg_info;
524 
525     msg_info = &req_rpc_data->msg_info;
526 
527     if (msg_info->buf == NULL) {
528         return 0;
529     }
530 
531     app_port = req_rpc_data->app_port;
532 
533     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
534         cancelled = nxt_app_queue_cancel(app_port->queue,
535                                          msg_info->tracking_cookie,
536                                          req_rpc_data->stream);
537 
538         if (cancelled) {
539             nxt_debug(task, "stream #%uD: cancelled by router",
540                       req_rpc_data->stream);
541         }
542 
543     } else {
544         cancelled = 0;
545     }
546 
547     for (b = msg_info->buf; b != NULL; b = next) {
548         next = b->next;
549         b->next = NULL;
550 
551         if (b->is_port_mmap_sent) {
552             b->is_port_mmap_sent = cancelled == 0;
553         }
554 
555         b->completion_handler(task, b, b->parent);
556     }
557 
558     msg_info->buf = NULL;
559 
560     return cancelled;
561 }
562 
563 
564 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)565 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
566 {
567     if (lnk->next != NULL) {
568         nxt_queue_remove(lnk);
569 
570         lnk->next = NULL;
571 
572         return 1;
573     }
574 
575     return 0;
576 }
577 
578 
579 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)580 nxt_request_rpc_data_unlink(nxt_task_t *task,
581     nxt_request_rpc_data_t *req_rpc_data)
582 {
583     nxt_app_t           *app;
584     nxt_bool_t          unlinked;
585     nxt_http_request_t  *r;
586 
587     nxt_router_msg_cancel(task, req_rpc_data);
588 
589     app = req_rpc_data->app;
590 
591     if (req_rpc_data->app_port != NULL) {
592         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
593                                     req_rpc_data->apr_action);
594 
595         req_rpc_data->app_port = NULL;
596     }
597 
598     r = req_rpc_data->request;
599 
600     if (r != NULL) {
601         r->timer_data = NULL;
602 
603         nxt_router_http_request_release_post(task, r);
604 
605         r->req_rpc_data = NULL;
606         req_rpc_data->request = NULL;
607 
608         if (app != NULL) {
609             unlinked = 0;
610 
611             nxt_thread_mutex_lock(&app->mutex);
612 
613             if (r->app_link.next != NULL) {
614                 nxt_queue_remove(&r->app_link);
615                 r->app_link.next = NULL;
616 
617                 unlinked = 1;
618             }
619 
620             nxt_thread_mutex_unlock(&app->mutex);
621 
622             if (unlinked) {
623                 nxt_mp_release(r->mem_pool);
624             }
625         }
626     }
627 
628     if (app != NULL) {
629         nxt_router_app_use(task, app, -1);
630 
631         req_rpc_data->app = NULL;
632     }
633 
634     if (req_rpc_data->msg_info.body_fd != -1) {
635         nxt_fd_close(req_rpc_data->msg_info.body_fd);
636 
637         req_rpc_data->msg_info.body_fd = -1;
638     }
639 
640     if (req_rpc_data->rpc_cancel) {
641         req_rpc_data->rpc_cancel = 0;
642 
643         nxt_port_rpc_cancel(task, task->thread->engine->port,
644                             req_rpc_data->stream);
645     }
646 }
647 
648 
649 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)650 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
651 {
652     nxt_int_t      res;
653     nxt_app_t      *app;
654     nxt_port_t     *port, *main_app_port;
655     nxt_runtime_t  *rt;
656 
657     nxt_port_new_port_handler(task, msg);
658 
659     port = msg->u.new_port;
660 
661     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
662         nxt_router_greet_controller(task, msg->u.new_port);
663     }
664 
665     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
666         nxt_port_rpc_handler(task, msg);
667 
668         return;
669     }
670 
671     if (port == NULL || port->type != NXT_PROCESS_APP) {
672 
673         if (msg->port_msg.stream == 0) {
674             return;
675         }
676 
677         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
678 
679     } else {
680         if (msg->fd[1] != -1) {
681             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
682             if (nxt_slow_path(res != NXT_OK)) {
683                 return;
684             }
685 
686             nxt_fd_close(msg->fd[1]);
687             msg->fd[1] = -1;
688         }
689     }
690 
691     if (msg->port_msg.stream != 0) {
692         nxt_port_rpc_handler(task, msg);
693         return;
694     }
695 
696     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
697 
698     /*
699      * Port with "id == 0" is application 'main' port and it always
700      * should come with non-zero stream.
701      */
702     nxt_assert(port->id != 0);
703 
704     /* Find 'main' app port and get app reference. */
705     rt = task->thread->runtime;
706 
707     /*
708      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
709      * sent to main port (with id == 0) and processed in main thread.
710      */
711     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
712     nxt_assert(main_app_port != NULL);
713 
714     app = main_app_port->app;
715 
716     if (nxt_fast_path(app != NULL)) {
717         nxt_thread_mutex_lock(&app->mutex);
718 
719         /* TODO here should be find-and-add code because there can be
720            port waiters in port_hash */
721         nxt_port_hash_add(&app->port_hash, port);
722         app->port_hash_count++;
723 
724         nxt_thread_mutex_unlock(&app->mutex);
725 
726         port->app = app;
727     }
728 
729     port->main_app_port = main_app_port;
730 
731     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
732 }
733 
734 
735 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)736 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
737 {
738     void                    *p;
739     size_t                  size;
740     nxt_int_t               ret;
741     nxt_port_t              *port;
742     nxt_router_temp_conf_t  *tmcf;
743 
744     port = nxt_runtime_port_find(task->thread->runtime,
745                                  msg->port_msg.pid,
746                                  msg->port_msg.reply_port);
747     if (nxt_slow_path(port == NULL)) {
748         nxt_alert(task, "conf_data_handler: reply port not found");
749         return;
750     }
751 
752     p = MAP_FAILED;
753 
754     /*
755      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
756      * initialized in 'cleanup' section.
757      */
758     size = 0;
759 
760     tmcf = nxt_router_temp_conf(task);
761     if (nxt_slow_path(tmcf == NULL)) {
762         goto fail;
763     }
764 
765     if (nxt_slow_path(msg->fd[0] == -1)) {
766         nxt_alert(task, "conf_data_handler: invalid shm fd");
767         goto fail;
768     }
769 
770     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
771         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
772                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
773         goto fail;
774     }
775 
776     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
777 
778     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
779 
780     nxt_fd_close(msg->fd[0]);
781     msg->fd[0] = -1;
782 
783     if (nxt_slow_path(p == MAP_FAILED)) {
784         goto fail;
785     }
786 
787     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
788 
789     tmcf->router_conf->router = nxt_router;
790     tmcf->stream = msg->port_msg.stream;
791     tmcf->port = port;
792 
793     nxt_port_use(task, tmcf->port, 1);
794 
795     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
796 
797     if (nxt_fast_path(ret == NXT_OK)) {
798         nxt_router_conf_apply(task, tmcf, NULL);
799 
800     } else {
801         nxt_router_conf_error(task, tmcf);
802     }
803 
804     goto cleanup;
805 
806 fail:
807 
808     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
809                           msg->port_msg.stream, 0, NULL);
810 
811     if (tmcf != NULL) {
812         nxt_mp_release(tmcf->mem_pool);
813     }
814 
815 cleanup:
816 
817     if (p != MAP_FAILED) {
818         nxt_mem_munmap(p, size);
819     }
820 
821     if (msg->fd[0] != -1) {
822         nxt_fd_close(msg->fd[0]);
823         msg->fd[0] = -1;
824     }
825 }
826 
827 
828 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)829 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
830 {
831     nxt_app_t            *app;
832     nxt_int_t            ret;
833     nxt_str_t            app_name;
834     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
835     nxt_port_t           *proto_port;
836     nxt_port_msg_type_t  reply;
837 
838     reply_port = nxt_runtime_port_find(task->thread->runtime,
839                                        msg->port_msg.pid,
840                                        msg->port_msg.reply_port);
841     if (nxt_slow_path(reply_port == NULL)) {
842         nxt_alert(task, "app_restart_handler: reply port not found");
843         return;
844     }
845 
846     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
847     app_name.start = msg->buf->mem.pos;
848 
849     nxt_debug(task, "app_restart_handler: %V", &app_name);
850 
851     app = nxt_router_app_find(&nxt_router->apps, &app_name);
852 
853     if (nxt_fast_path(app != NULL)) {
854         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
855                                    NXT_PROCESS_APP);
856         if (nxt_slow_path(shared_port == NULL)) {
857             goto fail;
858         }
859 
860         ret = nxt_port_socket_init(task, shared_port, 0);
861         if (nxt_slow_path(ret != NXT_OK)) {
862             nxt_port_use(task, shared_port, -1);
863             goto fail;
864         }
865 
866         ret = nxt_router_app_queue_init(task, shared_port);
867         if (nxt_slow_path(ret != NXT_OK)) {
868             nxt_port_write_close(shared_port);
869             nxt_port_read_close(shared_port);
870             nxt_port_use(task, shared_port, -1);
871             goto fail;
872         }
873 
874         nxt_port_write_enable(task, shared_port);
875 
876         nxt_thread_mutex_lock(&app->mutex);
877 
878         proto_port = app->proto_port;
879 
880         if (proto_port != NULL) {
881             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
882                       proto_port->pid);
883 
884             app->proto_port = NULL;
885             proto_port->app = NULL;
886         }
887 
888         app->generation++;
889 
890         shared_port->app = app;
891 
892         old_shared_port = app->shared_port;
893         old_shared_port->app = NULL;
894 
895         app->shared_port = shared_port;
896 
897         nxt_thread_mutex_unlock(&app->mutex);
898 
899         nxt_port_close(task, old_shared_port);
900         nxt_port_use(task, old_shared_port, -1);
901 
902         if (proto_port != NULL) {
903             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
904                                          -1, 0, 0, NULL);
905 
906             nxt_port_close(task, proto_port);
907 
908             nxt_port_use(task, proto_port, -1);
909         }
910 
911         reply = NXT_PORT_MSG_RPC_READY_LAST;
912 
913     } else {
914 
915 fail:
916 
917         reply = NXT_PORT_MSG_RPC_ERROR;
918     }
919 
920     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
921                           0, NULL);
922 }
923 
924 
925 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)926 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
927 {
928     u_char               *p;
929     size_t               alloc;
930     nxt_app_t            *app;
931     nxt_buf_t            *b;
932     nxt_uint_t           type;
933     nxt_port_t           *port;
934     nxt_status_app_t     *app_stat;
935     nxt_event_engine_t   *engine;
936     nxt_status_report_t  *report;
937 
938     port = nxt_runtime_port_find(task->thread->runtime,
939                                  msg->port_msg.pid,
940                                  msg->port_msg.reply_port);
941     if (nxt_slow_path(port == NULL)) {
942         nxt_alert(task, "nxt_router_status_handler(): reply port not found");
943         return;
944     }
945 
946     alloc = sizeof(nxt_status_report_t);
947 
948     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
949 
950         alloc += sizeof(nxt_status_app_t) + app->name.length;
951 
952     } nxt_queue_loop;
953 
954     b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
955     if (nxt_slow_path(b == NULL)) {
956         type = NXT_PORT_MSG_RPC_ERROR;
957         goto fail;
958     }
959 
960     report = (nxt_status_report_t *) b->mem.free;
961     b->mem.free = b->mem.end;
962 
963     nxt_memzero(report, sizeof(nxt_status_report_t));
964 
965     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
966 
967         report->accepted_conns += engine->accepted_conns_cnt;
968         report->idle_conns += engine->idle_conns_cnt;
969         report->closed_conns += engine->closed_conns_cnt;
970         report->requests += engine->requests_cnt;
971 
972     } nxt_queue_loop;
973 
974     report->apps_count = 0;
975     app_stat = report->apps;
976     p = b->mem.end;
977 
978     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
979         p -= app->name.length;
980 
981         nxt_memcpy(p, app->name.start, app->name.length);
982 
983         app_stat->name.length = app->name.length;
984         app_stat->name.start = (u_char *) (p - b->mem.pos);
985 
986         app_stat->active_requests = app->active_requests;
987         app_stat->pending_processes = app->pending_processes;
988         app_stat->processes = app->processes;
989         app_stat->idle_processes = app->idle_processes;
990 
991         report->apps_count++;
992         app_stat++;
993     } nxt_queue_loop;
994 
995     type = NXT_PORT_MSG_RPC_READY_LAST;
996 
997 fail:
998 
999     nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1000 }
1001 
1002 
1003 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1004 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1005     void *data)
1006 {
1007     union {
1008         nxt_pid_t  removed_pid;
1009         void       *data;
1010     } u;
1011 
1012     u.data = data;
1013 
1014     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1015 }
1016 
1017 
1018 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1019 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1020 {
1021     nxt_event_engine_t  *engine;
1022 
1023     nxt_port_remove_pid_handler(task, msg);
1024 
1025     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1026     {
1027         if (nxt_fast_path(engine->port != NULL)) {
1028             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1029                           msg->u.data);
1030         }
1031     }
1032     nxt_queue_loop;
1033 
1034     if (msg->port_msg.stream == 0) {
1035         return;
1036     }
1037 
1038     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1039 
1040     nxt_port_rpc_handler(task, msg);
1041 }
1042 
1043 
1044 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1045 nxt_router_temp_conf(nxt_task_t *task)
1046 {
1047     nxt_mp_t                *mp, *tmp;
1048     nxt_router_conf_t       *rtcf;
1049     nxt_router_temp_conf_t  *tmcf;
1050 
1051     mp = nxt_mp_create(1024, 128, 256, 32);
1052     if (nxt_slow_path(mp == NULL)) {
1053         return NULL;
1054     }
1055 
1056     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1057     if (nxt_slow_path(rtcf == NULL)) {
1058         goto fail;
1059     }
1060 
1061     rtcf->mem_pool = mp;
1062 
1063     rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1064     if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1065         goto fail;
1066     }
1067 
1068 #if (NXT_HAVE_NJS)
1069     nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1070 #endif
1071 
1072     tmp = nxt_mp_create(1024, 128, 256, 32);
1073     if (nxt_slow_path(tmp == NULL)) {
1074         goto fail;
1075     }
1076 
1077     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1078     if (nxt_slow_path(tmcf == NULL)) {
1079         goto temp_fail;
1080     }
1081 
1082     tmcf->mem_pool = tmp;
1083     tmcf->router_conf = rtcf;
1084     tmcf->count = 1;
1085     tmcf->engine = task->thread->engine;
1086 
1087     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1088                                      sizeof(nxt_router_engine_conf_t));
1089     if (nxt_slow_path(tmcf->engines == NULL)) {
1090         goto temp_fail;
1091     }
1092 
1093     nxt_queue_init(&creating_sockets);
1094     nxt_queue_init(&pending_sockets);
1095     nxt_queue_init(&updating_sockets);
1096     nxt_queue_init(&keeping_sockets);
1097     nxt_queue_init(&deleting_sockets);
1098 
1099 #if (NXT_TLS)
1100     nxt_queue_init(&tmcf->tls);
1101 #endif
1102 
1103     nxt_queue_init(&tmcf->apps);
1104     nxt_queue_init(&tmcf->previous);
1105 
1106     return tmcf;
1107 
1108 temp_fail:
1109 
1110     nxt_mp_destroy(tmp);
1111 
1112 fail:
1113 
1114     nxt_mp_destroy(mp);
1115 
1116     return NULL;
1117 }
1118 
1119 
1120 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1121 nxt_router_app_can_start(nxt_app_t *app)
1122 {
1123     return app->processes + app->pending_processes < app->max_processes
1124             && app->pending_processes < app->max_pending_processes;
1125 }
1126 
1127 
1128 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1129 nxt_router_app_need_start(nxt_app_t *app)
1130 {
1131     return (app->active_requests
1132               > app->port_hash_count + app->pending_processes)
1133            || (app->spare_processes
1134                 > app->idle_processes + app->pending_processes);
1135 }
1136 
1137 
1138 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1139 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1140 {
1141     nxt_int_t                    ret;
1142     nxt_app_t                    *app;
1143     nxt_router_t                 *router;
1144     nxt_runtime_t                *rt;
1145     nxt_queue_link_t             *qlk;
1146     nxt_socket_conf_t            *skcf;
1147     nxt_router_conf_t            *rtcf;
1148     nxt_router_temp_conf_t       *tmcf;
1149     const nxt_event_interface_t  *interface;
1150 #if (NXT_TLS)
1151     nxt_router_tlssock_t         *tls;
1152 #endif
1153 
1154     tmcf = obj;
1155 
1156     qlk = nxt_queue_first(&pending_sockets);
1157 
1158     if (qlk != nxt_queue_tail(&pending_sockets)) {
1159         nxt_queue_remove(qlk);
1160         nxt_queue_insert_tail(&creating_sockets, qlk);
1161 
1162         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1163 
1164         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1165 
1166         return;
1167     }
1168 
1169 #if (NXT_TLS)
1170     qlk = nxt_queue_last(&tmcf->tls);
1171 
1172     if (qlk != nxt_queue_head(&tmcf->tls)) {
1173         nxt_queue_remove(qlk);
1174 
1175         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1176 
1177         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1178                            nxt_router_tls_rpc_handler, tls);
1179         return;
1180     }
1181 #endif
1182 
1183     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1184 
1185         if (nxt_router_app_need_start(app)) {
1186             nxt_router_app_rpc_create(task, tmcf, app);
1187             return;
1188         }
1189 
1190     } nxt_queue_loop;
1191 
1192     rtcf = tmcf->router_conf;
1193 
1194     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1195         nxt_router_access_log_open(task, tmcf);
1196         return;
1197     }
1198 
1199     rt = task->thread->runtime;
1200 
1201     interface = nxt_service_get(rt->services, "engine", NULL);
1202 
1203     router = rtcf->router;
1204 
1205     ret = nxt_router_engines_create(task, router, tmcf, interface);
1206     if (nxt_slow_path(ret != NXT_OK)) {
1207         goto fail;
1208     }
1209 
1210     ret = nxt_router_threads_create(task, rt, tmcf);
1211     if (nxt_slow_path(ret != NXT_OK)) {
1212         goto fail;
1213     }
1214 
1215     nxt_router_apps_sort(task, router, tmcf);
1216 
1217     nxt_router_apps_hash_use(task, rtcf, 1);
1218 
1219     nxt_router_engines_post(router, tmcf);
1220 
1221     nxt_queue_add(&router->sockets, &updating_sockets);
1222     nxt_queue_add(&router->sockets, &creating_sockets);
1223 
1224     if (router->access_log != rtcf->access_log) {
1225         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1226 
1227         nxt_router_access_log_release(task, &router->lock, router->access_log);
1228 
1229         router->access_log = rtcf->access_log;
1230     }
1231 
1232     nxt_router_conf_ready(task, tmcf);
1233 
1234     return;
1235 
1236 fail:
1237 
1238     nxt_router_conf_error(task, tmcf);
1239 
1240     return;
1241 }
1242 
1243 
1244 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1245 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1246 {
1247     nxt_joint_job_t  *job;
1248 
1249     job = obj;
1250 
1251     nxt_router_conf_ready(task, job->tmcf);
1252 }
1253 
1254 
1255 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1256 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1257 {
1258     uint32_t               count;
1259     nxt_router_conf_t      *rtcf;
1260     nxt_thread_spinlock_t  *lock;
1261 
1262     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1263 
1264     if (--tmcf->count > 0) {
1265         return;
1266     }
1267 
1268     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1269 
1270     rtcf = tmcf->router_conf;
1271 
1272     lock = &rtcf->router->lock;
1273 
1274     nxt_thread_spin_lock(lock);
1275 
1276     count = rtcf->count;
1277 
1278     nxt_thread_spin_unlock(lock);
1279 
1280     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1281 
1282     if (count == 0) {
1283         nxt_router_apps_hash_use(task, rtcf, -1);
1284 
1285         nxt_router_access_log_release(task, lock, rtcf->access_log);
1286 
1287         nxt_mp_destroy(rtcf->mem_pool);
1288     }
1289 
1290     nxt_mp_release(tmcf->mem_pool);
1291 }
1292 
1293 
1294 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1295 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1296 {
1297     nxt_app_t          *app;
1298     nxt_socket_t       s;
1299     nxt_router_t       *router;
1300     nxt_queue_link_t   *qlk;
1301     nxt_socket_conf_t  *skcf;
1302     nxt_router_conf_t  *rtcf;
1303 
1304     nxt_alert(task, "failed to apply new conf");
1305 
1306     for (qlk = nxt_queue_first(&creating_sockets);
1307          qlk != nxt_queue_tail(&creating_sockets);
1308          qlk = nxt_queue_next(qlk))
1309     {
1310         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1311         s = skcf->listen->socket;
1312 
1313         if (s != -1) {
1314             nxt_socket_close(task, s);
1315         }
1316 
1317         nxt_free(skcf->listen);
1318     }
1319 
1320     rtcf = tmcf->router_conf;
1321 
1322     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1323 
1324         nxt_router_app_unlink(task, app);
1325 
1326     } nxt_queue_loop;
1327 
1328     router = rtcf->router;
1329 
1330     nxt_queue_add(&router->sockets, &keeping_sockets);
1331     nxt_queue_add(&router->sockets, &deleting_sockets);
1332 
1333     nxt_queue_add(&router->apps, &tmcf->previous);
1334 
1335     // TODO: new engines and threads
1336 
1337     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1338 
1339     nxt_mp_destroy(rtcf->mem_pool);
1340 
1341     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1342 
1343     nxt_mp_release(tmcf->mem_pool);
1344 }
1345 
1346 
1347 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1348 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1349     nxt_port_msg_type_t type)
1350 {
1351     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1352 
1353     nxt_port_use(task, tmcf->port, -1);
1354 
1355     tmcf->port = NULL;
1356 }
1357 
1358 
1359 static nxt_conf_map_t  nxt_router_conf[] = {
1360     {
1361         nxt_string("listeners_threads"),
1362         NXT_CONF_MAP_INT32,
1363         offsetof(nxt_router_conf_t, threads),
1364     },
1365 };
1366 
1367 
1368 static nxt_conf_map_t  nxt_router_app_conf[] = {
1369     {
1370         nxt_string("type"),
1371         NXT_CONF_MAP_STR,
1372         offsetof(nxt_router_app_conf_t, type),
1373     },
1374 
1375     {
1376         nxt_string("limits"),
1377         NXT_CONF_MAP_PTR,
1378         offsetof(nxt_router_app_conf_t, limits_value),
1379     },
1380 
1381     {
1382         nxt_string("processes"),
1383         NXT_CONF_MAP_INT32,
1384         offsetof(nxt_router_app_conf_t, processes),
1385     },
1386 
1387     {
1388         nxt_string("processes"),
1389         NXT_CONF_MAP_PTR,
1390         offsetof(nxt_router_app_conf_t, processes_value),
1391     },
1392 
1393     {
1394         nxt_string("targets"),
1395         NXT_CONF_MAP_PTR,
1396         offsetof(nxt_router_app_conf_t, targets_value),
1397     },
1398 };
1399 
1400 
1401 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1402     {
1403         nxt_string("timeout"),
1404         NXT_CONF_MAP_MSEC,
1405         offsetof(nxt_router_app_conf_t, timeout),
1406     },
1407 };
1408 
1409 
1410 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1411     {
1412         nxt_string("spare"),
1413         NXT_CONF_MAP_INT32,
1414         offsetof(nxt_router_app_conf_t, spare_processes),
1415     },
1416 
1417     {
1418         nxt_string("max"),
1419         NXT_CONF_MAP_INT32,
1420         offsetof(nxt_router_app_conf_t, max_processes),
1421     },
1422 
1423     {
1424         nxt_string("idle_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_router_app_conf_t, idle_timeout),
1427     },
1428 };
1429 
1430 
1431 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1432     {
1433         nxt_string("pass"),
1434         NXT_CONF_MAP_STR_COPY,
1435         offsetof(nxt_router_listener_conf_t, pass),
1436     },
1437 
1438     {
1439         nxt_string("application"),
1440         NXT_CONF_MAP_STR_COPY,
1441         offsetof(nxt_router_listener_conf_t, application),
1442     },
1443 };
1444 
1445 
1446 static nxt_conf_map_t  nxt_router_http_conf[] = {
1447     {
1448         nxt_string("header_buffer_size"),
1449         NXT_CONF_MAP_SIZE,
1450         offsetof(nxt_socket_conf_t, header_buffer_size),
1451     },
1452 
1453     {
1454         nxt_string("large_header_buffer_size"),
1455         NXT_CONF_MAP_SIZE,
1456         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1457     },
1458 
1459     {
1460         nxt_string("large_header_buffers"),
1461         NXT_CONF_MAP_SIZE,
1462         offsetof(nxt_socket_conf_t, large_header_buffers),
1463     },
1464 
1465     {
1466         nxt_string("body_buffer_size"),
1467         NXT_CONF_MAP_SIZE,
1468         offsetof(nxt_socket_conf_t, body_buffer_size),
1469     },
1470 
1471     {
1472         nxt_string("max_body_size"),
1473         NXT_CONF_MAP_SIZE,
1474         offsetof(nxt_socket_conf_t, max_body_size),
1475     },
1476 
1477     {
1478         nxt_string("idle_timeout"),
1479         NXT_CONF_MAP_MSEC,
1480         offsetof(nxt_socket_conf_t, idle_timeout),
1481     },
1482 
1483     {
1484         nxt_string("header_read_timeout"),
1485         NXT_CONF_MAP_MSEC,
1486         offsetof(nxt_socket_conf_t, header_read_timeout),
1487     },
1488 
1489     {
1490         nxt_string("body_read_timeout"),
1491         NXT_CONF_MAP_MSEC,
1492         offsetof(nxt_socket_conf_t, body_read_timeout),
1493     },
1494 
1495     {
1496         nxt_string("send_timeout"),
1497         NXT_CONF_MAP_MSEC,
1498         offsetof(nxt_socket_conf_t, send_timeout),
1499     },
1500 
1501     {
1502         nxt_string("body_temp_path"),
1503         NXT_CONF_MAP_STR,
1504         offsetof(nxt_socket_conf_t, body_temp_path),
1505     },
1506 
1507     {
1508         nxt_string("discard_unsafe_fields"),
1509         NXT_CONF_MAP_INT8,
1510         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1511     },
1512 };
1513 
1514 
1515 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1516     {
1517         nxt_string("max_frame_size"),
1518         NXT_CONF_MAP_SIZE,
1519         offsetof(nxt_websocket_conf_t, max_frame_size),
1520     },
1521 
1522     {
1523         nxt_string("read_timeout"),
1524         NXT_CONF_MAP_MSEC,
1525         offsetof(nxt_websocket_conf_t, read_timeout),
1526     },
1527 
1528     {
1529         nxt_string("keepalive_interval"),
1530         NXT_CONF_MAP_MSEC,
1531         offsetof(nxt_websocket_conf_t, keepalive_interval),
1532     },
1533 
1534 };
1535 
1536 
1537 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1538 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1539     u_char *start, u_char *end)
1540 {
1541     u_char                      *p;
1542     size_t                      size;
1543     nxt_mp_t                    *mp, *app_mp;
1544     uint32_t                    next, next_target;
1545     nxt_int_t                   ret;
1546     nxt_str_t                   name, target;
1547     nxt_app_t                   *app, *prev;
1548     nxt_str_t                   *t, *s, *targets;
1549     nxt_uint_t                  n, i;
1550     nxt_port_t                  *port;
1551     nxt_router_t                *router;
1552     nxt_app_joint_t             *app_joint;
1553 #if (NXT_TLS)
1554     nxt_tls_init_t              *tls_init;
1555     nxt_conf_value_t            *certificate;
1556 #endif
1557     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1558     nxt_conf_value_t            *applications, *application;
1559     nxt_conf_value_t            *listeners, *listener;
1560     nxt_socket_conf_t           *skcf;
1561     nxt_router_conf_t           *rtcf;
1562     nxt_http_routes_t           *routes;
1563     nxt_event_engine_t          *engine;
1564     nxt_app_lang_module_t       *lang;
1565     nxt_router_app_conf_t       apcf;
1566     nxt_router_listener_conf_t  lscf;
1567 
1568     static nxt_str_t  http_path = nxt_string("/settings/http");
1569     static nxt_str_t  applications_path = nxt_string("/applications");
1570     static nxt_str_t  listeners_path = nxt_string("/listeners");
1571     static nxt_str_t  routes_path = nxt_string("/routes");
1572     static nxt_str_t  access_log_path = nxt_string("/access_log");
1573 #if (NXT_TLS)
1574     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1575     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1576     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1577     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1578     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1579 #endif
1580     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1581     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1582     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1583     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1584 
1585     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1586     if (root == NULL) {
1587         nxt_alert(task, "configuration parsing error");
1588         return NXT_ERROR;
1589     }
1590 
1591     rtcf = tmcf->router_conf;
1592     mp = rtcf->mem_pool;
1593 
1594     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1595                               nxt_nitems(nxt_router_conf), rtcf);
1596     if (ret != NXT_OK) {
1597         nxt_alert(task, "root map error");
1598         return NXT_ERROR;
1599     }
1600 
1601     if (rtcf->threads == 0) {
1602         rtcf->threads = nxt_ncpu;
1603     }
1604 
1605     conf = nxt_conf_get_path(root, &static_path);
1606 
1607     ret = nxt_router_conf_process_static(task, rtcf, conf);
1608     if (nxt_slow_path(ret != NXT_OK)) {
1609         return NXT_ERROR;
1610     }
1611 
1612     router = rtcf->router;
1613 
1614     applications = nxt_conf_get_path(root, &applications_path);
1615 
1616     if (applications != NULL) {
1617         next = 0;
1618 
1619         for ( ;; ) {
1620             application = nxt_conf_next_object_member(applications,
1621                                                       &name, &next);
1622             if (application == NULL) {
1623                 break;
1624             }
1625 
1626             nxt_debug(task, "application \"%V\"", &name);
1627 
1628             size = nxt_conf_json_length(application, NULL);
1629 
1630             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1631             if (nxt_slow_path(app_mp == NULL)) {
1632                 goto fail;
1633             }
1634 
1635             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1636             if (app == NULL) {
1637                 goto app_fail;
1638             }
1639 
1640             nxt_memzero(app, sizeof(nxt_app_t));
1641 
1642             app->mem_pool = app_mp;
1643 
1644             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1645             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1646                                                   + name.length);
1647 
1648             p = nxt_conf_json_print(app->conf.start, application, NULL);
1649             app->conf.length = p - app->conf.start;
1650 
1651             nxt_assert(app->conf.length <= size);
1652 
1653             nxt_debug(task, "application conf \"%V\"", &app->conf);
1654 
1655             prev = nxt_router_app_find(&router->apps, &name);
1656 
1657             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1658                 nxt_mp_destroy(app_mp);
1659 
1660                 nxt_queue_remove(&prev->link);
1661                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1662 
1663                 ret = nxt_router_apps_hash_add(rtcf, prev);
1664                 if (nxt_slow_path(ret != NXT_OK)) {
1665                     goto fail;
1666                 }
1667 
1668                 continue;
1669             }
1670 
1671             apcf.processes = 1;
1672             apcf.max_processes = 1;
1673             apcf.spare_processes = 0;
1674             apcf.timeout = 0;
1675             apcf.idle_timeout = 15000;
1676             apcf.limits_value = NULL;
1677             apcf.processes_value = NULL;
1678             apcf.targets_value = NULL;
1679 
1680             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1681             if (nxt_slow_path(app_joint == NULL)) {
1682                 goto app_fail;
1683             }
1684 
1685             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1686 
1687             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1688                                       nxt_nitems(nxt_router_app_conf), &apcf);
1689             if (ret != NXT_OK) {
1690                 nxt_alert(task, "application map error");
1691                 goto app_fail;
1692             }
1693 
1694             if (apcf.limits_value != NULL) {
1695 
1696                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1697                     nxt_alert(task, "application limits is not object");
1698                     goto app_fail;
1699                 }
1700 
1701                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1702                                         nxt_router_app_limits_conf,
1703                                         nxt_nitems(nxt_router_app_limits_conf),
1704                                         &apcf);
1705                 if (ret != NXT_OK) {
1706                     nxt_alert(task, "application limits map error");
1707                     goto app_fail;
1708                 }
1709             }
1710 
1711             if (apcf.processes_value != NULL
1712                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1713             {
1714                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1715                                      nxt_router_app_processes_conf,
1716                                      nxt_nitems(nxt_router_app_processes_conf),
1717                                      &apcf);
1718                 if (ret != NXT_OK) {
1719                     nxt_alert(task, "application processes map error");
1720                     goto app_fail;
1721                 }
1722 
1723             } else {
1724                 apcf.max_processes = apcf.processes;
1725                 apcf.spare_processes = apcf.processes;
1726             }
1727 
1728             if (apcf.targets_value != NULL) {
1729                 n = nxt_conf_object_members_count(apcf.targets_value);
1730 
1731                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1732                 if (nxt_slow_path(targets == NULL)) {
1733                     goto app_fail;
1734                 }
1735 
1736                 next_target = 0;
1737 
1738                 for (i = 0; i < n; i++) {
1739                     (void) nxt_conf_next_object_member(apcf.targets_value,
1740                                                        &target, &next_target);
1741 
1742                     s = nxt_str_dup(app_mp, &targets[i], &target);
1743                     if (nxt_slow_path(s == NULL)) {
1744                         goto app_fail;
1745                     }
1746                 }
1747 
1748             } else {
1749                 targets = NULL;
1750             }
1751 
1752             nxt_debug(task, "application type: %V", &apcf.type);
1753             nxt_debug(task, "application processes: %D", apcf.processes);
1754             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1755 
1756             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1757 
1758             if (lang == NULL) {
1759                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1760                 goto app_fail;
1761             }
1762 
1763             nxt_debug(task, "application language module: \"%s\"", lang->file);
1764 
1765             ret = nxt_thread_mutex_create(&app->mutex);
1766             if (ret != NXT_OK) {
1767                 goto app_fail;
1768             }
1769 
1770             nxt_queue_init(&app->ports);
1771             nxt_queue_init(&app->spare_ports);
1772             nxt_queue_init(&app->idle_ports);
1773             nxt_queue_init(&app->ack_waiting_req);
1774 
1775             app->name.length = name.length;
1776             nxt_memcpy(app->name.start, name.start, name.length);
1777 
1778             app->type = lang->type;
1779             app->max_processes = apcf.max_processes;
1780             app->spare_processes = apcf.spare_processes;
1781             app->max_pending_processes = apcf.spare_processes
1782                                          ? apcf.spare_processes : 1;
1783             app->timeout = apcf.timeout;
1784             app->idle_timeout = apcf.idle_timeout;
1785 
1786             app->targets = targets;
1787 
1788             engine = task->thread->engine;
1789 
1790             app->engine = engine;
1791 
1792             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1793             app->adjust_idle_work.task = &engine->task;
1794             app->adjust_idle_work.obj = app;
1795 
1796             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1797 
1798             ret = nxt_router_apps_hash_add(rtcf, app);
1799             if (nxt_slow_path(ret != NXT_OK)) {
1800                 goto app_fail;
1801             }
1802 
1803             nxt_router_app_use(task, app, 1);
1804 
1805             app->joint = app_joint;
1806 
1807             app_joint->use_count = 1;
1808             app_joint->app = app;
1809 
1810             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1811             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1812             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1813             app_joint->idle_timer.task = &engine->task;
1814             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1815 
1816             app_joint->free_app_work.handler = nxt_router_free_app;
1817             app_joint->free_app_work.task = &engine->task;
1818             app_joint->free_app_work.obj = app_joint;
1819 
1820             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1821                                 NXT_PROCESS_APP);
1822             if (nxt_slow_path(port == NULL)) {
1823                 return NXT_ERROR;
1824             }
1825 
1826             ret = nxt_port_socket_init(task, port, 0);
1827             if (nxt_slow_path(ret != NXT_OK)) {
1828                 nxt_port_use(task, port, -1);
1829                 return NXT_ERROR;
1830             }
1831 
1832             ret = nxt_router_app_queue_init(task, port);
1833             if (nxt_slow_path(ret != NXT_OK)) {
1834                 nxt_port_write_close(port);
1835                 nxt_port_read_close(port);
1836                 nxt_port_use(task, port, -1);
1837                 return NXT_ERROR;
1838             }
1839 
1840             nxt_port_write_enable(task, port);
1841             port->app = app;
1842 
1843             app->shared_port = port;
1844 
1845             nxt_thread_mutex_create(&app->outgoing.mutex);
1846         }
1847     }
1848 
1849     conf = nxt_conf_get_path(root, &routes_path);
1850     if (nxt_fast_path(conf != NULL)) {
1851         routes = nxt_http_routes_create(task, tmcf, conf);
1852         if (nxt_slow_path(routes == NULL)) {
1853             return NXT_ERROR;
1854         }
1855 
1856         rtcf->routes = routes;
1857     }
1858 
1859     ret = nxt_upstreams_create(task, tmcf, root);
1860     if (nxt_slow_path(ret != NXT_OK)) {
1861         return ret;
1862     }
1863 
1864     http = nxt_conf_get_path(root, &http_path);
1865 #if 0
1866     if (http == NULL) {
1867         nxt_alert(task, "no \"http\" block");
1868         return NXT_ERROR;
1869     }
1870 #endif
1871 
1872     websocket = nxt_conf_get_path(root, &websocket_path);
1873 
1874     listeners = nxt_conf_get_path(root, &listeners_path);
1875 
1876     if (listeners != NULL) {
1877         next = 0;
1878 
1879         for ( ;; ) {
1880             listener = nxt_conf_next_object_member(listeners, &name, &next);
1881             if (listener == NULL) {
1882                 break;
1883             }
1884 
1885             skcf = nxt_router_socket_conf(task, tmcf, &name);
1886             if (skcf == NULL) {
1887                 goto fail;
1888             }
1889 
1890             nxt_memzero(&lscf, sizeof(lscf));
1891 
1892             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1893                                       nxt_nitems(nxt_router_listener_conf),
1894                                       &lscf);
1895             if (ret != NXT_OK) {
1896                 nxt_alert(task, "listener map error");
1897                 goto fail;
1898             }
1899 
1900             nxt_debug(task, "application: %V", &lscf.application);
1901 
1902             // STUB, default values if http block is not defined.
1903             skcf->header_buffer_size = 2048;
1904             skcf->large_header_buffer_size = 8192;
1905             skcf->large_header_buffers = 4;
1906             skcf->discard_unsafe_fields = 1;
1907             skcf->body_buffer_size = 16 * 1024;
1908             skcf->max_body_size = 8 * 1024 * 1024;
1909             skcf->proxy_header_buffer_size = 64 * 1024;
1910             skcf->proxy_buffer_size = 4096;
1911             skcf->proxy_buffers = 256;
1912             skcf->idle_timeout = 180 * 1000;
1913             skcf->header_read_timeout = 30 * 1000;
1914             skcf->body_read_timeout = 30 * 1000;
1915             skcf->send_timeout = 30 * 1000;
1916             skcf->proxy_timeout = 60 * 1000;
1917             skcf->proxy_send_timeout = 30 * 1000;
1918             skcf->proxy_read_timeout = 30 * 1000;
1919 
1920             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1921             skcf->websocket_conf.read_timeout = 60 * 1000;
1922             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1923 
1924             nxt_str_null(&skcf->body_temp_path);
1925 
1926             if (http != NULL) {
1927                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1928                                           nxt_nitems(nxt_router_http_conf),
1929                                           skcf);
1930                 if (ret != NXT_OK) {
1931                     nxt_alert(task, "http map error");
1932                     goto fail;
1933                 }
1934             }
1935 
1936             if (websocket != NULL) {
1937                 ret = nxt_conf_map_object(mp, websocket,
1938                                           nxt_router_websocket_conf,
1939                                           nxt_nitems(nxt_router_websocket_conf),
1940                                           &skcf->websocket_conf);
1941                 if (ret != NXT_OK) {
1942                     nxt_alert(task, "websocket map error");
1943                     goto fail;
1944                 }
1945             }
1946 
1947             t = &skcf->body_temp_path;
1948 
1949             if (t->length == 0) {
1950                 t->start = (u_char *) task->thread->runtime->tmp;
1951                 t->length = nxt_strlen(t->start);
1952             }
1953 
1954             conf = nxt_conf_get_path(listener, &forwarded_path);
1955 
1956             if (conf != NULL) {
1957                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1958                 if (nxt_slow_path(skcf->forwarded == NULL)) {
1959                     return NXT_ERROR;
1960                 }
1961             }
1962 
1963             conf = nxt_conf_get_path(listener, &client_ip_path);
1964 
1965             if (conf != NULL) {
1966                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1967                 if (nxt_slow_path(skcf->client_ip == NULL)) {
1968                     return NXT_ERROR;
1969                 }
1970             }
1971 
1972 #if (NXT_TLS)
1973             certificate = nxt_conf_get_path(listener, &certificate_path);
1974 
1975             if (certificate != NULL) {
1976                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1977                 if (nxt_slow_path(tls_init == NULL)) {
1978                     return NXT_ERROR;
1979                 }
1980 
1981                 tls_init->cache_size = 0;
1982                 tls_init->timeout = 300;
1983 
1984                 value = nxt_conf_get_path(listener, &conf_cache_path);
1985                 if (value != NULL) {
1986                     tls_init->cache_size = nxt_conf_get_number(value);
1987                 }
1988 
1989                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1990                 if (value != NULL) {
1991                     tls_init->timeout = nxt_conf_get_number(value);
1992                 }
1993 
1994                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1995                                                         &conf_commands_path);
1996 
1997                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1998                                                            &conf_tickets);
1999 
2000                 n = nxt_conf_array_elements_count_or_1(certificate);
2001 
2002                 for (i = 0; i < n; i++) {
2003                     value = nxt_conf_get_array_element_or_itself(certificate,
2004                                                                  i);
2005                     nxt_assert(value != NULL);
2006 
2007                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2008                                                      tls_init, i == 0);
2009                     if (nxt_slow_path(ret != NXT_OK)) {
2010                         goto fail;
2011                     }
2012                 }
2013             }
2014 #endif
2015 
2016             skcf->listen->handler = nxt_http_conn_init;
2017             skcf->router_conf = rtcf;
2018             skcf->router_conf->count++;
2019 
2020             if (lscf.pass.length != 0) {
2021                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2022 
2023             /* COMPATIBILITY: listener application. */
2024             } else if (lscf.application.length > 0) {
2025                 skcf->action = nxt_http_pass_application(task, rtcf,
2026                                                          &lscf.application);
2027             }
2028 
2029             if (nxt_slow_path(skcf->action == NULL)) {
2030                 goto fail;
2031             }
2032         }
2033     }
2034 
2035     ret = nxt_http_routes_resolve(task, tmcf);
2036     if (nxt_slow_path(ret != NXT_OK)) {
2037         goto fail;
2038     }
2039 
2040     value = nxt_conf_get_path(root, &access_log_path);
2041 
2042     if (value != NULL) {
2043         ret = nxt_router_access_log_create(task, rtcf, value);
2044         if (nxt_slow_path(ret != NXT_OK)) {
2045             goto fail;
2046         }
2047     }
2048 
2049     ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
2050     if (nxt_slow_path(ret != NXT_OK)) {
2051         goto fail;
2052     }
2053 
2054     nxt_queue_add(&deleting_sockets, &router->sockets);
2055     nxt_queue_init(&router->sockets);
2056 
2057     return NXT_OK;
2058 
2059 app_fail:
2060 
2061     nxt_mp_destroy(app_mp);
2062 
2063 fail:
2064 
2065     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2066 
2067         nxt_queue_remove(&app->link);
2068         nxt_thread_mutex_destroy(&app->mutex);
2069         nxt_mp_destroy(app->mem_pool);
2070 
2071     } nxt_queue_loop;
2072 
2073     return NXT_ERROR;
2074 }
2075 
2076 
2077 #if (NXT_TLS)
2078 
2079 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2080 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2081     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2082     nxt_tls_init_t *tls_init, nxt_bool_t last)
2083 {
2084     nxt_router_tlssock_t  *tls;
2085 
2086     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2087     if (nxt_slow_path(tls == NULL)) {
2088         return NXT_ERROR;
2089     }
2090 
2091     tls->tls_init = tls_init;
2092     tls->socket_conf = skcf;
2093     tls->temp_conf = tmcf;
2094     tls->last = last;
2095     nxt_conf_get_string(value, &tls->name);
2096 
2097     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2098 
2099     return NXT_OK;
2100 }
2101 
2102 #endif
2103 
2104 
2105 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2106 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2107     nxt_conf_value_t *conf)
2108 {
2109     uint32_t          next, i;
2110     nxt_mp_t          *mp;
2111     nxt_str_t         *type, exten, str;
2112     nxt_int_t         ret;
2113     nxt_uint_t        exts;
2114     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2115 
2116     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2117 
2118     mp = rtcf->mem_pool;
2119 
2120     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2121     if (nxt_slow_path(ret != NXT_OK)) {
2122         return NXT_ERROR;
2123     }
2124 
2125     if (conf == NULL) {
2126         return NXT_OK;
2127     }
2128 
2129     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2130 
2131     if (mtypes_conf != NULL) {
2132         next = 0;
2133 
2134         for ( ;; ) {
2135             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2136 
2137             if (ext_conf == NULL) {
2138                 break;
2139             }
2140 
2141             type = nxt_str_dup(mp, NULL, &str);
2142             if (nxt_slow_path(type == NULL)) {
2143                 return NXT_ERROR;
2144             }
2145 
2146             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2147                 nxt_conf_get_string(ext_conf, &str);
2148 
2149                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2150                     return NXT_ERROR;
2151                 }
2152 
2153                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2154                                                       &exten, type);
2155                 if (nxt_slow_path(ret != NXT_OK)) {
2156                     return NXT_ERROR;
2157                 }
2158 
2159                 continue;
2160             }
2161 
2162             exts = nxt_conf_array_elements_count(ext_conf);
2163 
2164             for (i = 0; i < exts; i++) {
2165                 value = nxt_conf_get_array_element(ext_conf, i);
2166 
2167                 nxt_conf_get_string(value, &str);
2168 
2169                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2170                     return NXT_ERROR;
2171                 }
2172 
2173                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2174                                                       &exten, type);
2175                 if (nxt_slow_path(ret != NXT_OK)) {
2176                     return NXT_ERROR;
2177                 }
2178             }
2179         }
2180     }
2181 
2182     return NXT_OK;
2183 }
2184 
2185 
2186 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2187 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2188 {
2189     nxt_int_t                   ret;
2190     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2191     nxt_conf_value_t            *source_conf, *recursive_conf;
2192     nxt_http_forward_t          *forward;
2193     nxt_http_route_addr_rule_t  *source;
2194 
2195     static nxt_str_t  header_path = nxt_string("/header");
2196     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2197     static nxt_str_t  protocol_path = nxt_string("/protocol");
2198     static nxt_str_t  source_path = nxt_string("/source");
2199     static nxt_str_t  recursive_path = nxt_string("/recursive");
2200 
2201     header_conf = nxt_conf_get_path(conf, &header_path);
2202 
2203     if (header_conf != NULL) {
2204         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2205         protocol_conf = NULL;
2206 
2207     } else {
2208         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2209         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2210     }
2211 
2212     source_conf = nxt_conf_get_path(conf, &source_path);
2213     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2214 
2215     if (source_conf == NULL
2216         || (protocol_conf == NULL && client_ip_conf == NULL))
2217     {
2218         return NULL;
2219     }
2220 
2221     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2222     if (nxt_slow_path(forward == NULL)) {
2223         return NULL;
2224     }
2225 
2226     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2227     if (nxt_slow_path(source == NULL)) {
2228         return NULL;
2229     }
2230 
2231     forward->source = source;
2232 
2233     if (recursive_conf != NULL) {
2234         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2235     }
2236 
2237     if (client_ip_conf != NULL) {
2238         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2239                                              &forward->client_ip);
2240         if (nxt_slow_path(ret != NXT_OK)) {
2241             return NULL;
2242         }
2243     }
2244 
2245     if (protocol_conf != NULL) {
2246         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2247                                              &forward->protocol);
2248         if (nxt_slow_path(ret != NXT_OK)) {
2249             return NULL;
2250         }
2251     }
2252 
2253     return forward;
2254 }
2255 
2256 
2257 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2258 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2259     nxt_http_forward_header_t *fh)
2260 {
2261     char       c;
2262     size_t     i;
2263     uint32_t   hash;
2264     nxt_str_t  header;
2265 
2266     nxt_conf_get_string(conf, &header);
2267 
2268     fh->header = nxt_str_dup(mp, NULL, &header);
2269     if (nxt_slow_path(fh->header == NULL)) {
2270         return NXT_ERROR;
2271     }
2272 
2273     hash = NXT_HTTP_FIELD_HASH_INIT;
2274 
2275     for (i = 0; i < fh->header->length; i++) {
2276         c = fh->header->start[i];
2277         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2278     }
2279 
2280     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2281 
2282     fh->header_hash = hash;
2283 
2284     return NXT_OK;
2285 }
2286 
2287 
2288 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2289 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2290 {
2291     nxt_app_t  *app;
2292 
2293     nxt_queue_each(app, queue, nxt_app_t, link) {
2294 
2295         if (nxt_strstr_eq(name, &app->name)) {
2296             return app;
2297         }
2298 
2299     } nxt_queue_loop;
2300 
2301     return NULL;
2302 }
2303 
2304 
2305 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2306 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2307 {
2308     void       *mem;
2309     nxt_int_t  fd;
2310 
2311     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2312     if (nxt_slow_path(fd == -1)) {
2313         return NXT_ERROR;
2314     }
2315 
2316     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2317                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2318     if (nxt_slow_path(mem == MAP_FAILED)) {
2319         nxt_fd_close(fd);
2320 
2321         return NXT_ERROR;
2322     }
2323 
2324     nxt_app_queue_init(mem);
2325 
2326     port->queue_fd = fd;
2327     port->queue = mem;
2328 
2329     return NXT_OK;
2330 }
2331 
2332 
2333 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2334 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2335 {
2336     void       *mem;
2337     nxt_int_t  fd;
2338 
2339     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2340     if (nxt_slow_path(fd == -1)) {
2341         return NXT_ERROR;
2342     }
2343 
2344     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2345                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2346     if (nxt_slow_path(mem == MAP_FAILED)) {
2347         nxt_fd_close(fd);
2348 
2349         return NXT_ERROR;
2350     }
2351 
2352     nxt_port_queue_init(mem);
2353 
2354     port->queue_fd = fd;
2355     port->queue = mem;
2356 
2357     return NXT_OK;
2358 }
2359 
2360 
2361 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2362 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2363 {
2364     void  *mem;
2365 
2366     nxt_assert(fd != -1);
2367 
2368     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2369                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2370     if (nxt_slow_path(mem == MAP_FAILED)) {
2371 
2372         return NXT_ERROR;
2373     }
2374 
2375     port->queue = mem;
2376 
2377     return NXT_OK;
2378 }
2379 
2380 
/*
 * Level hash prototype shared by the nxt_router_apps_hash_*() helpers in
 * this file for rtcf->apps_hash.  Entries are compared with
 * nxt_router_apps_hash_test(); the two nxt_mp_lvlhsh_*() entries are
 * presumably the allocate/free callbacks for hash levels (confirm against
 * the nxt_lvlhsh_proto_t definition).
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2387 
2388 
2389 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2390 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2391 {
2392     nxt_app_t  *app;
2393 
2394     app = data;
2395 
2396     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2397 }
2398 
2399 
2400 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2401 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2402 {
2403     nxt_lvlhsh_query_t  lhq;
2404 
2405     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2406     lhq.replace = 0;
2407     lhq.key = app->name;
2408     lhq.value = app;
2409     lhq.proto = &nxt_router_apps_hash_proto;
2410     lhq.pool = rtcf->mem_pool;
2411 
2412     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2413 
2414     case NXT_OK:
2415         return NXT_OK;
2416 
2417     case NXT_DECLINED:
2418         nxt_thread_log_alert("router app hash adding failed: "
2419                              "\"%V\" is already in hash", &lhq.key);
2420         /* Fall through. */
2421     default:
2422         return NXT_ERROR;
2423     }
2424 }
2425 
2426 
2427 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2428 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2429 {
2430     nxt_lvlhsh_query_t  lhq;
2431 
2432     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2433     lhq.key = *name;
2434     lhq.proto = &nxt_router_apps_hash_proto;
2435 
2436     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2437         return NULL;
2438     }
2439 
2440     return lhq.value;
2441 }
2442 
2443 
2444 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2445 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2446 {
2447     nxt_app_t          *app;
2448     nxt_lvlhsh_each_t  lhe;
2449 
2450     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2451 
2452     for ( ;; ) {
2453         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2454 
2455         if (app == NULL) {
2456             break;
2457         }
2458 
2459         nxt_router_app_use(task, app, i);
2460     }
2461 }
2462 
2463 
/*
 * Per-action application binding, filled in by
 * nxt_router_application_init() and stored in action->u.conf.
 */
typedef struct {
    nxt_app_t  *app;     /* application the action passes requests to */
    nxt_int_t  target;   /* index into app->targets; 0 when no target given */
} nxt_http_app_conf_t;
2468 
2469 
2470 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2471 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2472     nxt_str_t *target, nxt_http_action_t *action)
2473 {
2474     nxt_app_t            *app;
2475     nxt_str_t            *targets;
2476     nxt_uint_t           i;
2477     nxt_http_app_conf_t  *conf;
2478 
2479     app = nxt_router_apps_hash_get(rtcf, name);
2480     if (app == NULL) {
2481         return NXT_DECLINED;
2482     }
2483 
2484     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2485     if (nxt_slow_path(conf == NULL)) {
2486         return NXT_ERROR;
2487     }
2488 
2489     action->handler = nxt_http_application_handler;
2490     action->u.conf = conf;
2491 
2492     conf->app = app;
2493 
2494     if (target != NULL && target->length != 0) {
2495         targets = app->targets;
2496 
2497         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2498 
2499         conf->target = i;
2500 
2501     } else {
2502         conf->target = 0;
2503     }
2504 
2505     return NXT_OK;
2506 }
2507 
2508 
2509 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2510 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2511     nxt_str_t *name)
2512 {
2513     size_t               size;
2514     nxt_int_t            ret;
2515     nxt_bool_t           wildcard;
2516     nxt_sockaddr_t       *sa;
2517     nxt_socket_conf_t    *skcf;
2518     nxt_listen_socket_t  *ls;
2519 
2520     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2521     if (nxt_slow_path(sa == NULL)) {
2522         nxt_alert(task, "invalid listener \"%V\"", name);
2523         return NULL;
2524     }
2525 
2526     sa->type = SOCK_STREAM;
2527 
2528     nxt_debug(task, "router listener: \"%*s\"",
2529               (size_t) sa->length, nxt_sockaddr_start(sa));
2530 
2531     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2532     if (nxt_slow_path(skcf == NULL)) {
2533         return NULL;
2534     }
2535 
2536     size = nxt_sockaddr_size(sa);
2537 
2538     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2539 
2540     if (ret != NXT_OK) {
2541 
2542         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2543         if (nxt_slow_path(ls == NULL)) {
2544             return NULL;
2545         }
2546 
2547         skcf->listen = ls;
2548 
2549         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2550         nxt_memcpy(ls->sockaddr, sa, size);
2551 
2552         nxt_listen_socket_remote_size(ls);
2553 
2554         ls->socket = -1;
2555         ls->backlog = NXT_LISTEN_BACKLOG;
2556         ls->flags = NXT_NONBLOCK;
2557         ls->read_after_accept = 1;
2558     }
2559 
2560     switch (sa->u.sockaddr.sa_family) {
2561 #if (NXT_HAVE_UNIX_DOMAIN)
2562     case AF_UNIX:
2563         wildcard = 0;
2564         break;
2565 #endif
2566 #if (NXT_INET6)
2567     case AF_INET6:
2568         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2569         break;
2570 #endif
2571     case AF_INET:
2572     default:
2573         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2574         break;
2575     }
2576 
2577     if (!wildcard) {
2578         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2579         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2580             return NULL;
2581         }
2582 
2583         nxt_memcpy(skcf->sockaddr, sa, size);
2584     }
2585 
2586     return skcf;
2587 }
2588 
2589 
2590 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2591 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2592     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2593 {
2594     nxt_router_t       *router;
2595     nxt_queue_link_t   *qlk;
2596     nxt_socket_conf_t  *skcf;
2597 
2598     router = tmcf->router_conf->router;
2599 
2600     for (qlk = nxt_queue_first(&router->sockets);
2601          qlk != nxt_queue_tail(&router->sockets);
2602          qlk = nxt_queue_next(qlk))
2603     {
2604         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2605 
2606         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2607             nskcf->listen = skcf->listen;
2608 
2609             nxt_queue_remove(qlk);
2610             nxt_queue_insert_tail(&keeping_sockets, qlk);
2611 
2612             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2613 
2614             return NXT_OK;
2615         }
2616     }
2617 
2618     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2619 
2620     return NXT_DECLINED;
2621 }
2622 
2623 
2624 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2625 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2626     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2627 {
2628     size_t            size;
2629     uint32_t          stream;
2630     nxt_int_t         ret;
2631     nxt_buf_t         *b;
2632     nxt_port_t        *main_port, *router_port;
2633     nxt_runtime_t     *rt;
2634     nxt_socket_rpc_t  *rpc;
2635 
2636     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2637     if (rpc == NULL) {
2638         goto fail;
2639     }
2640 
2641     rpc->socket_conf = skcf;
2642     rpc->temp_conf = tmcf;
2643 
2644     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2645 
2646     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2647     if (b == NULL) {
2648         goto fail;
2649     }
2650 
2651     b->completion_handler = nxt_buf_dummy_completion;
2652 
2653     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2654 
2655     rt = task->thread->runtime;
2656     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2657     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2658 
2659     stream = nxt_port_rpc_register_handler(task, router_port,
2660                                            nxt_router_listen_socket_ready,
2661                                            nxt_router_listen_socket_error,
2662                                            main_port->pid, rpc);
2663     if (nxt_slow_path(stream == 0)) {
2664         goto fail;
2665     }
2666 
2667     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2668                                 stream, router_port->id, b);
2669 
2670     if (nxt_slow_path(ret != NXT_OK)) {
2671         nxt_port_rpc_cancel(task, router_port, stream);
2672         goto fail;
2673     }
2674 
2675     return;
2676 
2677 fail:
2678 
2679     nxt_router_conf_error(task, tmcf);
2680 }
2681 
2682 
2683 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2684 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2685     void *data)
2686 {
2687     nxt_int_t         ret;
2688     nxt_socket_t      s;
2689     nxt_socket_rpc_t  *rpc;
2690 
2691     rpc = data;
2692 
2693     s = msg->fd[0];
2694 
2695     ret = nxt_socket_nonblocking(task, s);
2696     if (nxt_slow_path(ret != NXT_OK)) {
2697         goto fail;
2698     }
2699 
2700     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2701 
2702     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2703     if (nxt_slow_path(ret != NXT_OK)) {
2704         goto fail;
2705     }
2706 
2707     rpc->socket_conf->listen->socket = s;
2708 
2709     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2710                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2711 
2712     return;
2713 
2714 fail:
2715 
2716     nxt_socket_close(task, s);
2717 
2718     nxt_router_conf_error(task, rpc->temp_conf);
2719 }
2720 
2721 
/*
 * RPC error handler for a listen socket request; "data" is the
 * nxt_socket_rpc_t registered with the request.
 *
 * The only active behavior is to fail the pending configuration via
 * nxt_router_conf_error().  The code under "#if 0" is compiled out; it
 * would decode an error code from the reply and format a descriptive
 * message for the debug log.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    /* Abort the configuration that was waiting for this socket. */
    nxt_router_conf_error(task, tmcf);
}
2781 
2782 
2783 #if (NXT_TLS)
2784 
2785 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2786 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2787     void *data)
2788 {
2789     nxt_mp_t                *mp;
2790     nxt_int_t               ret;
2791     nxt_tls_conf_t          *tlscf;
2792     nxt_router_tlssock_t    *tls;
2793     nxt_tls_bundle_conf_t   *bundle;
2794     nxt_router_temp_conf_t  *tmcf;
2795 
2796     nxt_debug(task, "tls rpc handler");
2797 
2798     tls = data;
2799     tmcf = tls->temp_conf;
2800 
2801     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2802         goto fail;
2803     }
2804 
2805     mp = tmcf->router_conf->mem_pool;
2806 
2807     if (tls->socket_conf->tls == NULL){
2808         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2809         if (nxt_slow_path(tlscf == NULL)) {
2810             goto fail;
2811         }
2812 
2813         tlscf->no_wait_shutdown = 1;
2814         tls->socket_conf->tls = tlscf;
2815 
2816     } else {
2817         tlscf = tls->socket_conf->tls;
2818     }
2819 
2820     tls->tls_init->conf = tlscf;
2821 
2822     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2823     if (nxt_slow_path(bundle == NULL)) {
2824         goto fail;
2825     }
2826 
2827     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2828         goto fail;
2829     }
2830 
2831     bundle->chain_file = msg->fd[0];
2832     bundle->next = tlscf->bundle;
2833     tlscf->bundle = bundle;
2834 
2835     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2836                                                   tls->last);
2837     if (nxt_slow_path(ret != NXT_OK)) {
2838         goto fail;
2839     }
2840 
2841     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2842                        nxt_router_conf_apply, task, tmcf, NULL);
2843     return;
2844 
2845 fail:
2846 
2847     nxt_router_conf_error(task, tmcf);
2848 }
2849 
2850 #endif
2851 
2852 
2853 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2854 nxt_router_app_rpc_create(nxt_task_t *task,
2855     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2856 {
2857     size_t         size;
2858     uint32_t       stream;
2859     nxt_fd_t       port_fd, queue_fd;
2860     nxt_int_t      ret;
2861     nxt_buf_t      *b;
2862     nxt_port_t     *router_port, *dport;
2863     nxt_runtime_t  *rt;
2864     nxt_app_rpc_t  *rpc;
2865 
2866     rt = task->thread->runtime;
2867 
2868     dport = app->proto_port;
2869 
2870     if (dport == NULL) {
2871         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2872 
2873         size = app->name.length + 1 + app->conf.length;
2874 
2875         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2876         if (nxt_slow_path(b == NULL)) {
2877             goto fail;
2878         }
2879 
2880         b->completion_handler = nxt_buf_dummy_completion;
2881 
2882         nxt_buf_cpystr(b, &app->name);
2883         *b->mem.free++ = '\0';
2884         nxt_buf_cpystr(b, &app->conf);
2885 
2886         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2887 
2888         port_fd = app->shared_port->pair[0];
2889         queue_fd = app->shared_port->queue_fd;
2890 
2891     } else {
2892         nxt_debug(task, "app '%V' prefork", &app->name);
2893 
2894         b = NULL;
2895         port_fd = -1;
2896         queue_fd = -1;
2897     }
2898 
2899     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2900 
2901     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2902                                            nxt_router_app_prefork_ready,
2903                                            nxt_router_app_prefork_error,
2904                                            sizeof(nxt_app_rpc_t));
2905     if (nxt_slow_path(rpc == NULL)) {
2906         goto fail;
2907     }
2908 
2909     rpc->app = app;
2910     rpc->temp_conf = tmcf;
2911     rpc->proto = (b != NULL);
2912 
2913     stream = nxt_port_rpc_ex_stream(rpc);
2914 
2915     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2916                                  port_fd, queue_fd, stream, router_port->id, b);
2917     if (nxt_slow_path(ret != NXT_OK)) {
2918         nxt_port_rpc_cancel(task, router_port, stream);
2919         goto fail;
2920     }
2921 
2922     if (b == NULL) {
2923         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2924 
2925         app->pending_processes++;
2926     }
2927 
2928     return;
2929 
2930 fail:
2931 
2932     nxt_router_conf_error(task, tmcf);
2933 }
2934 
2935 
2936 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2937 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2938     void *data)
2939 {
2940     nxt_app_t           *app;
2941     nxt_port_t          *port;
2942     nxt_app_rpc_t       *rpc;
2943     nxt_event_engine_t  *engine;
2944 
2945     rpc = data;
2946     app = rpc->app;
2947 
2948     port = msg->u.new_port;
2949 
2950     nxt_assert(port != NULL);
2951     nxt_assert(port->id == 0);
2952 
2953     if (rpc->proto) {
2954         nxt_assert(app->proto_port == NULL);
2955         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2956 
2957         nxt_port_inc_use(port);
2958 
2959         app->proto_port = port;
2960         port->app = app;
2961 
2962         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2963 
2964         return;
2965     }
2966 
2967     nxt_assert(port->type == NXT_PROCESS_APP);
2968 
2969     port->app = app;
2970     port->main_app_port = port;
2971 
2972     app->pending_processes--;
2973     app->processes++;
2974     app->idle_processes++;
2975 
2976     engine = task->thread->engine;
2977 
2978     nxt_queue_insert_tail(&app->ports, &port->app_link);
2979     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2980 
2981     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2982               &app->name, port->pid, port->id);
2983 
2984     nxt_port_hash_add(&app->port_hash, port);
2985     app->port_hash_count++;
2986 
2987     port->idle_start = 0;
2988 
2989     nxt_port_inc_use(port);
2990 
2991     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2992 
2993     nxt_work_queue_add(&engine->fast_work_queue,
2994                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2995 }
2996 
2997 
2998 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2999 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3000     void *data)
3001 {
3002     nxt_app_t               *app;
3003     nxt_app_rpc_t           *rpc;
3004     nxt_router_temp_conf_t  *tmcf;
3005 
3006     rpc = data;
3007     app = rpc->app;
3008     tmcf = rpc->temp_conf;
3009 
3010     if (rpc->proto) {
3011         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3012                 &app->name);
3013 
3014     } else {
3015         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3016                 &app->name);
3017 
3018         app->pending_processes--;
3019     }
3020 
3021     nxt_router_conf_error(task, tmcf);
3022 }
3023 
3024 
3025 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)3026 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3027     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3028 {
3029     nxt_int_t                 ret;
3030     nxt_uint_t                n, threads;
3031     nxt_queue_link_t          *qlk;
3032     nxt_router_engine_conf_t  *recf;
3033 
3034     threads = tmcf->router_conf->threads;
3035 
3036     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3037                                      sizeof(nxt_router_engine_conf_t));
3038     if (nxt_slow_path(tmcf->engines == NULL)) {
3039         return NXT_ERROR;
3040     }
3041 
3042     n = 0;
3043 
3044     for (qlk = nxt_queue_first(&router->engines);
3045          qlk != nxt_queue_tail(&router->engines);
3046          qlk = nxt_queue_next(qlk))
3047     {
3048         recf = nxt_array_zero_add(tmcf->engines);
3049         if (nxt_slow_path(recf == NULL)) {
3050             return NXT_ERROR;
3051         }
3052 
3053         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3054 
3055         if (n < threads) {
3056             recf->action = NXT_ROUTER_ENGINE_KEEP;
3057             ret = nxt_router_engine_conf_update(tmcf, recf);
3058 
3059         } else {
3060             recf->action = NXT_ROUTER_ENGINE_DELETE;
3061             ret = nxt_router_engine_conf_delete(tmcf, recf);
3062         }
3063 
3064         if (nxt_slow_path(ret != NXT_OK)) {
3065             return ret;
3066         }
3067 
3068         n++;
3069     }
3070 
3071     tmcf->new_threads = n;
3072 
3073     while (n < threads) {
3074         recf = nxt_array_zero_add(tmcf->engines);
3075         if (nxt_slow_path(recf == NULL)) {
3076             return NXT_ERROR;
3077         }
3078 
3079         recf->action = NXT_ROUTER_ENGINE_ADD;
3080 
3081         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3082         if (nxt_slow_path(recf->engine == NULL)) {
3083             return NXT_ERROR;
3084         }
3085 
3086         ret = nxt_router_engine_conf_create(tmcf, recf);
3087         if (nxt_slow_path(ret != NXT_OK)) {
3088             return ret;
3089         }
3090 
3091         n++;
3092     }
3093 
3094     return NXT_OK;
3095 }
3096 
3097 
3098 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3099 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3100     nxt_router_engine_conf_t *recf)
3101 {
3102     nxt_int_t  ret;
3103 
3104     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3105                                           nxt_router_listen_socket_create);
3106     if (nxt_slow_path(ret != NXT_OK)) {
3107         return ret;
3108     }
3109 
3110     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3111                                           nxt_router_listen_socket_create);
3112     if (nxt_slow_path(ret != NXT_OK)) {
3113         return ret;
3114     }
3115 
3116     return ret;
3117 }
3118 
3119 
3120 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3121 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3122     nxt_router_engine_conf_t *recf)
3123 {
3124     nxt_int_t  ret;
3125 
3126     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3127                                           nxt_router_listen_socket_create);
3128     if (nxt_slow_path(ret != NXT_OK)) {
3129         return ret;
3130     }
3131 
3132     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3133                                           nxt_router_listen_socket_update);
3134     if (nxt_slow_path(ret != NXT_OK)) {
3135         return ret;
3136     }
3137 
3138     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3139     if (nxt_slow_path(ret != NXT_OK)) {
3140         return ret;
3141     }
3142 
3143     return ret;
3144 }
3145 
3146 
3147 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3148 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3149     nxt_router_engine_conf_t *recf)
3150 {
3151     nxt_int_t  ret;
3152 
3153     ret = nxt_router_engine_quit(tmcf,