xref: /unit/src/nxt_router.c (revision 2677:2f8937e9d8a0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #if (NXT_HAVE_NJS)
15 #include <nxt_script.h>
16 #endif
17 #include <nxt_http.h>
18 #include <nxt_port_memory_int.h>
19 #include <nxt_unit_request.h>
20 #include <nxt_unit_response.h>
21 #include <nxt_router_request.h>
22 #include <nxt_app_queue.h>
23 #include <nxt_port_queue.h>
24 
/*
 * Port id given to an application's shared port: it is passed to
 * nxt_port_new() when a replacement shared port is created on restart and
 * is compared against app_port->id to decide whether a queued request can
 * be cancelled through the application queue.
 */
25 #define NXT_SHARED_PORT_ID  0xFFFFu
26 
/*
 * Application settings: type string, process counts, timeouts and the
 * configuration sub-values for limits, processes and targets.
 * NOTE(review): the code that fills this structure is outside this view;
 * presumably it is mapped from the application's JSON configuration --
 * confirm.
 */
27 typedef struct {
28     nxt_str_t         type;
29     uint32_t          processes;
30     uint32_t          max_processes;
31     uint32_t          spare_processes;
32     nxt_msec_t        timeout;
33     nxt_msec_t        idle_timeout;
34     nxt_conf_value_t  *limits_value;
35     nxt_conf_value_t  *processes_value;
36     nxt_conf_value_t  *targets_value;
37 } nxt_router_app_conf_t;
38 
39 
/*
 * Two string settings of a listener: "pass" and "application".
 * NOTE(review): presumably mapped from the listener's JSON configuration;
 * the mapping code is outside this view -- confirm.
 */
40 typedef struct {
41     nxt_str_t         pass;
42     nxt_str_t         application;
43 } nxt_router_listener_conf_t;
44 
45 
46 #if (NXT_TLS)
47 
/*
 * Per-name TLS item tied to a socket configuration and a temporary
 * configuration; linked into a queue through "link".
 * NOTE(review): presumably one item per certificate name, with "last"
 * marking the final item for the socket -- confirm against
 * nxt_router_conf_tls_insert().
 */
48 typedef struct {
49     nxt_str_t               name;
50     nxt_socket_conf_t       *socket_conf;
51     nxt_router_temp_conf_t  *temp_conf;
52     nxt_tls_init_t          *tls_init;
53     nxt_bool_t              last;
54 
55     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
56 } nxt_router_tlssock_t;
57 
58 #endif
59 
60 
61 #if (NXT_HAVE_NJS)
62 
/*
 * Named JS module item bound to a temporary configuration and linked into
 * a queue through "link".
 * NOTE(review): presumably created by nxt_router_js_module_insert() --
 * confirm.
 */
63 typedef struct {
64     nxt_str_t               name;
65     nxt_router_temp_conf_t  *temp_conf;
66     nxt_queue_link_t        link;
67 } nxt_router_js_module_t;
68 
69 #endif
70 
71 
/*
 * Context carrying a socket name, its socket configuration and the
 * temporary configuration it belongs to.
 * NOTE(review): presumably the RPC data for listen socket creation
 * (nxt_router_listen_socket_ready()/_error()) -- confirm.
 */
72 typedef struct {
73     nxt_str_t               *name;
74     nxt_socket_conf_t       *socket_conf;
75     nxt_router_temp_conf_t  *temp_conf;
76     nxt_bool_t              last;
77 } nxt_socket_rpc_t;
78 
79 
/*
 * Context pairing an application with a temporary configuration.
 * NOTE(review): presumably the RPC data for application prefork
 * (nxt_router_app_prefork_ready()/_error()) -- confirm.
 */
80 typedef struct {
81     nxt_app_t               *app;
82     nxt_router_temp_conf_t  *temp_conf;
83     uint8_t                 proto;  /* 1 bit */
84 } nxt_app_rpc_t;
85 
86 
/*
 * RPC context registered by nxt_router_start_app_process_handler() for a
 * START_PROCESS request.  It records the application joint, the application
 * generation at the time of the request, and whether the request asked for
 * a prototype process ("proto" is set when a configuration buffer was sent).
 */
87 typedef struct {
88     nxt_app_joint_t         *app_joint;
89     uint32_t                generation;
90     uint8_t                 proto;  /* 1 bit */
91 } nxt_app_joint_rpc_t;
92 
93 
94 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
95     nxt_mp_t *mp);
96 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
97 static void nxt_router_greet_controller(nxt_task_t *task,
98     nxt_port_t *controller_port);
99 
100 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
101 
102 static void nxt_router_new_port_handler(nxt_task_t *task,
103     nxt_port_recv_msg_t *msg);
104 static void nxt_router_conf_data_handler(nxt_task_t *task,
105     nxt_port_recv_msg_t *msg);
106 static void nxt_router_app_restart_handler(nxt_task_t *task,
107     nxt_port_recv_msg_t *msg);
108 static void nxt_router_status_handler(nxt_task_t *task,
109     nxt_port_recv_msg_t *msg);
110 static void nxt_router_remove_pid_handler(nxt_task_t *task,
111     nxt_port_recv_msg_t *msg);
112 
113 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
114 static void nxt_router_conf_ready(nxt_task_t *task,
115     nxt_router_temp_conf_t *tmcf);
116 static void nxt_router_conf_send(nxt_task_t *task,
117     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
118 
119 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
120     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
121 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
122     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
123 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
124     nxt_mp_t *mp, nxt_conf_value_t *conf);
125 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
126     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
127 
128 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
129 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
130 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
131     nxt_app_t *app);
132 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
133     nxt_str_t *name);
134 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
135     int i);
136 
137 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
138     nxt_port_t *port);
139 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
140     nxt_port_t *port);
141 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
142     nxt_port_t *port, nxt_fd_t fd);
143 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
145 static void nxt_router_listen_socket_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_listen_socket_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 #if (NXT_TLS)
150 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
151     nxt_port_recv_msg_t *msg, void *data);
152 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
153     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
154     nxt_bool_t last);
155 #endif
156 #if (NXT_HAVE_NJS)
157 static void nxt_router_js_module_rpc_handler(nxt_task_t *task,
158     nxt_port_recv_msg_t *msg, void *data);
159 static nxt_int_t nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
160     nxt_conf_value_t *value);
161 #endif
162 static void nxt_router_app_rpc_create(nxt_task_t *task,
163     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
164 static void nxt_router_app_prefork_ready(nxt_task_t *task,
165     nxt_port_recv_msg_t *msg, void *data);
166 static void nxt_router_app_prefork_error(nxt_task_t *task,
167     nxt_port_recv_msg_t *msg, void *data);
168 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
169     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
170 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
171     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
172 
173 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
174     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
175     const nxt_event_interface_t *interface);
176 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
177     nxt_router_engine_conf_t *recf);
178 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
179     nxt_router_engine_conf_t *recf);
180 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
181     nxt_router_engine_conf_t *recf);
182 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
183     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
184     nxt_work_handler_t handler);
185 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
186     nxt_router_engine_conf_t *recf);
187 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
188     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
189 
190 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
191     nxt_router_temp_conf_t *tmcf);
192 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
193     nxt_event_engine_t *engine);
194 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
195     nxt_router_temp_conf_t *tmcf);
196 
197 static void nxt_router_engines_post(nxt_router_t *router,
198     nxt_router_temp_conf_t *tmcf);
199 static void nxt_router_engine_post(nxt_event_engine_t *engine,
200     nxt_work_t *jobs);
201 
202 static void nxt_router_thread_start(void *data);
203 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
204     void *data);
205 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
206     void *data);
207 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
208     void *data);
209 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
210     void *data);
211 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
212     void *data);
213 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
214     void *data);
215 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
216     void *data);
217 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
218     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
219 static void nxt_router_listen_socket_release(nxt_task_t *task,
220     nxt_socket_conf_t *skcf);
221 
222 static void nxt_router_app_port_ready(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 static void nxt_router_app_port_error(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 
227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
229 
230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
231     nxt_port_t *port, nxt_apr_action_t action);
232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
233     nxt_request_rpc_data_t *req_rpc_data);
234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
235     void *data);
236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
237     void *data);
238 
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240     nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243 
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246     void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252 
253 static const nxt_http_request_state_t  nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255 
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257     nxt_app_joint_t *app_joint, int i);
258 
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260     nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262     void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265     nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 
269 extern const nxt_http_request_state_t  nxt_http_websocket;
270 
/* Router instance of this process; allocated and assigned in nxt_router_start(). */
271 nxt_router_t  *nxt_router;
272 
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275 
276 static const nxt_str_t  *nxt_app_msg_prefix[] = {
277     [NXT_APP_EXTERNAL]  = &empty_prefix,
278     [NXT_APP_PYTHON]    = &empty_prefix,
279     [NXT_APP_PHP]       = &http_prefix,
280     [NXT_APP_PERL]      = &http_prefix,
281     [NXT_APP_RUBY]      = &http_prefix,
282     [NXT_APP_JAVA]      = &empty_prefix,
283     [NXT_APP_WASM]      = &empty_prefix,
284     [NXT_APP_WASM_WC]   = &empty_prefix,
285 };
286 
287 
288 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
289     .quit         = nxt_signal_quit_handler,
290     .new_port     = nxt_router_new_port_handler,
291     .get_port     = nxt_router_get_port_handler,
292     .change_file  = nxt_port_change_log_file_handler,
293     .mmap         = nxt_port_mmap_handler,
294     .get_mmap     = nxt_router_get_mmap_handler,
295     .data         = nxt_router_conf_data_handler,
296     .app_restart  = nxt_router_app_restart_handler,
297     .status       = nxt_router_status_handler,
298     .remove_pid   = nxt_router_remove_pid_handler,
299     .access_log   = nxt_router_access_log_reopen_handler,
300     .rpc_ready    = nxt_port_rpc_handler,
301     .rpc_error    = nxt_port_rpc_handler,
302     .oosm         = nxt_router_oosm_handler,
303 };
304 
305 
306 const nxt_process_init_t  nxt_router_process = {
307     .name           = "router",
308     .type           = NXT_PROCESS_ROUTER,
309     .prefork        = nxt_router_prefork,
310     .restart        = 1,
311     .setup          = nxt_process_core_setup,
312     .start          = nxt_router_start,
313     .port_handlers  = &nxt_router_process_port_handlers,
314     .signals        = nxt_process_signals,
315 };
316 
317 
318 /* Queues of nxt_socket_conf_t */
319 nxt_queue_t  creating_sockets;
320 nxt_queue_t  pending_sockets;
321 nxt_queue_t  updating_sockets;
322 nxt_queue_t  keeping_sockets;
323 nxt_queue_t  deleting_sockets;
324 
325 
326 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
328 {
329     nxt_runtime_stop_app_processes(task, task->thread->runtime);
330 
331     return NXT_OK;
332 }
333 
334 
335 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
337 {
338     nxt_int_t      ret;
339     nxt_port_t     *controller_port;
340     nxt_router_t   *router;
341     nxt_runtime_t  *rt;
342 
343     rt = task->thread->runtime;
344 
345     nxt_log(task, NXT_LOG_INFO, "router started");
346 
347 #if (NXT_TLS)
348     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
349     if (nxt_slow_path(rt->tls == NULL)) {
350         return NXT_ERROR;
351     }
352 
353     ret = rt->tls->library_init(task);
354     if (nxt_slow_path(ret != NXT_OK)) {
355         return ret;
356     }
357 #endif
358 
359     ret = nxt_http_init(task);
360     if (nxt_slow_path(ret != NXT_OK)) {
361         return ret;
362     }
363 
364     router = nxt_zalloc(sizeof(nxt_router_t));
365     if (nxt_slow_path(router == NULL)) {
366         return NXT_ERROR;
367     }
368 
369     nxt_queue_init(&router->engines);
370     nxt_queue_init(&router->sockets);
371     nxt_queue_init(&router->apps);
372 
373     nxt_router = router;
374 
375     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
376     if (controller_port != NULL) {
377         nxt_router_greet_controller(task, controller_port);
378     }
379 
380     return NXT_OK;
381 }
382 
383 
384 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
386 {
387     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
388                           -1, 0, 0, NULL);
389 }
390 
391 
392 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
394     void *data)
395 {
396     size_t               size;
397     uint32_t             stream;
398     nxt_fd_t             port_fd, queue_fd;
399     nxt_int_t            ret;
400     nxt_app_t            *app;
401     nxt_buf_t            *b;
402     nxt_port_t           *dport;
403     nxt_runtime_t        *rt;
404     nxt_app_joint_rpc_t  *app_joint_rpc;
405 
406     app = data;
407 
408     nxt_thread_mutex_lock(&app->mutex);
409 
410     dport = app->proto_port;
411 
412     nxt_thread_mutex_unlock(&app->mutex);
413 
414     if (dport != NULL) {
415         nxt_debug(task, "app '%V' %p start process", &app->name, app);
416 
417         b = NULL;
418         port_fd = -1;
419         queue_fd = -1;
420 
421     } else {
422         if (app->proto_port_requests > 0) {
423             nxt_debug(task, "app '%V' %p wait for prototype process",
424                       &app->name, app);
425 
426             app->proto_port_requests++;
427 
428             goto skip;
429         }
430 
431         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
432 
433         rt = task->thread->runtime;
434         dport = rt->port_by_type[NXT_PROCESS_MAIN];
435 
436         size = app->name.length + 1 + app->conf.length;
437 
438         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
439         if (nxt_slow_path(b == NULL)) {
440             goto failed;
441         }
442 
443         nxt_buf_cpystr(b, &app->name);
444         *b->mem.free++ = '\0';
445         nxt_buf_cpystr(b, &app->conf);
446 
447         port_fd = app->shared_port->pair[0];
448         queue_fd = app->shared_port->queue_fd;
449     }
450 
451     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
452                                                      nxt_router_app_port_ready,
453                                                      nxt_router_app_port_error,
454                                                    sizeof(nxt_app_joint_rpc_t));
455     if (nxt_slow_path(app_joint_rpc == NULL)) {
456         goto failed;
457     }
458 
459     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
460 
461     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
462                                  port_fd, queue_fd, stream, port->id, b);
463     if (nxt_slow_path(ret != NXT_OK)) {
464         nxt_port_rpc_cancel(task, port, stream);
465 
466         goto failed;
467     }
468 
469     app_joint_rpc->app_joint = app->joint;
470     app_joint_rpc->generation = app->generation;
471     app_joint_rpc->proto = (b != NULL);
472 
473     if (b != NULL) {
474         app->proto_port_requests++;
475 
476         b = NULL;
477     }
478 
479     nxt_router_app_joint_use(task, app->joint, 1);
480 
481 failed:
482 
483     if (b != NULL) {
484         nxt_mp_free(b->data, b);
485     }
486 
487 skip:
488 
489     nxt_router_app_use(task, app, -1);
490 }
491 
492 
493 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)494 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
495 {
496     app_joint->use_count += i;
497 
498     if (app_joint->use_count == 0) {
499         nxt_assert(app_joint->app == NULL);
500 
501         nxt_free(app_joint);
502     }
503 }
504 
505 
506 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)507 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
508 {
509     nxt_int_t      res;
510     nxt_port_t     *router_port;
511     nxt_runtime_t  *rt;
512 
513     nxt_debug(task, "app '%V' start process", &app->name);
514 
515     rt = task->thread->runtime;
516     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
517 
518     nxt_router_app_use(task, app, 1);
519 
520     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
521                         app);
522 
523     if (res == NXT_OK) {
524         return res;
525     }
526 
527     nxt_thread_mutex_lock(&app->mutex);
528 
529     app->pending_processes--;
530 
531     nxt_thread_mutex_unlock(&app->mutex);
532 
533     nxt_router_app_use(task, app, -1);
534 
535     return NXT_ERROR;
536 }
537 
538 
539 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)540 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
541 {
542     nxt_buf_t       *b, *next;
543     nxt_bool_t      cancelled;
544     nxt_port_t      *app_port;
545     nxt_msg_info_t  *msg_info;
546 
547     msg_info = &req_rpc_data->msg_info;
548 
549     if (msg_info->buf == NULL) {
550         return 0;
551     }
552 
553     app_port = req_rpc_data->app_port;
554 
555     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
556         cancelled = nxt_app_queue_cancel(app_port->queue,
557                                          msg_info->tracking_cookie,
558                                          req_rpc_data->stream);
559 
560         if (cancelled) {
561             nxt_debug(task, "stream #%uD: cancelled by router",
562                       req_rpc_data->stream);
563         }
564 
565     } else {
566         cancelled = 0;
567     }
568 
569     for (b = msg_info->buf; b != NULL; b = next) {
570         next = b->next;
571         b->next = NULL;
572 
573         if (b->is_port_mmap_sent) {
574             b->is_port_mmap_sent = cancelled == 0;
575         }
576 
577         b->completion_handler(task, b, b->parent);
578     }
579 
580     msg_info->buf = NULL;
581 
582     return cancelled;
583 }
584 
585 
586 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)587 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
588 {
589     if (lnk->next != NULL) {
590         nxt_queue_remove(lnk);
591 
592         lnk->next = NULL;
593 
594         return 1;
595     }
596 
597     return 0;
598 }
599 
600 
601 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)602 nxt_request_rpc_data_unlink(nxt_task_t *task,
603     nxt_request_rpc_data_t *req_rpc_data)
604 {
605     nxt_app_t           *app;
606     nxt_bool_t          unlinked;
607     nxt_http_request_t  *r;
608 
609     nxt_router_msg_cancel(task, req_rpc_data);
610 
611     app = req_rpc_data->app;
612 
613     if (req_rpc_data->app_port != NULL) {
614         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
615                                     req_rpc_data->apr_action);
616 
617         req_rpc_data->app_port = NULL;
618     }
619 
620     r = req_rpc_data->request;
621 
622     if (r != NULL) {
623         r->timer_data = NULL;
624 
625         nxt_router_http_request_release_post(task, r);
626 
627         r->req_rpc_data = NULL;
628         req_rpc_data->request = NULL;
629 
630         if (app != NULL) {
631             unlinked = 0;
632 
633             nxt_thread_mutex_lock(&app->mutex);
634 
635             if (r->app_link.next != NULL) {
636                 nxt_queue_remove(&r->app_link);
637                 r->app_link.next = NULL;
638 
639                 unlinked = 1;
640             }
641 
642             nxt_thread_mutex_unlock(&app->mutex);
643 
644             if (unlinked) {
645                 nxt_mp_release(r->mem_pool);
646             }
647         }
648     }
649 
650     if (app != NULL) {
651         nxt_router_app_use(task, app, -1);
652 
653         req_rpc_data->app = NULL;
654     }
655 
656     if (req_rpc_data->msg_info.body_fd != -1) {
657         nxt_fd_close(req_rpc_data->msg_info.body_fd);
658 
659         req_rpc_data->msg_info.body_fd = -1;
660     }
661 
662     if (req_rpc_data->rpc_cancel) {
663         req_rpc_data->rpc_cancel = 0;
664 
665         nxt_port_rpc_cancel(task, task->thread->engine->port,
666                             req_rpc_data->stream);
667     }
668 }
669 
670 
671 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674     nxt_int_t      res;
675     nxt_app_t      *app;
676     nxt_port_t     *port, *main_app_port;
677     nxt_runtime_t  *rt;
678 
679     nxt_port_new_port_handler(task, msg);
680 
681     port = msg->u.new_port;
682 
683     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
684         nxt_router_greet_controller(task, msg->u.new_port);
685     }
686 
687     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
688         nxt_port_rpc_handler(task, msg);
689 
690         return;
691     }
692 
693     if (port == NULL || port->type != NXT_PROCESS_APP) {
694 
695         if (msg->port_msg.stream == 0) {
696             return;
697         }
698 
699         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
700 
701     } else {
702         if (msg->fd[1] != -1) {
703             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
704             if (nxt_slow_path(res != NXT_OK)) {
705                 return;
706             }
707 
708             nxt_fd_close(msg->fd[1]);
709             msg->fd[1] = -1;
710         }
711     }
712 
713     if (msg->port_msg.stream != 0) {
714         nxt_port_rpc_handler(task, msg);
715         return;
716     }
717 
718     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
719 
720     /*
721      * Port with "id == 0" is application 'main' port and it always
722      * should come with non-zero stream.
723      */
724     nxt_assert(port->id != 0);
725 
726     /* Find 'main' app port and get app reference. */
727     rt = task->thread->runtime;
728 
729     /*
730      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
731      * sent to main port (with id == 0) and processed in main thread.
732      */
733     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
734     nxt_assert(main_app_port != NULL);
735 
736     app = main_app_port->app;
737 
738     if (nxt_fast_path(app != NULL)) {
739         nxt_thread_mutex_lock(&app->mutex);
740 
741         /* TODO here should be find-and-add code because there can be
742            port waiters in port_hash */
743         nxt_port_hash_add(&app->port_hash, port);
744         app->port_hash_count++;
745 
746         nxt_thread_mutex_unlock(&app->mutex);
747 
748         port->app = app;
749     }
750 
751     port->main_app_port = main_app_port;
752 
753     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
754 }
755 
756 
757 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)758 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
759 {
760     void                    *p;
761     size_t                  size;
762     nxt_int_t               ret;
763     nxt_port_t              *port;
764     nxt_router_temp_conf_t  *tmcf;
765 
766     port = nxt_runtime_port_find(task->thread->runtime,
767                                  msg->port_msg.pid,
768                                  msg->port_msg.reply_port);
769     if (nxt_slow_path(port == NULL)) {
770         nxt_alert(task, "conf_data_handler: reply port not found");
771         return;
772     }
773 
774     p = MAP_FAILED;
775 
776     /*
777      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
778      * initialized in 'cleanup' section.
779      */
780     size = 0;
781 
782     tmcf = nxt_router_temp_conf(task);
783     if (nxt_slow_path(tmcf == NULL)) {
784         goto fail;
785     }
786 
787     if (nxt_slow_path(msg->fd[0] == -1)) {
788         nxt_alert(task, "conf_data_handler: invalid shm fd");
789         goto fail;
790     }
791 
792     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
793         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
794                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
795         goto fail;
796     }
797 
798     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
799 
800     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
801 
802     nxt_fd_close(msg->fd[0]);
803     msg->fd[0] = -1;
804 
805     if (nxt_slow_path(p == MAP_FAILED)) {
806         goto fail;
807     }
808 
809     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
810 
811     tmcf->router_conf->router = nxt_router;
812     tmcf->stream = msg->port_msg.stream;
813     tmcf->port = port;
814 
815     nxt_port_use(task, tmcf->port, 1);
816 
817     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
818 
819     if (nxt_fast_path(ret == NXT_OK)) {
820         nxt_router_conf_apply(task, tmcf, NULL);
821 
822     } else {
823         nxt_router_conf_error(task, tmcf);
824     }
825 
826     goto cleanup;
827 
828 fail:
829 
830     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
831                           msg->port_msg.stream, 0, NULL);
832 
833     if (tmcf != NULL) {
834         nxt_mp_release(tmcf->mem_pool);
835     }
836 
837 cleanup:
838 
839     if (p != MAP_FAILED) {
840         nxt_mem_munmap(p, size);
841     }
842 
843     if (msg->fd[0] != -1) {
844         nxt_fd_close(msg->fd[0]);
845         msg->fd[0] = -1;
846     }
847 }
848 
849 
850 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)851 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
852 {
853     nxt_app_t            *app;
854     nxt_int_t            ret;
855     nxt_str_t            app_name;
856     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
857     nxt_port_t           *proto_port;
858     nxt_port_msg_type_t  reply;
859 
860     reply_port = nxt_runtime_port_find(task->thread->runtime,
861                                        msg->port_msg.pid,
862                                        msg->port_msg.reply_port);
863     if (nxt_slow_path(reply_port == NULL)) {
864         nxt_alert(task, "app_restart_handler: reply port not found");
865         return;
866     }
867 
868     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
869     app_name.start = msg->buf->mem.pos;
870 
871     nxt_debug(task, "app_restart_handler: %V", &app_name);
872 
873     app = nxt_router_app_find(&nxt_router->apps, &app_name);
874 
875     if (nxt_fast_path(app != NULL)) {
876         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
877                                    NXT_PROCESS_APP);
878         if (nxt_slow_path(shared_port == NULL)) {
879             goto fail;
880         }
881 
882         ret = nxt_port_socket_init(task, shared_port, 0);
883         if (nxt_slow_path(ret != NXT_OK)) {
884             nxt_port_use(task, shared_port, -1);
885             goto fail;
886         }
887 
888         ret = nxt_router_app_queue_init(task, shared_port);
889         if (nxt_slow_path(ret != NXT_OK)) {
890             nxt_port_write_close(shared_port);
891             nxt_port_read_close(shared_port);
892             nxt_port_use(task, shared_port, -1);
893             goto fail;
894         }
895 
896         nxt_port_write_enable(task, shared_port);
897 
898         nxt_thread_mutex_lock(&app->mutex);
899 
900         proto_port = app->proto_port;
901 
902         if (proto_port != NULL) {
903             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
904                       proto_port->pid);
905 
906             app->proto_port = NULL;
907             proto_port->app = NULL;
908         }
909 
910         app->generation++;
911 
912         shared_port->app = app;
913 
914         old_shared_port = app->shared_port;
915         old_shared_port->app = NULL;
916 
917         app->shared_port = shared_port;
918 
919         nxt_thread_mutex_unlock(&app->mutex);
920 
921         nxt_port_close(task, old_shared_port);
922         nxt_port_use(task, old_shared_port, -1);
923 
924         if (proto_port != NULL) {
925             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
926                                          -1, 0, 0, NULL);
927 
928             nxt_port_close(task, proto_port);
929 
930             nxt_port_use(task, proto_port, -1);
931         }
932 
933         reply = NXT_PORT_MSG_RPC_READY_LAST;
934 
935     } else {
936 
937 fail:
938 
939         reply = NXT_PORT_MSG_RPC_ERROR;
940     }
941 
942     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
943                           0, NULL);
944 }
945 
946 
947 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)948 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
949 {
950     u_char               *p;
951     size_t               alloc;
952     nxt_app_t            *app;
953     nxt_buf_t            *b;
954     nxt_uint_t           type;
955     nxt_port_t           *port;
956     nxt_status_app_t     *app_stat;
957     nxt_event_engine_t   *engine;
958     nxt_status_report_t  *report;
959 
960     port = nxt_runtime_port_find(task->thread->runtime,
961                                  msg->port_msg.pid,
962                                  msg->port_msg.reply_port);
963     if (nxt_slow_path(port == NULL)) {
964         nxt_alert(task, "nxt_router_status_handler(): reply port not found");
965         return;
966     }
967 
968     alloc = sizeof(nxt_status_report_t);
969 
970     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
971 
972         alloc += sizeof(nxt_status_app_t) + app->name.length;
973 
974     } nxt_queue_loop;
975 
976     b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
977     if (nxt_slow_path(b == NULL)) {
978         type = NXT_PORT_MSG_RPC_ERROR;
979         goto fail;
980     }
981 
982     report = (nxt_status_report_t *) b->mem.free;
983     b->mem.free = b->mem.end;
984 
985     nxt_memzero(report, sizeof(nxt_status_report_t));
986 
987     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
988 
989         report->accepted_conns += engine->accepted_conns_cnt;
990         report->idle_conns += engine->idle_conns_cnt;
991         report->closed_conns += engine->closed_conns_cnt;
992         report->requests += engine->requests_cnt;
993 
994     } nxt_queue_loop;
995 
996     report->apps_count = 0;
997     app_stat = report->apps;
998     p = b->mem.end;
999 
1000     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
1001         p -= app->name.length;
1002 
1003         nxt_memcpy(p, app->name.start, app->name.length);
1004 
1005         app_stat->name.length = app->name.length;
1006         app_stat->name.start = (u_char *) (p - b->mem.pos);
1007 
1008         app_stat->active_requests = app->active_requests;
1009         app_stat->pending_processes = app->pending_processes;
1010         app_stat->processes = app->processes;
1011         app_stat->idle_processes = app->idle_processes;
1012 
1013         report->apps_count++;
1014         app_stat++;
1015     } nxt_queue_loop;
1016 
1017     type = NXT_PORT_MSG_RPC_READY_LAST;
1018 
1019 fail:
1020 
1021     nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1022 }
1023 
1024 
1025 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1026 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1027     void *data)
1028 {
1029     union {
1030         nxt_pid_t  removed_pid;
1031         void       *data;
1032     } u;
1033 
1034     u.data = data;
1035 
1036     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1037 }
1038 
1039 
1040 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1041 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1042 {
1043     nxt_event_engine_t  *engine;
1044 
1045     nxt_port_remove_pid_handler(task, msg);
1046 
1047     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1048     {
1049         if (nxt_fast_path(engine->port != NULL)) {
1050             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1051                           msg->u.data);
1052         }
1053     }
1054     nxt_queue_loop;
1055 
1056     if (msg->port_msg.stream == 0) {
1057         return;
1058     }
1059 
1060     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1061 
1062     nxt_port_rpc_handler(task, msg);
1063 }
1064 
1065 
1066 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1067 nxt_router_temp_conf(nxt_task_t *task)
1068 {
1069     nxt_mp_t                *mp, *tmp;
1070     nxt_router_conf_t       *rtcf;
1071     nxt_router_temp_conf_t  *tmcf;
1072 
1073     mp = nxt_mp_create(1024, 128, 256, 32);
1074     if (nxt_slow_path(mp == NULL)) {
1075         return NULL;
1076     }
1077 
1078     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1079     if (nxt_slow_path(rtcf == NULL)) {
1080         goto fail;
1081     }
1082 
1083     rtcf->mem_pool = mp;
1084 
1085     rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1086     if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1087         goto fail;
1088     }
1089 
1090 #if (NXT_HAVE_NJS)
1091     nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1092 #endif
1093 
1094     tmp = nxt_mp_create(1024, 128, 256, 32);
1095     if (nxt_slow_path(tmp == NULL)) {
1096         goto fail;
1097     }
1098 
1099     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1100     if (nxt_slow_path(tmcf == NULL)) {
1101         goto temp_fail;
1102     }
1103 
1104     tmcf->mem_pool = tmp;
1105     tmcf->router_conf = rtcf;
1106     tmcf->count = 1;
1107     tmcf->engine = task->thread->engine;
1108 
1109     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1110                                      sizeof(nxt_router_engine_conf_t));
1111     if (nxt_slow_path(tmcf->engines == NULL)) {
1112         goto temp_fail;
1113     }
1114 
1115     nxt_queue_init(&creating_sockets);
1116     nxt_queue_init(&pending_sockets);
1117     nxt_queue_init(&updating_sockets);
1118     nxt_queue_init(&keeping_sockets);
1119     nxt_queue_init(&deleting_sockets);
1120 
1121 #if (NXT_TLS)
1122     nxt_queue_init(&tmcf->tls);
1123 #endif
1124 
1125 #if (NXT_HAVE_NJS)
1126     nxt_queue_init(&tmcf->js_modules);
1127 #endif
1128 
1129     nxt_queue_init(&tmcf->apps);
1130     nxt_queue_init(&tmcf->previous);
1131 
1132     return tmcf;
1133 
1134 temp_fail:
1135 
1136     nxt_mp_destroy(tmp);
1137 
1138 fail:
1139 
1140     if (rtcf->tstr_state != NULL) {
1141         nxt_tstr_state_release(rtcf->tstr_state);
1142     }
1143 
1144     nxt_mp_destroy(mp);
1145 
1146     return NULL;
1147 }
1148 
1149 
1150 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1151 nxt_router_app_can_start(nxt_app_t *app)
1152 {
1153     return app->processes + app->pending_processes < app->max_processes
1154             && app->pending_processes < app->max_pending_processes;
1155 }
1156 
1157 
1158 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1159 nxt_router_app_need_start(nxt_app_t *app)
1160 {
1161     return (app->active_requests
1162               > app->port_hash_count + app->pending_processes)
1163            || (app->spare_processes
1164                 > app->idle_processes + app->pending_processes);
1165 }
1166 
1167 
1168 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1169 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1170 {
1171     nxt_int_t                    ret;
1172     nxt_app_t                    *app;
1173     nxt_router_t                 *router;
1174     nxt_runtime_t                *rt;
1175     nxt_queue_link_t             *qlk;
1176     nxt_socket_conf_t            *skcf;
1177     nxt_router_conf_t            *rtcf;
1178     nxt_router_temp_conf_t       *tmcf;
1179     const nxt_event_interface_t  *interface;
1180 #if (NXT_TLS)
1181     nxt_router_tlssock_t         *tls;
1182 #endif
1183 #if (NXT_HAVE_NJS)
1184     nxt_router_js_module_t       *js_module;
1185 #endif
1186 
1187     tmcf = obj;
1188 
1189     qlk = nxt_queue_first(&pending_sockets);
1190 
1191     if (qlk != nxt_queue_tail(&pending_sockets)) {
1192         nxt_queue_remove(qlk);
1193         nxt_queue_insert_tail(&creating_sockets, qlk);
1194 
1195         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1196 
1197         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1198 
1199         return;
1200     }
1201 
1202 #if (NXT_TLS)
1203     qlk = nxt_queue_last(&tmcf->tls);
1204 
1205     if (qlk != nxt_queue_head(&tmcf->tls)) {
1206         nxt_queue_remove(qlk);
1207 
1208         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1209 
1210         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1211                            nxt_router_tls_rpc_handler, tls);
1212         return;
1213     }
1214 #endif
1215 
1216 #if (NXT_HAVE_NJS)
1217     qlk = nxt_queue_last(&tmcf->js_modules);
1218 
1219     if (qlk != nxt_queue_head(&tmcf->js_modules)) {
1220         nxt_queue_remove(qlk);
1221 
1222         js_module = nxt_queue_link_data(qlk, nxt_router_js_module_t, link);
1223 
1224         nxt_script_store_get(task, &js_module->name, tmcf->mem_pool,
1225                              nxt_router_js_module_rpc_handler, js_module);
1226         return;
1227     }
1228 #endif
1229 
1230     rtcf = tmcf->router_conf;
1231 
1232     ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
1233     if (nxt_slow_path(ret != NXT_OK)) {
1234         goto fail;
1235     }
1236 
1237     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1238 
1239         if (nxt_router_app_need_start(app)) {
1240             nxt_router_app_rpc_create(task, tmcf, app);
1241             return;
1242         }
1243 
1244     } nxt_queue_loop;
1245 
1246     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1247         nxt_router_access_log_open(task, tmcf);
1248         return;
1249     }
1250 
1251     rt = task->thread->runtime;
1252 
1253     interface = nxt_service_get(rt->services, "engine", NULL);
1254 
1255     router = rtcf->router;
1256 
1257     ret = nxt_router_engines_create(task, router, tmcf, interface);
1258     if (nxt_slow_path(ret != NXT_OK)) {
1259         goto fail;
1260     }
1261 
1262     ret = nxt_router_threads_create(task, rt, tmcf);
1263     if (nxt_slow_path(ret != NXT_OK)) {
1264         goto fail;
1265     }
1266 
1267     nxt_router_apps_sort(task, router, tmcf);
1268 
1269     nxt_router_apps_hash_use(task, rtcf, 1);
1270 
1271     nxt_router_engines_post(router, tmcf);
1272 
1273     nxt_queue_add(&router->sockets, &updating_sockets);
1274     nxt_queue_add(&router->sockets, &creating_sockets);
1275 
1276     if (router->access_log != rtcf->access_log) {
1277         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1278 
1279         nxt_router_access_log_release(task, &router->lock, router->access_log);
1280 
1281         router->access_log = rtcf->access_log;
1282     }
1283 
1284     nxt_router_conf_ready(task, tmcf);
1285 
1286     return;
1287 
1288 fail:
1289 
1290     nxt_router_conf_error(task, tmcf);
1291 
1292     return;
1293 }
1294 
1295 
1296 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1297 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1298 {
1299     nxt_joint_job_t  *job;
1300 
1301     job = obj;
1302 
1303     nxt_router_conf_ready(task, job->tmcf);
1304 }
1305 
1306 
1307 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1308 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1309 {
1310     uint32_t               count;
1311     nxt_router_conf_t      *rtcf;
1312     nxt_thread_spinlock_t  *lock;
1313 
1314     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1315 
1316     if (--tmcf->count > 0) {
1317         return;
1318     }
1319 
1320     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1321 
1322     rtcf = tmcf->router_conf;
1323 
1324     lock = &rtcf->router->lock;
1325 
1326     nxt_thread_spin_lock(lock);
1327 
1328     count = rtcf->count;
1329 
1330     nxt_thread_spin_unlock(lock);
1331 
1332     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1333 
1334     if (count == 0) {
1335         nxt_router_apps_hash_use(task, rtcf, -1);
1336 
1337         nxt_router_access_log_release(task, lock, rtcf->access_log);
1338 
1339         nxt_mp_destroy(rtcf->mem_pool);
1340     }
1341 
1342     nxt_mp_release(tmcf->mem_pool);
1343 }
1344 
1345 
1346 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1347 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1348 {
1349     nxt_app_t          *app;
1350     nxt_socket_t       s;
1351     nxt_router_t       *router;
1352     nxt_queue_link_t   *qlk;
1353     nxt_socket_conf_t  *skcf;
1354     nxt_router_conf_t  *rtcf;
1355 
1356     nxt_alert(task, "failed to apply new conf");
1357 
1358     for (qlk = nxt_queue_first(&creating_sockets);
1359          qlk != nxt_queue_tail(&creating_sockets);
1360          qlk = nxt_queue_next(qlk))
1361     {
1362         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1363         s = skcf->listen->socket;
1364 
1365         if (s != -1) {
1366             nxt_socket_close(task, s);
1367         }
1368 
1369         nxt_free(skcf->listen);
1370     }
1371 
1372     rtcf = tmcf->router_conf;
1373 
1374     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1375 
1376         nxt_router_app_unlink(task, app);
1377 
1378     } nxt_queue_loop;
1379 
1380     router = rtcf->router;
1381 
1382     nxt_queue_add(&router->sockets, &keeping_sockets);
1383     nxt_queue_add(&router->sockets, &deleting_sockets);
1384 
1385     nxt_queue_add(&router->apps, &tmcf->previous);
1386 
1387     // TODO: new engines and threads
1388 
1389     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1390 
1391     nxt_mp_destroy(rtcf->mem_pool);
1392 
1393     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1394 
1395     nxt_mp_release(tmcf->mem_pool);
1396 }
1397 
1398 
1399 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1400 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1401     nxt_port_msg_type_t type)
1402 {
1403     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1404 
1405     nxt_port_use(task, tmcf->port, -1);
1406 
1407     tmcf->port = NULL;
1408 }
1409 
1410 
1411 static nxt_conf_map_t  nxt_router_conf[] = {
1412     {
1413         nxt_string("listeners_threads"),
1414         NXT_CONF_MAP_INT32,
1415         offsetof(nxt_router_conf_t, threads),
1416     },
1417 };
1418 
1419 
1420 static nxt_conf_map_t  nxt_router_app_conf[] = {
1421     {
1422         nxt_string("type"),
1423         NXT_CONF_MAP_STR,
1424         offsetof(nxt_router_app_conf_t, type),
1425     },
1426 
1427     {
1428         nxt_string("limits"),
1429         NXT_CONF_MAP_PTR,
1430         offsetof(nxt_router_app_conf_t, limits_value),
1431     },
1432 
1433     {
1434         nxt_string("processes"),
1435         NXT_CONF_MAP_INT32,
1436         offsetof(nxt_router_app_conf_t, processes),
1437     },
1438 
1439     {
1440         nxt_string("processes"),
1441         NXT_CONF_MAP_PTR,
1442         offsetof(nxt_router_app_conf_t, processes_value),
1443     },
1444 
1445     {
1446         nxt_string("targets"),
1447         NXT_CONF_MAP_PTR,
1448         offsetof(nxt_router_app_conf_t, targets_value),
1449     },
1450 };
1451 
1452 
1453 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1454     {
1455         nxt_string("timeout"),
1456         NXT_CONF_MAP_MSEC,
1457         offsetof(nxt_router_app_conf_t, timeout),
1458     },
1459 };
1460 
1461 
1462 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1463     {
1464         nxt_string("spare"),
1465         NXT_CONF_MAP_INT32,
1466         offsetof(nxt_router_app_conf_t, spare_processes),
1467     },
1468 
1469     {
1470         nxt_string("max"),
1471         NXT_CONF_MAP_INT32,
1472         offsetof(nxt_router_app_conf_t, max_processes),
1473     },
1474 
1475     {
1476         nxt_string("idle_timeout"),
1477         NXT_CONF_MAP_MSEC,
1478         offsetof(nxt_router_app_conf_t, idle_timeout),
1479     },
1480 };
1481 
1482 
1483 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1484     {
1485         nxt_string("pass"),
1486         NXT_CONF_MAP_STR_COPY,
1487         offsetof(nxt_router_listener_conf_t, pass),
1488     },
1489 
1490     {
1491         nxt_string("application"),
1492         NXT_CONF_MAP_STR_COPY,
1493         offsetof(nxt_router_listener_conf_t, application),
1494     },
1495 };
1496 
1497 
1498 static nxt_conf_map_t  nxt_router_http_conf[] = {
1499     {
1500         nxt_string("header_buffer_size"),
1501         NXT_CONF_MAP_SIZE,
1502         offsetof(nxt_socket_conf_t, header_buffer_size),
1503     },
1504 
1505     {
1506         nxt_string("large_header_buffer_size"),
1507         NXT_CONF_MAP_SIZE,
1508         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1509     },
1510 
1511     {
1512         nxt_string("large_header_buffers"),
1513         NXT_CONF_MAP_SIZE,
1514         offsetof(nxt_socket_conf_t, large_header_buffers),
1515     },
1516 
1517     {
1518         nxt_string("body_buffer_size"),
1519         NXT_CONF_MAP_SIZE,
1520         offsetof(nxt_socket_conf_t, body_buffer_size),
1521     },
1522 
1523     {
1524         nxt_string("max_body_size"),
1525         NXT_CONF_MAP_SIZE,
1526         offsetof(nxt_socket_conf_t, max_body_size),
1527     },
1528 
1529     {
1530         nxt_string("idle_timeout"),
1531         NXT_CONF_MAP_MSEC,
1532         offsetof(nxt_socket_conf_t, idle_timeout),
1533     },
1534 
1535     {
1536         nxt_string("header_read_timeout"),
1537         NXT_CONF_MAP_MSEC,
1538         offsetof(nxt_socket_conf_t, header_read_timeout),
1539     },
1540 
1541     {
1542         nxt_string("body_read_timeout"),
1543         NXT_CONF_MAP_MSEC,
1544         offsetof(nxt_socket_conf_t, body_read_timeout),
1545     },
1546 
1547     {
1548         nxt_string("send_timeout"),
1549         NXT_CONF_MAP_MSEC,
1550         offsetof(nxt_socket_conf_t, send_timeout),
1551     },
1552 
1553     {
1554         nxt_string("body_temp_path"),
1555         NXT_CONF_MAP_STR,
1556         offsetof(nxt_socket_conf_t, body_temp_path),
1557     },
1558 
1559     {
1560         nxt_string("discard_unsafe_fields"),
1561         NXT_CONF_MAP_INT8,
1562         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1563     },
1564 
1565     {
1566         nxt_string("log_route"),
1567         NXT_CONF_MAP_INT8,
1568         offsetof(nxt_socket_conf_t, log_route),
1569     },
1570 
1571     {
1572         nxt_string("server_version"),
1573         NXT_CONF_MAP_INT8,
1574         offsetof(nxt_socket_conf_t, server_version),
1575     },
1576 };
1577 
1578 
1579 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1580     {
1581         nxt_string("max_frame_size"),
1582         NXT_CONF_MAP_SIZE,
1583         offsetof(nxt_websocket_conf_t, max_frame_size),
1584     },
1585 
1586     {
1587         nxt_string("read_timeout"),
1588         NXT_CONF_MAP_MSEC,
1589         offsetof(nxt_websocket_conf_t, read_timeout),
1590     },
1591 
1592     {
1593         nxt_string("keepalive_interval"),
1594         NXT_CONF_MAP_MSEC,
1595         offsetof(nxt_websocket_conf_t, keepalive_interval),
1596     },
1597 
1598 };
1599 
1600 
1601 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1602 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1603     u_char *start, u_char *end)
1604 {
1605     u_char                      *p;
1606     size_t                      size;
1607     nxt_mp_t                    *mp, *app_mp;
1608     uint32_t                    next, next_target;
1609     nxt_int_t                   ret;
1610     nxt_str_t                   name, target;
1611     nxt_app_t                   *app, *prev;
1612     nxt_str_t                   *t, *s, *targets;
1613     nxt_uint_t                  n, i;
1614     nxt_port_t                  *port;
1615     nxt_router_t                *router;
1616     nxt_app_joint_t             *app_joint;
1617 #if (NXT_TLS)
1618     nxt_tls_init_t              *tls_init;
1619     nxt_conf_value_t            *certificate;
1620 #endif
1621 #if (NXT_HAVE_NJS)
1622     nxt_conf_value_t            *js_module;
1623 #endif
1624     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1625     nxt_conf_value_t            *applications, *application;
1626     nxt_conf_value_t            *listeners, *listener;
1627     nxt_socket_conf_t           *skcf;
1628     nxt_router_conf_t           *rtcf;
1629     nxt_http_routes_t           *routes;
1630     nxt_event_engine_t          *engine;
1631     nxt_app_lang_module_t       *lang;
1632     nxt_router_app_conf_t       apcf;
1633     nxt_router_listener_conf_t  lscf;
1634 
1635     static nxt_str_t  http_path = nxt_string("/settings/http");
1636     static nxt_str_t  applications_path = nxt_string("/applications");
1637     static nxt_str_t  listeners_path = nxt_string("/listeners");
1638     static nxt_str_t  routes_path = nxt_string("/routes");
1639     static nxt_str_t  access_log_path = nxt_string("/access_log");
1640 #if (NXT_TLS)
1641     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1642     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1643     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1644     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1645     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1646 #endif
1647 #if (NXT_HAVE_NJS)
1648     static nxt_str_t  js_module_path = nxt_string("/settings/js_module");
1649 #endif
1650     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1651     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1652     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1653     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1654 
1655     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1656     if (root == NULL) {
1657         nxt_alert(task, "configuration parsing error");
1658         return NXT_ERROR;
1659     }
1660 
1661     rtcf = tmcf->router_conf;
1662     mp = rtcf->mem_pool;
1663 
1664     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1665                               nxt_nitems(nxt_router_conf), rtcf);
1666     if (ret != NXT_OK) {
1667         nxt_alert(task, "root map error");
1668         return NXT_ERROR;
1669     }
1670 
1671     if (rtcf->threads == 0) {
1672         rtcf->threads = nxt_ncpu;
1673     }
1674 
1675     conf = nxt_conf_get_path(root, &static_path);
1676 
1677     ret = nxt_router_conf_process_static(task, rtcf, conf);
1678     if (nxt_slow_path(ret != NXT_OK)) {
1679         return NXT_ERROR;
1680     }
1681 
1682     router = rtcf->router;
1683 
1684     applications = nxt_conf_get_path(root, &applications_path);
1685 
1686     if (applications != NULL) {
1687         next = 0;
1688 
1689         for ( ;; ) {
1690             application = nxt_conf_next_object_member(applications,
1691                                                       &name, &next);
1692             if (application == NULL) {
1693                 break;
1694             }
1695 
1696             nxt_debug(task, "application \"%V\"", &name);
1697 
1698             size = nxt_conf_json_length(application, NULL);
1699 
1700             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1701             if (nxt_slow_path(app_mp == NULL)) {
1702                 goto fail;
1703             }
1704 
1705             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1706             if (app == NULL) {
1707                 goto app_fail;
1708             }
1709 
1710             nxt_memzero(app, sizeof(nxt_app_t));
1711 
1712             app->mem_pool = app_mp;
1713 
1714             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1715             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1716                                                   + name.length);
1717 
1718             p = nxt_conf_json_print(app->conf.start, application, NULL);
1719             app->conf.length = p - app->conf.start;
1720 
1721             nxt_assert(app->conf.length <= size);
1722 
1723             nxt_debug(task, "application conf \"%V\"", &app->conf);
1724 
1725             prev = nxt_router_app_find(&router->apps, &name);
1726 
1727             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1728                 nxt_mp_destroy(app_mp);
1729 
1730                 nxt_queue_remove(&prev->link);
1731                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1732 
1733                 ret = nxt_router_apps_hash_add(rtcf, prev);
1734                 if (nxt_slow_path(ret != NXT_OK)) {
1735                     goto fail;
1736                 }
1737 
1738                 continue;
1739             }
1740 
1741             apcf.processes = 1;
1742             apcf.max_processes = 1;
1743             apcf.spare_processes = 0;
1744             apcf.timeout = 0;
1745             apcf.idle_timeout = 15000;
1746             apcf.limits_value = NULL;
1747             apcf.processes_value = NULL;
1748             apcf.targets_value = NULL;
1749 
1750             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1751             if (nxt_slow_path(app_joint == NULL)) {
1752                 goto app_fail;
1753             }
1754 
1755             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1756 
1757             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1758                                       nxt_nitems(nxt_router_app_conf), &apcf);
1759             if (ret != NXT_OK) {
1760                 nxt_alert(task, "application map error");
1761                 goto app_fail;
1762             }
1763 
1764             if (apcf.limits_value != NULL) {
1765 
1766                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1767                     nxt_alert(task, "application limits is not object");
1768                     goto app_fail;
1769                 }
1770 
1771                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1772                                         nxt_router_app_limits_conf,
1773                                         nxt_nitems(nxt_router_app_limits_conf),
1774                                         &apcf);
1775                 if (ret != NXT_OK) {
1776                     nxt_alert(task, "application limits map error");
1777                     goto app_fail;
1778                 }
1779             }
1780 
1781             if (apcf.processes_value != NULL
1782                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1783             {
1784                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1785                                      nxt_router_app_processes_conf,
1786                                      nxt_nitems(nxt_router_app_processes_conf),
1787                                      &apcf);
1788                 if (ret != NXT_OK) {
1789                     nxt_alert(task, "application processes map error");
1790                     goto app_fail;
1791                 }
1792 
1793             } else {
1794                 apcf.max_processes = apcf.processes;
1795                 apcf.spare_processes = apcf.processes;
1796             }
1797 
1798             if (apcf.targets_value != NULL) {
1799                 n = nxt_conf_object_members_count(apcf.targets_value);
1800 
1801                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1802                 if (nxt_slow_path(targets == NULL)) {
1803                     goto app_fail;
1804                 }
1805 
1806                 next_target = 0;
1807 
1808                 for (i = 0; i < n; i++) {
1809                     (void) nxt_conf_next_object_member(apcf.targets_value,
1810                                                        &target, &next_target);
1811 
1812                     s = nxt_str_dup(app_mp, &targets[i], &target);
1813                     if (nxt_slow_path(s == NULL)) {
1814                         goto app_fail;
1815                     }
1816                 }
1817 
1818             } else {
1819                 targets = NULL;
1820             }
1821 
1822             nxt_debug(task, "application type: %V", &apcf.type);
1823             nxt_debug(task, "application processes: %D", apcf.processes);
1824             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1825 
1826             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1827 
1828             if (lang == NULL) {
1829                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1830                 goto app_fail;
1831             }
1832 
1833             nxt_debug(task, "application language module: \"%s\"", lang->file);
1834 
1835             ret = nxt_thread_mutex_create(&app->mutex);
1836             if (ret != NXT_OK) {
1837                 goto app_fail;
1838             }
1839 
1840             nxt_queue_init(&app->ports);
1841             nxt_queue_init(&app->spare_ports);
1842             nxt_queue_init(&app->idle_ports);
1843             nxt_queue_init(&app->ack_waiting_req);
1844 
1845             app->name.length = name.length;
1846             nxt_memcpy(app->name.start, name.start, name.length);
1847 
1848             app->type = lang->type;
1849             app->max_processes = apcf.max_processes;
1850             app->spare_processes = apcf.spare_processes;
1851             app->max_pending_processes = apcf.spare_processes
1852                                          ? apcf.spare_processes : 1;
1853             app->timeout = apcf.timeout;
1854             app->idle_timeout = apcf.idle_timeout;
1855 
1856             app->targets = targets;
1857 
1858             engine = task->thread->engine;
1859 
1860             app->engine = engine;
1861 
1862             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1863             app->adjust_idle_work.task = &engine->task;
1864             app->adjust_idle_work.obj = app;
1865 
1866             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1867 
1868             ret = nxt_router_apps_hash_add(rtcf, app);
1869             if (nxt_slow_path(ret != NXT_OK)) {
1870                 goto app_fail;
1871             }
1872 
1873             nxt_router_app_use(task, app, 1);
1874 
1875             app->joint = app_joint;
1876 
1877             app_joint->use_count = 1;
1878             app_joint->app = app;
1879 
1880             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1881             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1882             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1883             app_joint->idle_timer.task = &engine->task;
1884             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1885 
1886             app_joint->free_app_work.handler = nxt_router_free_app;
1887             app_joint->free_app_work.task = &engine->task;
1888             app_joint->free_app_work.obj = app_joint;
1889 
1890             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1891                                 NXT_PROCESS_APP);
1892             if (nxt_slow_path(port == NULL)) {
1893                 return NXT_ERROR;
1894             }
1895 
1896             ret = nxt_port_socket_init(task, port, 0);
1897             if (nxt_slow_path(ret != NXT_OK)) {
1898                 nxt_port_use(task, port, -1);
1899                 return NXT_ERROR;
1900             }
1901 
1902             ret = nxt_router_app_queue_init(task, port);
1903             if (nxt_slow_path(ret != NXT_OK)) {
1904                 nxt_port_write_close(port);
1905                 nxt_port_read_close(port);
1906                 nxt_port_use(task, port, -1);
1907                 return NXT_ERROR;
1908             }
1909 
1910             nxt_port_write_enable(task, port);
1911             port->app = app;
1912 
1913             app->shared_port = port;
1914 
1915             nxt_thread_mutex_create(&app->outgoing.mutex);
1916         }
1917     }
1918 
1919     conf = nxt_conf_get_path(root, &routes_path);
1920     if (nxt_fast_path(conf != NULL)) {
1921         routes = nxt_http_routes_create(task, tmcf, conf);
1922         if (nxt_slow_path(routes == NULL)) {
1923             return NXT_ERROR;
1924         }
1925 
1926         rtcf->routes = routes;
1927     }
1928 
1929     ret = nxt_upstreams_create(task, tmcf, root);
1930     if (nxt_slow_path(ret != NXT_OK)) {
1931         return ret;
1932     }
1933 
1934     http = nxt_conf_get_path(root, &http_path);
1935 #if 0
1936     if (http == NULL) {
1937         nxt_alert(task, "no \"http\" block");
1938         return NXT_ERROR;
1939     }
1940 #endif
1941 
1942     websocket = nxt_conf_get_path(root, &websocket_path);
1943 
1944     listeners = nxt_conf_get_path(root, &listeners_path);
1945 
1946     if (listeners != NULL) {
1947         next = 0;
1948 
1949         for ( ;; ) {
1950             listener = nxt_conf_next_object_member(listeners, &name, &next);
1951             if (listener == NULL) {
1952                 break;
1953             }
1954 
1955             skcf = nxt_router_socket_conf(task, tmcf, &name);
1956             if (skcf == NULL) {
1957                 goto fail;
1958             }
1959 
1960             nxt_memzero(&lscf, sizeof(lscf));
1961 
1962             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1963                                       nxt_nitems(nxt_router_listener_conf),
1964                                       &lscf);
1965             if (ret != NXT_OK) {
1966                 nxt_alert(task, "listener map error");
1967                 goto fail;
1968             }
1969 
1970             nxt_debug(task, "application: %V", &lscf.application);
1971 
1972             // STUB, default values if http block is not defined.
1973             skcf->header_buffer_size = 2048;
1974             skcf->large_header_buffer_size = 8192;
1975             skcf->large_header_buffers = 4;
1976             skcf->discard_unsafe_fields = 1;
1977             skcf->body_buffer_size = 16 * 1024;
1978             skcf->max_body_size = 8 * 1024 * 1024;
1979             skcf->proxy_header_buffer_size = 64 * 1024;
1980             skcf->proxy_buffer_size = 4096;
1981             skcf->proxy_buffers = 256;
1982             skcf->idle_timeout = 180 * 1000;
1983             skcf->header_read_timeout = 30 * 1000;
1984             skcf->body_read_timeout = 30 * 1000;
1985             skcf->send_timeout = 30 * 1000;
1986             skcf->proxy_timeout = 60 * 1000;
1987             skcf->proxy_send_timeout = 30 * 1000;
1988             skcf->proxy_read_timeout = 30 * 1000;
1989 
1990             skcf->server_version = 1;
1991 
1992             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1993             skcf->websocket_conf.read_timeout = 60 * 1000;
1994             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1995 
1996             nxt_str_null(&skcf->body_temp_path);
1997 
1998             if (http != NULL) {
1999                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
2000                                           nxt_nitems(nxt_router_http_conf),
2001                                           skcf);
2002                 if (ret != NXT_OK) {
2003                     nxt_alert(task, "http map error");
2004                     goto fail;
2005                 }
2006             }
2007 
2008             if (websocket != NULL) {
2009                 ret = nxt_conf_map_object(mp, websocket,
2010                                           nxt_router_websocket_conf,
2011                                           nxt_nitems(nxt_router_websocket_conf),
2012                                           &skcf->websocket_conf);
2013                 if (ret != NXT_OK) {
2014                     nxt_alert(task, "websocket map error");
2015                     goto fail;
2016                 }
2017             }
2018 
2019             t = &skcf->body_temp_path;
2020 
2021             if (t->length == 0) {
2022                 t->start = (u_char *) task->thread->runtime->tmp;
2023                 t->length = nxt_strlen(t->start);
2024             }
2025 
2026             conf = nxt_conf_get_path(listener, &forwarded_path);
2027 
2028             if (conf != NULL) {
2029                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
2030                 if (nxt_slow_path(skcf->forwarded == NULL)) {
2031                     return NXT_ERROR;
2032                 }
2033             }
2034 
2035             conf = nxt_conf_get_path(listener, &client_ip_path);
2036 
2037             if (conf != NULL) {
2038                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
2039                 if (nxt_slow_path(skcf->client_ip == NULL)) {
2040                     return NXT_ERROR;
2041                 }
2042             }
2043 
2044 #if (NXT_TLS)
2045             certificate = nxt_conf_get_path(listener, &certificate_path);
2046 
2047             if (certificate != NULL) {
2048                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
2049                 if (nxt_slow_path(tls_init == NULL)) {
2050                     return NXT_ERROR;
2051                 }
2052 
2053                 tls_init->cache_size = 0;
2054                 tls_init->timeout = 300;
2055 
2056                 value = nxt_conf_get_path(listener, &conf_cache_path);
2057                 if (value != NULL) {
2058                     tls_init->cache_size = nxt_conf_get_number(value);
2059                 }
2060 
2061                 value = nxt_conf_get_path(listener, &conf_timeout_path);
2062                 if (value != NULL) {
2063                     tls_init->timeout = nxt_conf_get_number(value);
2064                 }
2065 
2066                 tls_init->conf_cmds = nxt_conf_get_path(listener,
2067                                                         &conf_commands_path);
2068 
2069                 tls_init->tickets_conf = nxt_conf_get_path(listener,
2070                                                            &conf_tickets);
2071 
2072                 n = nxt_conf_array_elements_count_or_1(certificate);
2073 
2074                 for (i = 0; i < n; i++) {
2075                     value = nxt_conf_get_array_element_or_itself(certificate,
2076                                                                  i);
2077                     nxt_assert(value != NULL);
2078 
2079                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2080                                                      tls_init, i == 0);
2081                     if (nxt_slow_path(ret != NXT_OK)) {
2082                         goto fail;
2083                     }
2084                 }
2085             }
2086 #endif
2087 
2088             skcf->listen->handler = nxt_http_conn_init;
2089             skcf->router_conf = rtcf;
2090             skcf->router_conf->count++;
2091 
2092             if (lscf.pass.length != 0) {
2093                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2094 
2095             /* COMPATIBILITY: listener application. */
2096             } else if (lscf.application.length > 0) {
2097                 skcf->action = nxt_http_pass_application(task, rtcf,
2098                                                          &lscf.application);
2099             }
2100 
2101             if (nxt_slow_path(skcf->action == NULL)) {
2102                 goto fail;
2103             }
2104         }
2105     }
2106 
2107     ret = nxt_http_routes_resolve(task, tmcf);
2108     if (nxt_slow_path(ret != NXT_OK)) {
2109         goto fail;
2110     }
2111 
2112     value = nxt_conf_get_path(root, &access_log_path);
2113 
2114     if (value != NULL) {
2115         ret = nxt_router_access_log_create(task, rtcf, value);
2116         if (nxt_slow_path(ret != NXT_OK)) {
2117             goto fail;
2118         }
2119     }
2120 
2121 #if (NXT_HAVE_NJS)
2122     js_module = nxt_conf_get_path(root, &js_module_path);
2123 
2124     if (js_module != NULL) {
2125         if (nxt_conf_type(js_module) == NXT_CONF_ARRAY) {
2126             n = nxt_conf_array_elements_count(js_module);
2127 
2128             for (i = 0; i < n; i++) {
2129                 value = nxt_conf_get_array_element(js_module, i);
2130 
2131                 ret = nxt_router_js_module_insert(tmcf, value);
2132                 if (nxt_slow_path(ret != NXT_OK)) {
2133                     goto fail;
2134                 }
2135             }
2136 
2137         } else {
2138             /* NXT_CONF_STRING */
2139 
2140             ret = nxt_router_js_module_insert(tmcf, js_module);
2141             if (nxt_slow_path(ret != NXT_OK)) {
2142                 goto fail;
2143             }
2144         }
2145     }
2146 
2147 #endif
2148 
2149     nxt_queue_add(&deleting_sockets, &router->sockets);
2150     nxt_queue_init(&router->sockets);
2151 
2152     return NXT_OK;
2153 
2154 app_fail:
2155 
2156     nxt_mp_destroy(app_mp);
2157 
2158 fail:
2159 
2160     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2161 
2162         nxt_queue_remove(&app->link);
2163         nxt_thread_mutex_destroy(&app->mutex);
2164         nxt_mp_destroy(app->mem_pool);
2165 
2166     } nxt_queue_loop;
2167 
2168     return NXT_ERROR;
2169 }
2170 
2171 
2172 #if (NXT_TLS)
2173 
2174 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2175 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2176     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2177     nxt_tls_init_t *tls_init, nxt_bool_t last)
2178 {
2179     nxt_router_tlssock_t  *tls;
2180 
2181     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2182     if (nxt_slow_path(tls == NULL)) {
2183         return NXT_ERROR;
2184     }
2185 
2186     tls->tls_init = tls_init;
2187     tls->socket_conf = skcf;
2188     tls->temp_conf = tmcf;
2189     tls->last = last;
2190     nxt_conf_get_string(value, &tls->name);
2191 
2192     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2193 
2194     return NXT_OK;
2195 }
2196 
2197 #endif
2198 
2199 
2200 #if (NXT_HAVE_NJS)
2201 
2202 static void
nxt_router_js_module_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2203 nxt_router_js_module_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2204     void *data)
2205 {
2206     nxt_int_t               ret;
2207     nxt_str_t               text;
2208     nxt_router_conf_t       *rtcf;
2209     nxt_router_temp_conf_t  *tmcf;
2210     nxt_router_js_module_t  *js_module;
2211 
2212     nxt_debug(task, "auto module rpc handler");
2213 
2214     js_module = data;
2215     tmcf = js_module->temp_conf;
2216 
2217     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2218         goto fail;
2219     }
2220 
2221     rtcf = tmcf->router_conf;
2222 
2223     ret = nxt_script_file_read(msg->fd[0], &text);
2224 
2225     nxt_fd_close(msg->fd[0]);
2226 
2227     if (nxt_slow_path(ret == NXT_ERROR)) {
2228         goto fail;
2229     }
2230 
2231     if (text.length > 0) {
2232         ret = nxt_js_add_module(rtcf->tstr_state->jcf, &js_module->name, &text);
2233 
2234         nxt_free(text.start);
2235 
2236         if (nxt_slow_path(ret == NXT_ERROR)) {
2237             goto fail;
2238         }
2239     }
2240 
2241     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2242                        nxt_router_conf_apply, task, tmcf, NULL);
2243     return;
2244 
2245 fail:
2246 
2247     nxt_router_conf_error(task, tmcf);
2248 }
2249 
2250 
2251 static nxt_int_t
nxt_router_js_module_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value)2252 nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
2253     nxt_conf_value_t *value)
2254 {
2255     nxt_router_js_module_t  *js_module;
2256 
2257     js_module = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_js_module_t));
2258     if (nxt_slow_path(js_module == NULL)) {
2259         return NXT_ERROR;
2260     }
2261 
2262     js_module->temp_conf = tmcf;
2263     nxt_conf_get_string(value, &js_module->name);
2264 
2265     nxt_queue_insert_tail(&tmcf->js_modules, &js_module->link);
2266 
2267     return NXT_OK;
2268 }
2269 
2270 #endif
2271 
2272 
2273 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2274 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2275     nxt_conf_value_t *conf)
2276 {
2277     uint32_t          next, i;
2278     nxt_mp_t          *mp;
2279     nxt_str_t         *type, exten, str, *s;
2280     nxt_int_t         ret;
2281     nxt_uint_t        exts;
2282     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2283 
2284     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2285 
2286     mp = rtcf->mem_pool;
2287 
2288     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2289     if (nxt_slow_path(ret != NXT_OK)) {
2290         return NXT_ERROR;
2291     }
2292 
2293     if (conf == NULL) {
2294         return NXT_OK;
2295     }
2296 
2297     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2298 
2299     if (mtypes_conf != NULL) {
2300         next = 0;
2301 
2302         for ( ;; ) {
2303             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2304 
2305             if (ext_conf == NULL) {
2306                 break;
2307             }
2308 
2309             type = nxt_str_dup(mp, NULL, &str);
2310             if (nxt_slow_path(type == NULL)) {
2311                 return NXT_ERROR;
2312             }
2313 
2314             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2315                 s = nxt_conf_get_string_dup(ext_conf, mp, &exten);
2316                 if (nxt_slow_path(s == NULL)) {
2317                     return NXT_ERROR;
2318                 }
2319 
2320                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2321                                                       &exten, type);
2322                 if (nxt_slow_path(ret != NXT_OK)) {
2323                     return NXT_ERROR;
2324                 }
2325 
2326                 continue;
2327             }
2328 
2329             exts = nxt_conf_array_elements_count(ext_conf);
2330 
2331             for (i = 0; i < exts; i++) {
2332                 value = nxt_conf_get_array_element(ext_conf, i);
2333 
2334                 s = nxt_conf_get_string_dup(value, mp, &exten);
2335                 if (nxt_slow_path(s == NULL)) {
2336                     return NXT_ERROR;
2337                 }
2338 
2339                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2340                                                       &exten, type);
2341                 if (nxt_slow_path(ret != NXT_OK)) {
2342                     return NXT_ERROR;
2343                 }
2344             }
2345         }
2346     }
2347 
2348     return NXT_OK;
2349 }
2350 
2351 
2352 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2353 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2354 {
2355     nxt_int_t                   ret;
2356     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2357     nxt_conf_value_t            *source_conf, *recursive_conf;
2358     nxt_http_forward_t          *forward;
2359     nxt_http_route_addr_rule_t  *source;
2360 
2361     static nxt_str_t  header_path = nxt_string("/header");
2362     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2363     static nxt_str_t  protocol_path = nxt_string("/protocol");
2364     static nxt_str_t  source_path = nxt_string("/source");
2365     static nxt_str_t  recursive_path = nxt_string("/recursive");
2366 
2367     header_conf = nxt_conf_get_path(conf, &header_path);
2368 
2369     if (header_conf != NULL) {
2370         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2371         protocol_conf = NULL;
2372 
2373     } else {
2374         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2375         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2376     }
2377 
2378     source_conf = nxt_conf_get_path(conf, &source_path);
2379     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2380 
2381     if (source_conf == NULL
2382         || (protocol_conf == NULL && client_ip_conf == NULL))
2383     {
2384         return NULL;
2385     }
2386 
2387     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2388     if (nxt_slow_path(forward == NULL)) {
2389         return NULL;
2390     }
2391 
2392     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2393     if (nxt_slow_path(source == NULL)) {
2394         return NULL;
2395     }
2396 
2397     forward->source = source;
2398 
2399     if (recursive_conf != NULL) {
2400         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2401     }
2402 
2403     if (client_ip_conf != NULL) {
2404         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2405                                              &forward->client_ip);
2406         if (nxt_slow_path(ret != NXT_OK)) {
2407             return NULL;
2408         }
2409     }
2410 
2411     if (protocol_conf != NULL) {
2412         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2413                                              &forward->protocol);
2414         if (nxt_slow_path(ret != NXT_OK)) {
2415             return NULL;
2416         }
2417     }
2418 
2419     return forward;
2420 }
2421 
2422 
2423 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2424 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2425     nxt_http_forward_header_t *fh)
2426 {
2427     char      c;
2428     size_t    i;
2429     uint32_t  hash;
2430 
2431     fh->header = nxt_conf_get_string_dup(conf, mp, NULL);
2432     if (nxt_slow_path(fh->header == NULL)) {
2433         return NXT_ERROR;
2434     }
2435 
2436     hash = NXT_HTTP_FIELD_HASH_INIT;
2437 
2438     for (i = 0; i < fh->header->length; i++) {
2439         c = fh->header->start[i];
2440         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2441     }
2442 
2443     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2444 
2445     fh->header_hash = hash;
2446 
2447     return NXT_OK;
2448 }
2449 
2450 
2451 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2452 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2453 {
2454     nxt_app_t  *app;
2455 
2456     nxt_queue_each(app, queue, nxt_app_t, link) {
2457 
2458         if (nxt_strstr_eq(name, &app->name)) {
2459             return app;
2460         }
2461 
2462     } nxt_queue_loop;
2463 
2464     return NULL;
2465 }
2466 
2467 
2468 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2469 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2470 {
2471     void       *mem;
2472     nxt_int_t  fd;
2473 
2474     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2475     if (nxt_slow_path(fd == -1)) {
2476         return NXT_ERROR;
2477     }
2478 
2479     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2480                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2481     if (nxt_slow_path(mem == MAP_FAILED)) {
2482         nxt_fd_close(fd);
2483 
2484         return NXT_ERROR;
2485     }
2486 
2487     nxt_app_queue_init(mem);
2488 
2489     port->queue_fd = fd;
2490     port->queue = mem;
2491 
2492     return NXT_OK;
2493 }
2494 
2495 
2496 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2497 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2498 {
2499     void       *mem;
2500     nxt_int_t  fd;
2501 
2502     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2503     if (nxt_slow_path(fd == -1)) {
2504         return NXT_ERROR;
2505     }
2506 
2507     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2508                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2509     if (nxt_slow_path(mem == MAP_FAILED)) {
2510         nxt_fd_close(fd);
2511 
2512         return NXT_ERROR;
2513     }
2514 
2515     nxt_port_queue_init(mem);
2516 
2517     port->queue_fd = fd;
2518     port->queue = mem;
2519 
2520     return NXT_OK;
2521 }
2522 
2523 
2524 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2525 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2526 {
2527     void  *mem;
2528 
2529     nxt_assert(fd != -1);
2530 
2531     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2532                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2533     if (nxt_slow_path(mem == MAP_FAILED)) {
2534 
2535         return NXT_ERROR;
2536     }
2537 
2538     port->queue = mem;
2539 
2540     return NXT_OK;
2541 }
2542 
2543 
/*
 * Level-hash prototype for rtcf->apps_hash.  It is passed as lhq.proto to
 * nxt_lvlhsh_insert() and nxt_lvlhsh_find(), and to nxt_lvlhsh_each_init()
 * when the hash is iterated.
 */
2544 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2545     NXT_LVLHSH_DEFAULT,
2546     nxt_router_apps_hash_test,  /* matches the query key against app->name */
2547     nxt_mp_lvlhsh_alloc,        /* presumably pool-backed; confirm in nxt_mp */
2548     nxt_mp_lvlhsh_free,
2549 };
2550 
2551 
2552 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2553 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2554 {
2555     nxt_app_t  *app;
2556 
2557     app = data;
2558 
2559     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2560 }
2561 
2562 
2563 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2564 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2565 {
2566     nxt_lvlhsh_query_t  lhq;
2567 
2568     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2569     lhq.replace = 0;
2570     lhq.key = app->name;
2571     lhq.value = app;
2572     lhq.proto = &nxt_router_apps_hash_proto;
2573     lhq.pool = rtcf->mem_pool;
2574 
2575     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2576 
2577     case NXT_OK:
2578         return NXT_OK;
2579 
2580     case NXT_DECLINED:
2581         nxt_thread_log_alert("router app hash adding failed: "
2582                              "\"%V\" is already in hash", &lhq.key);
2583         /* Fall through. */
2584     default:
2585         return NXT_ERROR;
2586     }
2587 }
2588 
2589 
2590 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2591 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2592 {
2593     nxt_lvlhsh_query_t  lhq;
2594 
2595     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2596     lhq.key = *name;
2597     lhq.proto = &nxt_router_apps_hash_proto;
2598 
2599     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2600         return NULL;
2601     }
2602 
2603     return lhq.value;
2604 }
2605 
2606 
2607 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2608 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2609 {
2610     nxt_app_t          *app;
2611     nxt_lvlhsh_each_t  lhe;
2612 
2613     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2614 
2615     for ( ;; ) {
2616         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2617 
2618         if (app == NULL) {
2619             break;
2620         }
2621 
2622         nxt_router_app_use(task, app, i);
2623     }
2624 }
2625 
2626 
/*
 * Per-action application binding, allocated by nxt_router_application_init()
 * and stored in action->u.conf.
 */
2627 typedef struct {
2628     nxt_app_t  *app;     /* application found in rtcf->apps_hash */
2629     nxt_int_t  target;   /* index into app->targets; 0 if no target given */
2630 } nxt_http_app_conf_t;
2631 
2632 
2633 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2634 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2635     nxt_str_t *target, nxt_http_action_t *action)
2636 {
2637     nxt_app_t            *app;
2638     nxt_str_t            *targets;
2639     nxt_uint_t           i;
2640     nxt_http_app_conf_t  *conf;
2641 
2642     app = nxt_router_apps_hash_get(rtcf, name);
2643     if (app == NULL) {
2644         return NXT_DECLINED;
2645     }
2646 
2647     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2648     if (nxt_slow_path(conf == NULL)) {
2649         return NXT_ERROR;
2650     }
2651 
2652     action->handler = nxt_http_application_handler;
2653     action->u.conf = conf;
2654 
2655     conf->app = app;
2656 
2657     if (target != NULL && target->length != 0) {
2658         targets = app->targets;
2659 
2660         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2661 
2662         conf->target = i;
2663 
2664     } else {
2665         conf->target = 0;
2666     }
2667 
2668     return NXT_OK;
2669 }
2670 
2671 
2672 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2673 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2674     nxt_str_t *name)
2675 {
2676     size_t               size;
2677     nxt_int_t            ret;
2678     nxt_bool_t           wildcard;
2679     nxt_sockaddr_t       *sa;
2680     nxt_socket_conf_t    *skcf;
2681     nxt_listen_socket_t  *ls;
2682 
2683     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2684     if (nxt_slow_path(sa == NULL)) {
2685         nxt_alert(task, "invalid listener \"%V\"", name);
2686         return NULL;
2687     }
2688 
2689     sa->type = SOCK_STREAM;
2690 
2691     nxt_debug(task, "router listener: \"%*s\"",
2692               (size_t) sa->length, nxt_sockaddr_start(sa));
2693 
2694     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2695     if (nxt_slow_path(skcf == NULL)) {
2696         return NULL;
2697     }
2698 
2699     size = nxt_sockaddr_size(sa);
2700 
2701     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2702 
2703     if (ret != NXT_OK) {
2704 
2705         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2706         if (nxt_slow_path(ls == NULL)) {
2707             return NULL;
2708         }
2709 
2710         skcf->listen = ls;
2711 
2712         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2713         nxt_memcpy(ls->sockaddr, sa, size);
2714 
2715         nxt_listen_socket_remote_size(ls);
2716 
2717         ls->socket = -1;
2718         ls->backlog = NXT_LISTEN_BACKLOG;
2719         ls->flags = NXT_NONBLOCK;
2720         ls->read_after_accept = 1;
2721     }
2722 
2723     switch (sa->u.sockaddr.sa_family) {
2724 #if (NXT_HAVE_UNIX_DOMAIN)
2725     case AF_UNIX:
2726         wildcard = 0;
2727         break;
2728 #endif
2729 #if (NXT_INET6)
2730     case AF_INET6:
2731         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2732         break;
2733 #endif
2734     case AF_INET:
2735     default:
2736         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2737         break;
2738     }
2739 
2740     if (!wildcard) {
2741         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2742         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2743             return NULL;
2744         }
2745 
2746         nxt_memcpy(skcf->sockaddr, sa, size);
2747     }
2748 
2749     return skcf;
2750 }
2751 
2752 
2753 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2754 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2755     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2756 {
2757     nxt_router_t       *router;
2758     nxt_queue_link_t   *qlk;
2759     nxt_socket_conf_t  *skcf;
2760 
2761     router = tmcf->router_conf->router;
2762 
2763     for (qlk = nxt_queue_first(&router->sockets);
2764          qlk != nxt_queue_tail(&router->sockets);
2765          qlk = nxt_queue_next(qlk))
2766     {
2767         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2768 
2769         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2770             nskcf->listen = skcf->listen;
2771 
2772             nxt_queue_remove(qlk);
2773             nxt_queue_insert_tail(&keeping_sockets, qlk);
2774 
2775             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2776 
2777             return NXT_OK;
2778         }
2779     }
2780 
2781     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2782 
2783     return NXT_DECLINED;
2784 }
2785 
2786 
2787 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2788 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2789     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2790 {
2791     size_t            size;
2792     uint32_t          stream;
2793     nxt_int_t         ret;
2794     nxt_buf_t         *b;
2795     nxt_port_t        *main_port, *router_port;
2796     nxt_runtime_t     *rt;
2797     nxt_socket_rpc_t  *rpc;
2798 
2799     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2800     if (rpc == NULL) {
2801         goto fail;
2802     }
2803 
2804     rpc->socket_conf = skcf;
2805     rpc->temp_conf = tmcf;
2806 
2807     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2808 
2809     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2810     if (b == NULL) {
2811         goto fail;
2812     }
2813 
2814     b->completion_handler = nxt_buf_dummy_completion;
2815 
2816     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2817 
2818     rt = task->thread->runtime;
2819     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2820     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2821 
2822     stream = nxt_port_rpc_register_handler(task, router_port,
2823                                            nxt_router_listen_socket_ready,
2824                                            nxt_router_listen_socket_error,
2825                                            main_port->pid, rpc);
2826     if (nxt_slow_path(stream == 0)) {
2827         goto fail;
2828     }
2829 
2830     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2831                                 stream, router_port->id, b);
2832 
2833     if (nxt_slow_path(ret != NXT_OK)) {
2834         nxt_port_rpc_cancel(task, router_port, stream);
2835         goto fail;
2836     }
2837 
2838     return;
2839 
2840 fail:
2841 
2842     nxt_router_conf_error(task, tmcf);
2843 }
2844 
2845 
2846 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2847 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2848     void *data)
2849 {
2850     nxt_int_t         ret;
2851     nxt_socket_t      s;
2852     nxt_socket_rpc_t  *rpc;
2853 
2854     rpc = data;
2855 
2856     s = msg->fd[0];
2857 
2858     ret = nxt_socket_nonblocking(task, s);
2859     if (nxt_slow_path(ret != NXT_OK)) {
2860         goto fail;
2861     }
2862 
2863     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2864 
2865     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2866     if (nxt_slow_path(ret != NXT_OK)) {
2867         goto fail;
2868     }
2869 
2870     rpc->socket_conf->listen->socket = s;
2871 
2872     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2873                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2874 
2875     return;
2876 
2877 fail:
2878 
2879     nxt_socket_close(task, s);
2880 
2881     nxt_router_conf_error(task, rpc->temp_conf);
2882 }
2883 
2884 
2885 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2886 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2887     void *data)
2888 {
2889     nxt_socket_rpc_t        *rpc;
2890     nxt_router_temp_conf_t  *tmcf;
2891 
2892     rpc = data;
2893     tmcf = rpc->temp_conf;
2894 
2895 #if 0
2896     u_char                  *p;
2897     size_t                  size;
2898     uint8_t                 error;
2899     nxt_buf_t               *in, *out;
2900     nxt_sockaddr_t          *sa;
2901 
2902     static nxt_str_t  socket_errors[] = {
2903         nxt_string("ListenerSystem"),
2904         nxt_string("ListenerNoIPv6"),
2905         nxt_string("ListenerPort"),
2906         nxt_string("ListenerInUse"),
2907         nxt_string("ListenerNoAddress"),
2908         nxt_string("ListenerNoAccess"),
2909         nxt_string("ListenerPath"),
2910     };
2911 
2912     sa = rpc->socket_conf->listen->sockaddr;
2913 
2914     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2915 
2916     if (nxt_slow_path(in == NULL)) {
2917         return;
2918     }
2919 
2920     p = in->mem.pos;
2921 
2922     error = *p++;
2923 
2924     size = nxt_length("listen socket error: ")
2925            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2926            + sa->length + socket_errors[error].length + (in->mem.free - p);
2927 
2928     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2929     if (nxt_slow_path(out == NULL)) {
2930         return;
2931     }
2932 
2933     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2934                         "listen socket error: "
2935                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2936                         (size_t) sa->length, nxt_sockaddr_start(sa),
2937                         &socket_errors[error], in->mem.free - p, p);
2938 
2939     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2940 #endif
2941 
2942     nxt_router_conf_error(task, tmcf);
2943 }
2944 
2945 
2946 #if (NXT_TLS)
2947 
2948 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2949 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2950     void *data)
2951 {
2952     nxt_mp_t                *mp;
2953     nxt_int_t               ret;
2954     nxt_tls_conf_t          *tlscf;
2955     nxt_router_tlssock_t    *tls;
2956     nxt_tls_bundle_conf_t   *bundle;
2957     nxt_router_temp_conf_t  *tmcf;
2958 
2959     nxt_debug(task, "tls rpc handler");
2960 
2961     tls = data;
2962     tmcf = tls->temp_conf;
2963 
2964     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2965         goto fail;
2966     }
2967 
2968     mp = tmcf->router_conf->mem_pool;
2969 
2970     if (tls->socket_conf->tls == NULL) {
2971         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2972         if (nxt_slow_path(tlscf == NULL)) {
2973             goto fail;
2974         }
2975 
2976         tlscf->no_wait_shutdown = 1;
2977         tls->socket_conf->tls = tlscf;
2978 
2979     } else {
2980         tlscf = tls->socket_conf->tls;
2981     }
2982 
2983     tls->tls_init->conf = tlscf;
2984 
2985     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2986     if (nxt_slow_path(bundle == NULL)) {
2987         goto fail;
2988     }
2989 
2990     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2991         goto fail;
2992     }
2993 
2994     bundle->chain_file = msg->fd[0];
2995     bundle->next = tlscf->bundle;
2996     tlscf->bundle = bundle;
2997 
2998     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2999                                                   tls->last);
3000     if (nxt_slow_path(ret != NXT_OK)) {
3001         goto fail;
3002     }
3003 
3004     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3005                        nxt_router_conf_apply, task, tmcf, NULL);
3006     return;
3007 
3008 fail:
3009 
3010     nxt_router_conf_error(task, tmcf);
3011 }
3012 
3013 #endif
3014 
3015 
3016 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)3017 nxt_router_app_rpc_create(nxt_task_t *task,
3018     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
3019 {
3020     size_t         size;
3021     uint32_t       stream;
3022     nxt_fd_t       port_fd, queue_fd;
3023     nxt_int_t      ret;
3024     nxt_buf_t      *b;
3025     nxt_port_t     *router_port, *dport;
3026     nxt_runtime_t  *rt;
3027     nxt_app_rpc_t  *rpc;
3028 
3029     rt = task->thread->runtime;
3030 
3031     dport = app->proto_port;
3032 
3033     if (dport == NULL) {
3034         nxt_debug(task, "app '%V' prototype prefork", &app->name);
3035 
3036         size = app->name.length + 1 + app->conf.length;
3037 
3038         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
3039         if (nxt_slow_path(b == NULL)) {
3040             goto fail;
3041         }
3042 
3043         b->completion_handler = nxt_buf_dummy_completion;
3044 
3045         nxt_buf_cpystr(b, &app->name);
3046         *b->mem.free++ = '\0';
3047         nxt_buf_cpystr(b, &app->conf);
3048 
3049         dport = rt->port_by_type[NXT_PROCESS_MAIN];
3050 
3051         port_fd = app->shared_port->pair[0];
3052         queue_fd = app->shared_port->queue_fd;
3053 
3054     } else {
3055         nxt_debug(task, "app '%V' prefork", &app->name);
3056 
3057         b = NULL;
3058         port_fd = -1;
3059         queue_fd = -1;
3060     }
3061 
3062     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3063 
3064     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
3065                                            nxt_router_app_prefork_ready,
3066                                            nxt_router_app_prefork_error,
3067                                            sizeof(nxt_app_rpc_t));
3068     if (nxt_slow_path(rpc == NULL)) {
3069         goto fail;
3070     }
3071 
3072     rpc->app = app;
3073     rpc->temp_conf = tmcf;
3074     rpc->proto = (b != NULL);
3075 
3076     stream = nxt_port_rpc_ex_stream(rpc);
3077 
3078     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
3079                                  port_fd, queue_fd, stream, router_port->id, b);
3080     if (nxt_slow_path(ret != NXT_OK)) {
3081         nxt_port_rpc_cancel(task, router_port, stream);
3082         goto fail;
3083     }
3084 
3085     if (b == NULL) {
3086         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
3087 
3088         app->pending_processes++;
3089     }
3090 
3091     return;
3092 
3093 fail:
3094 
3095     nxt_router_conf_error(task, tmcf);
3096 }
3097 
3098 
3099 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3100 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3101     void *data)
3102 {
3103     nxt_app_t           *app;
3104     nxt_port_t          *port;
3105     nxt_app_rpc_t       *rpc;
3106     nxt_event_engine_t  *engine;
3107 
3108     rpc = data;
3109     app = rpc->app;
3110 
3111     port = msg->u.new_port;
3112 
3113     nxt_assert(port != NULL);
3114     nxt_assert(port->id == 0);
3115 
3116     if (rpc->proto) {
3117         nxt_assert(app->proto_port == NULL);
3118         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
3119 
3120         nxt_port_inc_use(port);
3121 
3122         app->proto_port = port;
3123         port->app = app;
3124 
3125         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
3126 
3127         return;
3128     }
3129 
3130     nxt_assert(port->type == NXT_PROCESS_APP);
3131 
3132     port->app = app;
3133     port->main_app_port = port;
3134 
3135     app->pending_processes--;
3136     app->processes++;
3137     app->idle_processes++;
3138 
3139     engine = task->thread->engine;
3140 
3141     nxt_queue_insert_tail(&app->ports, &port->app_link);
3142     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3143 
3144     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
3145               &app->name, port->pid, port->id);
3146 
3147     nxt_port_hash_add(&app->port_hash, port);
3148     app->port_hash_count++;
3149 
3150     port->idle_start = 0;
3151 
3152     nxt_port_inc_use(port);
3153 
3154     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
3155 
3156     nxt_work_queue_add(&engine->fast_work_queue,
3157                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
3158 }
3159 
3160 
3161 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3162 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3163     void *data)
3164 {
3165     nxt_app_t               *app;
3166     nxt_app_rpc_t           *rpc;
3167     nxt_router_temp_conf_t  *tmcf;
3168 
3169     rpc = data;
3170     app = rpc->app;
3171     tmcf = rpc->temp_conf;
3172 
3173     if (rpc->proto) {
3174         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3175                 &app->name);
3176 
3177     } else {
3178         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3179                 &app->name);
3180 
3181         app->pending_processes--;
3182     }
3183 
3184     nxt_router_conf_error(task, tmcf);
3185 }
3186 
3187 
3188 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)3189 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3190     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3191 {
3192     nxt_int_t                 ret;
3193     nxt_uint_t                n, threads;
3194     nxt_queue_link_t          *qlk;
3195     nxt_router_engine_conf_t  *recf;
3196 
3197     threads = tmcf->router_conf->threads;
3198 
3199     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3200                                      sizeof(nxt_router_engine_conf_t));
3201     if (nxt_slow_path(tmcf->engines == NULL)) {
3202         return NXT_ERROR;
3203     }
3204 
3205     n = 0;
3206 
3207     for (qlk = nxt_queue_first(&router->engines);
3208          qlk != nxt_queue_tail(&router->engines);
3209          qlk = nxt_queue_next(qlk))
3210     {
3211         recf = nxt_array_zero_add(tmcf->engines);
3212         if (nxt_slow_path(recf == NULL)) {
3213             return NXT_ERROR;
3214         }
3215 
3216         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3217 
3218         if (n < threads) {
3219             recf->action = NXT_ROUTER_ENGINE_KEEP;
3220             ret = nxt_router_engine_conf_update(tmcf, recf);
3221 
3222         } else {
3223             recf->action = NXT_ROUTER_ENGINE_DELETE;
3224             ret = nxt_router_engine_conf_delete(tmcf, recf);
3225         }
3226 
3227         if (nxt_slow_path(ret != NXT_OK)) {
3228             return ret;
3229         }
3230 
3231         n++;
3232     }
3233 
3234     tmcf->new_threads = n;
3235 
3236     while (n < threads) {
3237         recf = nxt_array_zero_add(tmcf->engines);
3238         if (nxt_slow_path(recf == NULL)) {
3239             return NXT_ERROR;
3240         }
3241 
3242         recf->action = NXT_ROUTER_ENGINE_ADD;
3243 
3244         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3245         if (nxt_slow_path(recf->engine == NULL)) {
3246             return NXT_ERROR;
3247         }
3248 
3249         ret = nxt_router_engine_conf_create(tmcf, recf);
3250         if (nxt_slow_path(ret != NXT_OK)) {
3251             return ret;
3252         }
3253 
3254         n++;
3255     }
3256 
3257     return NXT_OK;
3258 }
3259 
3260 
3261 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3262 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3263     nxt_router_engine_conf_t *recf)
3264 {
3265     nxt_int_t  ret;
3266 
3267     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3268                                           nxt_router_listen_socket_create);
3269     if (nxt_slow_path(ret != NXT_OK)) {
3270         return ret;
3271     }
3272 
3273     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3274                                           nxt_router_listen_socket_create);
3275     if (nxt_slow_path(ret != NXT_OK)) {
3276         return ret;
3277     }
3278 
3279     return ret;
3280 }
3281 
3282 
3283 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3284 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3285     nxt_router_engine_conf_t *recf)
3286 {
3287     nxt_int_t  ret;
3288 
3289     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3290                                           nxt_router_listen_socket_create);
3291     if (nxt_slow_path(ret != NXT_OK)) {
3292         return ret;
3293     }
3294 
3295     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3296                                           nxt_router_listen_socket_update);
3297     if (nxt_slow_path(ret != NXT_OK)) {
3298         return ret;
3299     }
3300 
3301     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3302     if (nxt_slow_path(ret != NXT_OK)) {
3303         return ret;
3304     }
3305 
3306     return ret;
3307 }
3308 
3309 
3310 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3311 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3312     nxt_router_engine_conf_t *recf)
3313 {
3314     nxt_int_t  ret;
3315 
3316     ret = nxt_router_engine_quit(tmcf, recf);
3317     if (nxt_slow_path(ret != NXT_OK)) {
3318         return ret;
3319     }
3320 
3321     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3322     if (nxt_slow_path(ret != NXT_OK)) {
3323         return ret;
3324     }
3325 
3326     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3327 }
3328 
3329 
3330 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3331 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3332     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3333     nxt_work_handler_t handler)
3334 {
3335     nxt_int_t                ret;
3336     nxt_joint_job_t          *job;
3337     nxt_queue_link_t         *qlk;
3338     nxt_socket_conf_t        *skcf;
3339     nxt_socket_conf_joint_t  *joint;
3340 
3341     for (qlk = nxt_queue_first(sockets);
3342          qlk != nxt_queue_tail(sockets);
3343          qlk = nxt_queue_next(qlk))
3344     {
3345         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3346         if (nxt_slow_path(job == NULL)) {
3347             return NXT_ERROR;
3348         }
3349 
3350         job->work.next = recf->jobs;
3351         recf->jobs = &job->work;
3352 
3353         job->task = tmcf->engine->task;
3354         job->work.handler = handler;
3355         job->work.task = &job->task;
3356         job->work.obj = job;
3357         job->tmcf = tmcf;
3358 
3359         tmcf->count++;
3360 
3361         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3362                              sizeof(nxt_socket_conf_joint_t));
3363         if (nxt_slow_path(joint == NULL)) {
3364             return NXT_ERROR;
3365         }
3366 
3367         job->work.data = joint;
3368 
3369         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3370         if (nxt_slow_path(ret != NXT_OK)) {
3371             return ret;
3372         }
3373 
3374         joint->count = 1;
3375 
3376         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3377         skcf->count++;
3378         joint->socket_conf = skcf;
3379 
3380         joint->engine = recf->engine;
3381     }
3382 
3383     return NXT_OK;
3384 }
3385 
3386 
3387 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3388 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3389     nxt_router_engine_conf_t *recf)
3390 {
3391     nxt_joint_job_t  *job;
3392 
3393     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3394     if (nxt_slow_path(job == NULL)) {
3395         return NXT_ERROR;
3396     }
3397 
3398     job->work.next = recf->jobs;
3399     recf->jobs = &job->work;
3400 
3401     job->task = tmcf->engine->task;
3402     job->work.handler = nxt_router_worker_thread_quit;
3403     job->work.task = &job->task;
3404     job->work.obj = NULL;
3405     job->work.data = NULL;
3406     job->tmcf = NULL;
3407 
3408     return NXT_OK;
3409 }
3410 
3411 
3412 static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets)3413 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3414     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3415 {
3416     nxt_joint_job_t   *job;
3417     nxt_queue_link_t  *qlk;
3418 
3419     for (qlk = nxt_queue_first(sockets);
3420          qlk != nxt_queue_tail(sockets);
3421          qlk = nxt_queue_next(qlk))
3422     {
3423         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3424         if (nxt_slow_path(job == NULL)) {
3425             return NXT_ERROR;
3426         }
3427 
3428         job->work.next = recf->jobs;
3429         recf->jobs = &job->work;
3430 
3431         job->task = tmcf->engine->task;
3432         job->work.handler = nxt_router_listen_socket_delete;
3433         job->work.task = &job->task;
3434         job->work.obj = job;
3435         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3436         job->tmcf = tmcf;
3437 
3438         tmcf->count++;
3439     }
3440 
3441     return NXT_OK;
3442 }
3443 
3444 
3445 static nxt_int_t
nxt_router_threads_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_router_temp_conf_t * tmcf)3446 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3447     nxt_router_temp_conf_t *tmcf)
3448 {
3449     nxt_int_t                 ret;
3450     nxt_uint_t                i, threads;
3451     nxt_router_engine_conf_t  *recf;
3452 
3453     recf = tmcf->engines->elts;
3454     threads = tmcf->router_conf->threads;
3455 
3456     for (i = tmcf->new_threads; i < threads; i++) {
3457         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3458         if (nxt_slow_path(ret != NXT_OK)) {
3459             return ret;
3460         }
3461     }
3462 
3463     return NXT_OK;
3464 }
3465 
3466 
3467 static nxt_int_t
nxt_router_thread_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_event_engine_t * engine)3468 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3469     nxt_event_engine_t *engine)
3470 {
3471     nxt_int_t            ret;
3472     nxt_thread_link_t    *link;
3473     nxt_thread_handle_t  handle;
3474 
3475     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3476 
3477     if (nxt_slow_path(link == NULL)) {
3478         return NXT_ERROR;
3479     }
3480 
3481     link->start = nxt_router_thread_start;
3482     link->engine = engine;
3483     link->work.handler = nxt_router_thread_exit_handler;
3484     link->work.task = task;
3485     link->work.data = link;
3486 
3487     nxt_queue_insert_tail(&rt->engines, &engine->link);
3488 
3489     ret = nxt_thread_create(&handle, link);
3490 
3491     if (nxt_slow_path(ret != NXT_OK)) {
3492         nxt_queue_remove(&engine->link);
3493     }
3494 
3495     return ret;
3496 }
3497 
3498 
3499 static void
nxt_router_apps_sort(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3500 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3501     nxt_router_temp_conf_t *tmcf)
3502 {
3503     nxt_app_t  *app;
3504 
3505     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3506 
3507         nxt_router_app_unlink(task, app);
3508 
3509     } nxt_queue_loop;
3510 
3511     nxt_queue_add(&router->apps, &tmcf->previous);
3512     nxt_queue_add(&router->apps, &tmcf->apps);
3513 }
3514 
3515 
3516 static void
nxt_router_engines_post(nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3517 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3518 {
3519     nxt_uint_t                n;
3520     nxt_event_engine_t        *engine;
3521     nxt_router_engine_conf_t  *recf;
3522 
3523     recf = tmcf->engines->elts;
3524 
3525     for (n = tmcf->engines->nelts; n != 0; n--) {
3526         engine = recf->engine;
3527 
3528         switch (recf->action) {
3529 
3530         case NXT_ROUTER_ENGINE_KEEP:
3531             break;
3532 
3533         case NXT_ROUTER_ENGINE_ADD:
3534             nxt_queue_insert_tail(&router->engines, &engine->link0);
3535             break;
3536 
3537         case NXT_ROUTER_ENGINE_DELETE:
3538             nxt_queue_remove(&engine->link0);
3539             break;
3540         }
3541 
3542         nxt_router_engine_post(engine, recf->jobs);
3543 
3544         recf++;
3545     }
3546 }
3547 
3548 
3549 static void
nxt_router_engine_post(nxt_event_engine_t * engine,nxt_work_t * jobs)3550 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3551 {
3552     nxt_work_t  *work, *next;
3553 
3554     for (work = jobs; work != NULL; work = next) {
3555         next = work->next;
3556         work->next = NULL;
3557 
3558         nxt_event_engine_post(engine, work);
3559     }
3560 }
3561 
3562 
3563 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3564     .rpc_error       = nxt_port_rpc_handler,
3565     .mmap            = nxt_port_mmap_handler,
3566     .data            = nxt_port_rpc_handler,
3567     .oosm            = nxt_router_oosm_handler,
3568     .req_headers_ack = nxt_port_rpc_handler,
3569 };
3570 
3571 
3572 static void
nxt_router_thread_start(void * data)3573 nxt_router_thread_start(void *data)
3574 {
3575     nxt_int_t           ret;
3576     nxt_port_t          *port;
3577     nxt_task_t          *task;
3578     nxt_work_t          *work;
3579     nxt_thread_t        *thread;
3580     nxt_thread_link_t   *link;
3581     nxt_event_engine_t  *engine;
3582 
3583     link = data;
3584     engine = link->engine;
3585     task = &engine->task;
3586 
3587     thread = nxt_thread();
3588 
3589     nxt_event_engine_thread_adopt(engine);
3590 
3591     /* STUB */
3592     thread->runtime = engine->task.thread->runtime;
3593 
3594     engine->task.thread = thread;
3595     engine->task.log = thread->log;
3596     thread->engine = engine;
3597     thread->task = &engine->task;
3598 #if 0
3599     thread->fiber = &engine->fibers->fiber;
3600 #endif
3601 
3602     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3603     if (nxt_slow_path(engine->mem_pool == NULL)) {
3604         return;
3605     }
3606 
3607     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3608                         NXT_PROCESS_ROUTER);
3609     if (nxt_slow_path(port == NULL)) {
3610         return;
3611     }
3612 
3613     ret = nxt_port_socket_init(task, port, 0);
3614     if (nxt_slow_path(ret != NXT_OK)) {
3615         nxt_port_use(task, port, -1);
3616         return;
3617     }
3618 
3619     ret = nxt_router_port_queue_init(task, port);
3620     if (nxt_slow_path(ret != NXT_OK)) {
3621         nxt_port_use(task, port, -1);
3622         return;
3623     }
3624 
3625     engine->port = port;
3626 
3627     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3628 
3629     work = nxt_zalloc(sizeof(nxt_work_t));
3630     if (nxt_slow_path(work == NULL)) {
3631         return;
3632     }
3633 
3634     work->handler = nxt_router_rt_add_port;
3635     work->task = link->work.task;
3636     work->obj = work;
3637     work->data = port;
3638 
3639     nxt_event_engine_post(link->work.task->thread->engine, work);
3640 
3641     nxt_event_engine_start(engine);
3642 }
3643 
3644 
3645 static void
nxt_router_rt_add_port(nxt_task_t * task,void * obj,void * data)3646 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3647 {
3648     nxt_int_t      res;
3649     nxt_port_t     *port;
3650     nxt_runtime_t  *rt;
3651 
3652     rt = task->thread->runtime;
3653     port = data;
3654 
3655     nxt_free(obj);
3656 
3657     res = nxt_port_hash_add(&rt->ports, port);
3658 
3659     if (nxt_fast_path(res == NXT_OK)) {
3660         nxt_port_use(task, port, 1);
3661     }
3662 }
3663 
3664 
3665 static void
nxt_router_listen_socket_create(nxt_task_t * task,void * obj,void * data)3666 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3667 {
3668     nxt_joint_job_t          *job;
3669     nxt_socket_conf_t        *skcf;
3670     nxt_listen_event_t       *lev;
3671     nxt_listen_socket_t      *ls;
3672     nxt_thread_spinlock_t    *lock;
3673     nxt_socket_conf_joint_t  *joint;
3674 
3675     job = obj;
3676     joint = data;
3677 
3678     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3679 
3680     skcf = joint->socket_conf;
3681     ls = skcf->listen;
3682 
3683     lev = nxt_listen_event(task, ls);
3684     if (nxt_slow_path(lev == NULL)) {
3685         nxt_router_listen_socket_release(task, skcf);
3686         return;
3687     }
3688 
3689     lev->socket.data = joint;
3690 
3691     lock = &skcf->router_conf->router->lock;
3692 
3693     nxt_thread_spin_lock(lock);
3694     ls->count++;
3695     nxt_thread_spin_unlock(lock);
3696 
3697     job->work.next = NULL;
3698     job->work.handler = nxt_router_conf_wait;
3699 
3700     nxt_event_engine_post(job->tmcf->engine, &job->work);
3701 }
3702 
3703 
3704 nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t * listen_connections,nxt_socket_conf_t * skcf)3705 nxt_router_listen_event(nxt_queue_t *listen_connections,
3706     nxt_socket_conf_t *skcf)
3707 {
3708     nxt_socket_t        fd;
3709     nxt_queue_link_t    *qlk;
3710     nxt_listen_event_t  *lev;
3711 
3712     fd = skcf->listen->socket;
3713 
3714     for (qlk = nxt_queue_first(listen_connections);
3715          qlk != nxt_queue_tail(listen_connections);
3716          qlk = nxt_queue_next(qlk))
3717     {
3718         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3719 
3720         if (fd == lev->socket.fd) {
3721             return lev;
3722         }
3723     }
3724 
3725     return NULL;
3726 }
3727 
3728 
3729 static void
nxt_router_listen_socket_update(nxt_task_t * task,void * obj,void * data)3730 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3731 {
3732     nxt_joint_job_t          *job;
3733     nxt_event_engine_t       *engine;
3734     nxt_listen_event_t       *lev;
3735     nxt_socket_conf_joint_t  *joint, *old;
3736 
3737     job = obj;
3738     joint = data;
3739 
3740     engine = task->thread->engine;
3741 
3742     nxt_queue_insert_tail(&engine->joints, &joint->link);
3743 
3744     lev = nxt_router_listen_event(&engine->listen_connections,
3745                                   joint->socket_conf);
3746 
3747     old = lev->socket.data;
3748     lev->socket.data = joint;
3749     lev->listen = joint->socket_conf->listen;
3750 
3751     job->work.next = NULL;
3752     job->work.handler = nxt_router_conf_wait;
3753 
3754     nxt_event_engine_post(job->tmcf->engine, &job->work);
3755 
3756     /*
3757      * The task is allocated from configuration temporary
3758      * memory pool so it can be freed after engine post operation.
3759      */
3760 
3761     nxt_router_conf_release(&engine->task, old);
3762 }
3763 
3764 
3765 static void
nxt_router_listen_socket_delete(nxt_task_t * task,void * obj,void * data)3766 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3767 {
3768     nxt_socket_conf_t        *skcf;
3769     nxt_listen_event_t       *lev;
3770     nxt_event_engine_t       *engine;
3771     nxt_socket_conf_joint_t  *joint;
3772 
3773     skcf = data;
3774 
3775     engine = task->thread->engine;
3776 
3777     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3778 
3779     nxt_fd_event_delete(engine, &lev->socket);
3780 
3781     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3782               lev->socket.fd);
3783 
3784     joint = lev->socket.data;
3785     joint->close_job = obj;
3786 
3787     lev->timer.handler = nxt_router_listen_socket_close;
3788     lev->timer.work_queue = &engine->fast_work_queue;
3789 
3790     nxt_timer_add(engine, &lev->timer, 0);
3791 }
3792 
3793 
3794 static void
nxt_router_worker_thread_quit(nxt_task_t * task,void * obj,void * data)3795 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3796 {
3797     nxt_event_engine_t  *engine;
3798 
3799     nxt_debug(task, "router worker thread quit");
3800 
3801     engine = task->thread->engine;
3802 
3803     engine->shutdown = 1;
3804 
3805     if (nxt_queue_is_empty(&engine->joints)) {
3806         nxt_thread_exit(task->thread);
3807     }
3808 }
3809 
3810 
3811 static void
nxt_router_listen_socket_close(nxt_task_t * task,void * obj,void * data)3812 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3813 {
3814     nxt_timer_t              *timer;
3815     nxt_joint_job_t          *job;
3816     nxt_listen_event_t       *lev;
3817     nxt_socket_conf_joint_t  *joint;
3818 
3819     timer = obj;
3820     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3821 
3822     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3823               lev->socket.fd);
3824 
3825     nxt_queue_remove(&lev->link);
3826 
3827     joint = lev->socket.data;
3828     lev->socket.data = NULL;
3829 
3830     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3831     task = &task->thread->engine->task;
3832 
3833     nxt_router_listen_socket_release(task, joint->socket_conf);
3834 
3835     job = joint->close_job;
3836     job->work.next = NULL;
3837     job->work.handler = nxt_router_conf_wait;
3838 
3839     nxt_event_engine_post(job->tmcf->engine, &job->work);
3840 
3841     nxt_router_listen_event_release(task, lev, joint);
3842 }
3843 
3844 
3845 static void
nxt_router_listen_socket_release(nxt_task_t * task,nxt_socket_conf_t * skcf)3846 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3847 {
3848 #if (NXT_HAVE_UNIX_DOMAIN)
3849     size_t                 size;
3850     nxt_buf_t              *b;
3851     nxt_port_t             *main_port;
3852     nxt_runtime_t          *rt;
3853     nxt_sockaddr_t         *sa;
3854 #endif
3855     nxt_listen_socket_t    *ls;
3856     nxt_thread_spinlock_t  *lock;
3857 
3858     ls = skcf->listen;
3859     lock = &skcf->router_conf->router->lock;
3860 
3861     nxt_thread_spin_lock(lock);
3862 
3863     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3864               task->thread->engine, ls->count);
3865 
3866     if (--ls->count != 0) {
3867         ls = NULL;
3868     }
3869 
3870     nxt_thread_spin_unlock(lock);
3871 
3872     if (ls == NULL) {
3873         return;
3874     }
3875 
3876     nxt_socket_close(task, ls->socket);
3877 
3878 #if (NXT_HAVE_UNIX_DOMAIN)
3879     sa = ls->sockaddr;
3880     if (sa->u.sockaddr.sa_family != AF_UNIX
3881         || sa->u.sockaddr_un.sun_path[0] == '\0')
3882     {
3883         goto out_free_ls;
3884     }
3885 
3886     size = nxt_sockaddr_size(ls->sockaddr);
3887 
3888     b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
3889     if (b == NULL) {
3890         goto out_free_ls;
3891     }
3892 
3893     b->mem.free = nxt_cpymem(b->mem.free, ls->sockaddr, size);
3894 
3895     rt = task->thread->runtime;
3896     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3897 
3898     (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET_UNLINK,
3899                                  -1, 0, 0, b);
3900 
3901 out_free_ls:
3902 #endif
3903     nxt_free(ls);
3904 }
3905 
3906 
3907 void
nxt_router_listen_event_release(nxt_task_t * task,nxt_listen_event_t * lev,nxt_socket_conf_joint_t * joint)3908 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3909     nxt_socket_conf_joint_t *joint)
3910 {
3911     nxt_event_engine_t  *engine;
3912 
3913     nxt_debug(task, "listen event count: %D", lev->count);
3914 
3915     engine = task->thread->engine;
3916 
3917     if (--lev->count == 0) {
3918         if (lev->next != NULL) {
3919             nxt_sockaddr_cache_free(engine, lev->next);
3920 
3921             nxt_conn_free(task, lev->next);
3922         }
3923 
3924         nxt_free(lev);
3925     }
3926 
3927     if (joint != NULL) {
3928         nxt_router_conf_release(task, joint);
3929     }
3930 
3931     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3932         nxt_thread_exit(task->thread);
3933     }
3934 }
3935 
3936 
3937 void
nxt_router_conf_release(nxt_task_t * task,nxt_socket_conf_joint_t * joint)3938 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3939 {
3940     nxt_socket_conf_t      *skcf;
3941     nxt_router_conf_t      *rtcf;
3942     nxt_thread_spinlock_t  *lock;
3943 
3944     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3945 
3946     if (--joint->count != 0) {
3947         return;
3948     }
3949 
3950     nxt_queue_remove(&joint->link);
3951 
3952     /*
3953      * The joint content can not be safely used after the critical
3954      * section protected by the spinlock because its memory pool may
3955      * be already destroyed by another thread.
3956      */
3957     skcf = joint->socket_conf;
3958     rtcf = skcf->router_conf;
3959     lock = &rtcf->router->lock;
3960 
3961     nxt_thread_spin_lock(lock);
3962 
3963     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3964               rtcf, rtcf->count);
3965 
3966     if (--skcf->count != 0) {
3967         skcf = NULL;
3968         rtcf = NULL;
3969 
3970     } else {
3971         nxt_queue_remove(&skcf->link);
3972 
3973         if (--rtcf->count != 0) {
3974             rtcf = NULL;
3975         }
3976     }
3977 
3978     nxt_thread_spin_unlock(lock);
3979 
3980 #if (NXT_TLS)
3981     if (skcf != NULL && skcf->tls != NULL) {
3982         task->thread->runtime->tls->server_free(task, skcf->tls);
3983     }
3984 #endif
3985 
3986     /* TODO remove engine->port */
3987 
3988     if (rtcf != NULL) {
3989         nxt_debug(task, "old router conf is destroyed");
3990 
3991         nxt_router_apps_hash_use(task, rtcf, -1);
3992 
3993         nxt_router_access_log_release(task, lock, rtcf->access_log);
3994 
3995         nxt_tstr_state_release(rtcf->tstr_state);
3996 
3997         nxt_mp_thread_adopt(rtcf->mem_pool);
3998 
3999         nxt_mp_destroy(rtcf->mem_pool);
4000     }
4001 }
4002 
4003 
4004 static void
nxt_router_thread_exit_handler(nxt_task_t * task,void * obj,void * data)4005 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4006 {
4007     nxt_port_t           *port;
4008     nxt_thread_link_t    *link;
4009     nxt_event_engine_t   *engine;
4010     nxt_thread_handle_t  handle;
4011 
4012     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4013     link = data;
4014 
4015     nxt_thread_wait(handle);
4016 
4017     engine = link->engine;
4018 
4019     nxt_queue_remove(&engine->link);
4020 
4021     port = engine->port;
4022 
4023     // TODO notify all apps
4024 
4025     port->engine = task->thread->engine;
4026     nxt_mp_thread_adopt(port->mem_pool);
4027     nxt_port_use(task, port, -1);
4028 
4029     nxt_mp_thread_adopt(engine->mem_pool);
4030     nxt_mp_destroy(engine->mem_pool);
4031 
4032     nxt_event_engine_free(engine);
4033 
4034     nxt_free(link);
4035 }
4036 
4037 
4038 static void
nxt_router_response_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4039 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4040     void *data)
4041 {
4042     size_t                  b_size, count;
4043     nxt_int_t               ret;
4044     nxt_app_t               *app;
4045     nxt_buf_t               *b, *next;
4046     nxt_port_t              *app_port;
4047     nxt_unit_field_t        *f;
4048     nxt_http_field_t        *field;
4049     nxt_http_request_t      *r;
4050     nxt_unit_response_t     *resp;
4051     nxt_request_rpc_data_t  *req_rpc_data;
4052 
4053     req_rpc_data = data;
4054 
4055     r = req_rpc_data->request;
4056     if (nxt_slow_path(r == NULL)) {
4057         return;
4058     }
4059 
4060     if (r->error) {
4061         nxt_request_rpc_data_unlink(task, req_rpc_data);
4062         return;
4063     }
4064 
4065     app = req_rpc_data->app;
4066     nxt_assert(app != NULL);
4067 
4068     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4069         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4070 
4071         return;
4072     }
4073 
4074     b = (msg->size == 0) ? NULL : msg->buf;
4075 
4076     if (msg->port_msg.last != 0) {
4077         nxt_debug(task, "router data create last buf");
4078 
4079         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4080 
4081         req_rpc_data->rpc_cancel = 0;
4082 
4083         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4084             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4085         }
4086 
4087         nxt_request_rpc_data_unlink(task, req_rpc_data);
4088 
4089     } else {
4090         if (app->timeout != 0) {
4091             r->timer.handler = nxt_router_app_timeout;
4092             r->timer_data = req_rpc_data;
4093             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4094         }
4095     }
4096 
4097     if (b == NULL) {
4098         return;
4099     }
4100 
4101     if (msg->buf == b) {
4102         /* Disable instant buffer completion/re-using by port. */
4103         msg->buf = NULL;
4104     }
4105 
4106     if (r->header_sent) {
4107         nxt_buf_chain_add(&r->out, b);
4108         nxt_http_request_send_body(task, r, NULL);
4109 
4110     } else {
4111         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4112 
4113         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4114             nxt_alert(task, "response buffer too small: %z", b_size);
4115             goto fail;
4116         }
4117 
4118         resp = (void *) b->mem.pos;
4119         count = (b_size - sizeof(nxt_unit_response_t))
4120                     / sizeof(nxt_unit_field_t);
4121 
4122         if (nxt_slow_path(count < resp->fields_count)) {
4123             nxt_alert(task, "response buffer too small for fields count: %D",
4124                       resp->fields_count);
4125             goto fail;
4126         }
4127 
4128         field = NULL;
4129 
4130         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4131             if (f->skip) {
4132                 continue;
4133             }
4134 
4135             field = nxt_list_add(r->resp.fields);
4136 
4137             if (nxt_slow_path(field == NULL)) {
4138                 goto fail;
4139             }
4140 
4141             field->hash = f->hash;
4142             field->skip = 0;
4143             field->hopbyhop = 0;
4144 
4145             field->name_length = f->name_length;
4146             field->value_length = f->value_length;
4147             field->name = nxt_unit_sptr_get(&f->name);
4148             field->value = nxt_unit_sptr_get(&f->value);
4149 
4150             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4151             if (nxt_slow_path(ret != NXT_OK)) {
4152                 goto fail;
4153             }
4154 
4155             nxt_debug(task, "header%s: %*s: %*s",
4156                       (field->skip ? " skipped" : ""),
4157                       (size_t) field->name_length, field->name,
4158                       (size_t) field->value_length, field->value);
4159 
4160             if (field->skip) {
4161                 r->resp.fields->last->nelts--;
4162             }
4163         }
4164 
4165         r->status = resp->status;
4166 
4167         if (resp->piggyback_content_length != 0) {
4168             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4169             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4170 
4171         } else {
4172             b->mem.pos = b->mem.free;
4173         }
4174 
4175         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4176             next = b->next;
4177             b->next = NULL;
4178 
4179             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4180                                b->completion_handler, task, b, b->parent);
4181 
4182             b = next;
4183         }
4184 
4185         if (b != NULL) {
4186             nxt_buf_chain_add(&r->out, b);
4187         }
4188 
4189         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4190 
4191         if (r->websocket_handshake
4192             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4193         {
4194             app_port = req_rpc_data->app_port;
4195             if (nxt_slow_path(app_port == NULL)) {
4196                 goto fail;
4197             }
4198 
4199             nxt_thread_mutex_lock(&app->mutex);
4200 
4201             app_port->main_app_port->active_websockets++;
4202 
4203             nxt_thread_mutex_unlock(&app->mutex);
4204 
4205             nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4206             req_rpc_data->apr_action = NXT_APR_CLOSE;
4207 
4208             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4209 
4210             r->state = &nxt_http_websocket;
4211 
4212         } else {
4213             r->state = &nxt_http_request_send_state;
4214         }
4215     }
4216 
4217     return;
4218 
4219 fail:
4220 
4221     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4222 
4223     nxt_request_rpc_data_unlink(task, req_rpc_data);
4224 }
4225 
4226 
4227 static void
nxt_router_req_headers_ack_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,nxt_request_rpc_data_t * req_rpc_data)4228 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4229     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4230 {
4231     int                 res;
4232     nxt_app_t           *app;
4233     nxt_buf_t           *b;
4234     nxt_bool_t          start_process, unlinked;
4235     nxt_port_t          *app_port, *main_app_port, *idle_port;
4236     nxt_queue_link_t    *idle_lnk;
4237     nxt_http_request_t  *r;
4238 
4239     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4240               req_rpc_data->stream,
4241               msg->port_msg.pid, msg->port_msg.reply_port);
4242 
4243     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4244                              msg->port_msg.pid);
4245 
4246     app = req_rpc_data->app;
4247     r = req_rpc_data->request;
4248 
4249     start_process = 0;
4250     unlinked = 0;
4251 
4252     nxt_thread_mutex_lock(&app->mutex);
4253 
4254     if (r->app_link.next != NULL) {
4255         nxt_queue_remove(&r->app_link);
4256         r->app_link.next = NULL;
4257 
4258         unlinked = 1;
4259     }
4260 
4261     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4262                                   msg->port_msg.reply_port);
4263     if (nxt_slow_path(app_port == NULL)) {
4264         nxt_thread_mutex_unlock(&app->mutex);
4265 
4266         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4267 
4268         if (unlinked) {
4269             nxt_mp_release(r->mem_pool);
4270         }
4271 
4272         return;
4273     }
4274 
4275     main_app_port = app_port->main_app_port;
4276 
4277     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4278         app->idle_processes--;
4279 
4280         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4281                   &app->name, main_app_port->pid, main_app_port->id,
4282                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4283 
4284         /* Check port was in 'spare_ports' using idle_start field. */
4285         if (main_app_port->idle_start == 0
4286             && app->idle_processes >= app->spare_processes)
4287         {
4288             /*
4289              * If there is a vacant space in spare ports,
4290              * move the last idle to spare_ports.
4291              */
4292             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4293 
4294             idle_lnk = nxt_queue_last(&app->idle_ports);
4295             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4296             nxt_queue_remove(idle_lnk);
4297 
4298             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4299 
4300             idle_port->idle_start = 0;
4301 
4302             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4303                       "to spare_ports",
4304                       &app->name, idle_port->pid, idle_port->id);
4305         }
4306 
4307         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4308             app->pending_processes++;
4309             start_process = 1;
4310         }
4311     }
4312 
4313     main_app_port->active_requests++;
4314 
4315     nxt_port_inc_use(app_port);
4316 
4317     nxt_thread_mutex_unlock(&app->mutex);
4318 
4319     if (unlinked) {
4320         nxt_mp_release(r->mem_pool);
4321     }
4322 
4323     if (start_process) {
4324         nxt_router_start_app_process(task, app);
4325     }
4326 
4327     nxt_port_use(task, req_rpc_data->app_port, -1);
4328 
4329     req_rpc_data->app_port = app_port;
4330 
4331     b = req_rpc_data->msg_info.buf;
4332 
4333     if (b != NULL) {
4334         /* First buffer is already sent.  Start from second. */
4335         b = b->next;
4336 
4337         req_rpc_data->msg_info.buf->next = NULL;
4338     }
4339 
4340     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4341         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4342                   req_rpc_data->msg_info.body_fd);
4343 
4344         if (req_rpc_data->msg_info.body_fd != -1) {
4345             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4346         }
4347 
4348         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4349                                     req_rpc_data->msg_info.body_fd,
4350                                     req_rpc_data->stream,
4351                                     task->thread->engine->port->id, b);
4352 
4353         if (nxt_slow_path(res != NXT_OK)) {
4354             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4355         }
4356     }
4357 
4358     if (app->timeout != 0) {
4359         r->timer.handler = nxt_router_app_timeout;
4360         r->timer_data = req_rpc_data;
4361         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4362     }
4363 }
4364 
4365 
4366 static const nxt_http_request_state_t  nxt_http_request_send_state
4367     nxt_aligned(64) =
4368 {
4369     .error_handler = nxt_http_request_error_handler,
4370 };
4371 
4372 
4373 static void
nxt_http_request_send_body(nxt_task_t * task,void * obj,void * data)4374 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4375 {
4376     nxt_buf_t           *out;
4377     nxt_http_request_t  *r;
4378 
4379     r = obj;
4380 
4381     out = r->out;
4382 
4383     if (out != NULL) {
4384         r->out = NULL;
4385         nxt_http_request_send(task, r, out);
4386     }
4387 }
4388 
4389 
4390 static void
nxt_router_response_error_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4391 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4392     void *data)
4393 {
4394     nxt_request_rpc_data_t  *req_rpc_data;
4395 
4396     req_rpc_data = data;
4397 
4398     req_rpc_data->rpc_cancel = 0;
4399 
4400     /* TODO cancel message and return if cancelled. */
4401     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4402 
4403     if (req_rpc_data->request != NULL) {
4404         nxt_http_request_error(task, req_rpc_data->request,
4405                                NXT_HTTP_SERVICE_UNAVAILABLE);
4406     }
4407 
4408     nxt_request_rpc_data_unlink(task, req_rpc_data);
4409 }
4410 
4411 
4412 static void
nxt_router_app_port_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4413 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4414     void *data)
4415 {
4416     uint32_t             n;
4417     nxt_app_t            *app;
4418     nxt_bool_t           start_process, restarted;
4419     nxt_port_t           *port;
4420     nxt_app_joint_t      *app_joint;
4421     nxt_app_joint_rpc_t  *app_joint_rpc;
4422 
4423     nxt_assert(data != NULL);
4424 
4425     app_joint_rpc = data;
4426     app_joint = app_joint_rpc->app_joint;
4427     port = msg->u.new_port;
4428 
4429     nxt_assert(app_joint != NULL);
4430     nxt_assert(port != NULL);
4431     nxt_assert(port->id == 0);
4432 
4433     app = app_joint->app;
4434 
4435     nxt_router_app_joint_use(task, app_joint, -1);
4436 
4437     if (nxt_slow_path(app == NULL)) {
4438         nxt_debug(task, "new port ready for released app, send QUIT");
4439 
4440         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4441 
4442         return;
4443     }
4444 
4445     nxt_thread_mutex_lock(&app->mutex);
4446 
4447     restarted = (app->generation != app_joint_rpc->generation);
4448 
4449     if (app_joint_rpc->proto) {
4450         nxt_assert(app->proto_port == NULL);
4451         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4452 
4453         n = app->proto_port_requests;
4454         app->proto_port_requests = 0;
4455 
4456         if (nxt_slow_path(restarted)) {
4457             nxt_thread_mutex_unlock(&app->mutex);
4458 
4459             nxt_debug(task, "proto port ready for restarted app, send QUIT");
4460 
4461             nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4462                                   NULL);
4463 
4464         } else {
4465             port->app = app;
4466             app->proto_port = port;
4467 
4468             nxt_thread_mutex_unlock(&app->mutex);
4469 
4470             nxt_port_use(task, port, 1);
4471         }
4472 
4473         port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4474 
4475         while (n > 0) {
4476             nxt_router_app_use(task, app, 1);
4477 
4478             nxt_router_start_app_process_handler(task, port, app);
4479 
4480             n--;
4481         }
4482 
4483         return;
4484     }
4485 
4486     nxt_assert(port->type == NXT_PROCESS_APP);
4487     nxt_assert(app->pending_processes != 0);
4488 
4489     app->pending_processes--;
4490 
4491     if (nxt_slow_path(restarted)) {
4492         nxt_debug(task, "new port ready for restarted app, send QUIT");
4493 
4494         start_process = !task->thread->engine->shutdown
4495                         && nxt_router_app_can_start(app)
4496                         && nxt_router_app_need_start(app);
4497 
4498         if (start_process) {
4499             app->pending_processes++;
4500         }
4501 
4502         nxt_thread_mutex_unlock(&app->mutex);
4503 
4504         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4505 
4506         if (start_process) {
4507             nxt_router_start_app_process(task, app);
4508         }
4509 
4510         return;
4511     }
4512 
4513     port->app = app;
4514     port->main_app_port = port;
4515 
4516     app->processes++;
4517     nxt_port_hash_add(&app->port_hash, port);
4518     app->port_hash_count++;
4519 
4520     nxt_thread_mutex_unlock(&app->mutex);
4521 
4522     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4523               &app->name, port->pid, app->processes, app->pending_processes);
4524 
4525     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4526 
4527     nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4528 }
4529 
4530 
4531 static void
nxt_router_app_port_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4532 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4533     void *data)
4534 {
4535     nxt_app_t            *app;
4536     nxt_app_joint_t      *app_joint;
4537     nxt_queue_link_t     *link;
4538     nxt_http_request_t   *r;
4539     nxt_app_joint_rpc_t  *app_joint_rpc;
4540 
4541     nxt_assert(data != NULL);
4542 
4543     app_joint_rpc = data;
4544     app_joint = app_joint_rpc->app_joint;
4545 
4546     nxt_assert(app_joint != NULL);
4547 
4548     app = app_joint->app;
4549 
4550     nxt_router_app_joint_use(task, app_joint, -1);
4551 
4552     if (nxt_slow_path(app == NULL)) {
4553         nxt_debug(task, "start error for released app");
4554 
4555         return;
4556     }
4557 
4558     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4559 
4560     link = NULL;
4561 
4562     nxt_thread_mutex_lock(&app->mutex);
4563 
4564     nxt_assert(app->pending_processes != 0);
4565 
4566     app->pending_processes--;
4567 
4568     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4569         link = nxt_queue_first(&app->ack_waiting_req);
4570 
4571         nxt_queue_remove(link);
4572         link->next = NULL;
4573     }
4574 
4575     nxt_thread_mutex_unlock(&app->mutex);
4576 
4577     while (link != NULL) {
4578         r = nxt_container_of(link, nxt_http_request_t, app_link);
4579 
4580         nxt_event_engine_post(r->engine, &r->err_work);
4581 
4582         link = NULL;
4583 
4584         nxt_thread_mutex_lock(&app->mutex);
4585 
4586         if (app->processes == 0 && app->pending_processes == 0
4587             && !nxt_queue_is_empty(&app->ack_waiting_req))
4588         {
4589             link = nxt_queue_first(&app->ack_waiting_req);
4590 
4591             nxt_queue_remove(link);
4592             link->next = NULL;
4593         }
4594 
4595         nxt_thread_mutex_unlock(&app->mutex);
4596     }
4597 }
4598 
4599 
4600 nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t * task,nxt_app_t * app)4601 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4602 {
4603     nxt_port_t  *port;
4604 
4605     port = NULL;
4606 
4607     nxt_thread_mutex_lock(&app->mutex);
4608 
4609     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4610 
4611         /* Caller is responsible to decrease port use count. */
4612         nxt_queue_chk_remove(&port->app_link);
4613 
4614         if (nxt_queue_chk_remove(&port->idle_link)) {
4615             app->idle_processes--;
4616 
4617             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4618                       &app->name, port->pid, port->id,
4619                       (port->idle_start ? "idle_ports" : "spare_ports"));
4620         }
4621 
4622         nxt_port_hash_remove(&app->port_hash, port);
4623         app->port_hash_count--;
4624 
4625         port->app = NULL;
4626         app->processes--;
4627 
4628         break;
4629 
4630     } nxt_queue_loop;
4631 
4632     nxt_thread_mutex_unlock(&app->mutex);
4633 
4634     return port;
4635 }
4636 
4637 
4638 static void
nxt_router_app_use(nxt_task_t * task,nxt_app_t * app,int i)4639 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4640 {
4641     int  c;
4642 
4643     c = nxt_atomic_fetch_add(&app->use_count, i);
4644 
4645     if (i < 0 && c == -i) {
4646 
4647         if (task->thread->engine != app->engine) {
4648             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4649 
4650         } else {
4651             nxt_router_free_app(task, app->joint, NULL);
4652         }
4653     }
4654 }
4655 
4656 
4657 static void
nxt_router_app_unlink(nxt_task_t * task,nxt_app_t * app)4658 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4659 {
4660     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4661 
4662     nxt_queue_remove(&app->link);
4663 
4664     nxt_router_app_use(task, app, -1);
4665 }
4666 
4667 
4668 static void
nxt_router_app_port_release(nxt_task_t * task,nxt_app_t * app,nxt_port_t * port,nxt_apr_action_t action)4669 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4670     nxt_apr_action_t action)
4671 {
4672     int         inc_use;
4673     uint32_t    got_response, dec_requests;
4674     nxt_bool_t  adjust_idle_timer;
4675     nxt_port_t  *main_app_port;
4676 
4677     nxt_assert(port != NULL);
4678 
4679     inc_use = 0;
4680     got_response = 0;
4681     dec_requests = 0;
4682 
4683     switch (action) {
4684     case NXT_APR_NEW_PORT:
4685         break;
4686     case NXT_APR_REQUEST_FAILED:
4687         dec_requests = 1;
4688         inc_use = -1;
4689         break;
4690     case NXT_APR_GOT_RESPONSE:
4691         got_response = 1;
4692         inc_use = -1;
4693         break;
4694     case NXT_APR_UPGRADE:
4695         got_response = 1;
4696         break;
4697     case NXT_APR_CLOSE:
4698         inc_use = -1;
4699         break;
4700     }
4701 
4702     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4703               port->pid, port->id,
4704               (int) inc_use, (int) got_response);
4705 
4706     if (port->id == NXT_SHARED_PORT_ID) {
4707         nxt_thread_mutex_lock(&app->mutex);
4708 
4709         app->active_requests -= got_response + dec_requests;
4710 
4711         nxt_thread_mutex_unlock(&app->mutex);
4712 
4713         goto adjust_use;
4714     }
4715 
4716     main_app_port = port->main_app_port;
4717 
4718     nxt_thread_mutex_lock(&app->mutex);
4719 
4720     main_app_port->active_requests -= got_response + dec_requests;
4721     app->active_requests -= got_response + dec_requests;
4722 
4723     if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4724         nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4725 
4726         nxt_port_inc_use(main_app_port);
4727     }
4728 
4729     adjust_idle_timer = 0;
4730 
4731     if (main_app_port->pair[1] != -1
4732         && main_app_port->active_requests == 0
4733         && main_app_port->active_websockets == 0
4734         && main_app_port->idle_link.next == NULL)
4735     {
4736         if (app->idle_processes == app->spare_processes
4737             && app->adjust_idle_work.data == NULL)
4738         {
4739             adjust_idle_timer = 1;
4740             app->adjust_idle_work.data = app;
4741             app->adjust_idle_work.next = NULL;
4742         }
4743 
4744         if (app->idle_processes < app->spare_processes) {
4745             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4746 
4747             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4748                       &app->name, main_app_port->pid, main_app_port->id);
4749         } else {
4750             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4751 
4752             main_app_port->idle_start = task->thread->engine->timers.now;
4753 
4754             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4755                       &app->name, main_app_port->pid, main_app_port->id);
4756         }
4757 
4758         app->idle_processes++;
4759     }
4760 
4761     nxt_thread_mutex_unlock(&app->mutex);
4762 
4763     if (adjust_idle_timer) {
4764         nxt_router_app_use(task, app, 1);
4765         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4766     }
4767 
4768     /* ? */
4769     if (main_app_port->pair[1] == -1) {
4770         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4771                   &app->name, app, main_app_port, main_app_port->pid);
4772 
4773         goto adjust_use;
4774     }
4775 
4776     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4777               &app->name, app);
4778 
4779 adjust_use:
4780 
4781     nxt_port_use(task, port, inc_use);
4782 }
4783 
4784 
4785 void
nxt_router_app_port_close(nxt_task_t * task,nxt_port_t * port)4786 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4787 {
4788     nxt_app_t         *app;
4789     nxt_bool_t        unchain, start_process;
4790     nxt_port_t        *idle_port;
4791     nxt_queue_link_t  *idle_lnk;
4792 
4793     app = port->app;
4794 
4795     nxt_assert(app != NULL);
4796 
4797     nxt_thread_mutex_lock(&app->mutex);
4798 
4799     if (port == app->proto_port) {
4800         app->proto_port = NULL;
4801         port->app = NULL;
4802 
4803         nxt_thread_mutex_unlock(&app->mutex);
4804 
4805         nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4806                   port->pid);
4807 
4808         nxt_port_use(task, port, -1);
4809 
4810         return;
4811     }
4812 
4813     nxt_port_hash_remove(&app->port_hash, port);
4814     app->port_hash_count--;
4815 
4816     if (port->id != 0) {
4817         nxt_thread_mutex_unlock(&app->mutex);
4818 
4819         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4820                   port->pid, port->id);
4821 
4822         return;
4823     }
4824 
4825     unchain = nxt_queue_chk_remove(&port->app_link);
4826 
4827     if (nxt_queue_chk_remove(&port->idle_link)) {
4828         app->idle_processes--;
4829 
4830         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4831                   &app->name, port->pid, port->id,
4832                   (port->idle_start ? "idle_ports" : "spare_ports"));
4833 
4834         if (port->idle_start == 0
4835             && app->idle_processes >= app->spare_processes)
4836         {
4837             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4838 
4839             idle_lnk = nxt_queue_last(&app->idle_ports);
4840             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4841             nxt_queue_remove(idle_lnk);
4842 
4843             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4844 
4845             idle_port->idle_start = 0;
4846 
4847             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4848                       "to spare_ports",
4849                       &app->name, idle_port->pid, idle_port->id);
4850         }
4851     }
4852 
4853     app->processes--;
4854 
4855     start_process = !task->thread->engine->shutdown
4856                     && nxt_router_app_can_start(app)
4857                     && nxt_router_app_need_start(app);
4858 
4859     if (start_process) {
4860         app->pending_processes++;
4861     }
4862 
4863     nxt_thread_mutex_unlock(&app->mutex);
4864 
4865     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4866 
4867     if (unchain) {
4868         nxt_port_use(task, port, -1);
4869     }
4870 
4871     if (start_process) {
4872         nxt_router_start_app_process(task, app);
4873     }
4874 }
4875 
4876 
4877 static void
nxt_router_adjust_idle_timer(nxt_task_t * task,void * obj,void * data)4878 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4879 {
4880     nxt_app_t           *app;
4881     nxt_bool_t          queued;
4882     nxt_port_t          *port;
4883     nxt_msec_t          timeout, threshold;
4884     nxt_queue_link_t    *lnk;
4885     nxt_event_engine_t  *engine;
4886 
4887     app = obj;
4888     queued = (data == app);
4889 
4890     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4891               &app->name, queued);
4892 
4893     engine = task->thread->engine;
4894 
4895     nxt_assert(app->engine == engine);
4896 
4897     threshold = engine->timers.now + app->joint->idle_timer.bias;
4898     timeout = 0;
4899 
4900     nxt_thread_mutex_lock(&app->mutex);
4901 
4902     if (queued) {
4903         app->adjust_idle_work.data = NULL;
4904     }
4905 
4906     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4907               &app->name,
4908               (int) app->idle_processes, (int) app->spare_processes);
4909 
4910     while (app->idle_processes > app->spare_processes) {
4911 
4912         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4913 
4914         lnk = nxt_queue_first(&app->idle_ports);
4915         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4916 
4917         timeout = port->idle_start + app->idle_timeout;
4918 
4919         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4920                   &app->name, port->pid,
4921                   port->idle_start, timeout, threshold);
4922 
4923         if (timeout > threshold) {
4924             break;
4925         }
4926 
4927         nxt_queue_remove(lnk);
4928         lnk->next = NULL;
4929 
4930         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4931                   &app->name, port->pid, port->id);
4932 
4933         nxt_queue_chk_remove(&port->app_link);
4934 
4935         nxt_port_hash_remove(&app->port_hash, port);
4936         app->port_hash_count--;
4937 
4938         app->idle_processes--;
4939         app->processes--;
4940         port->app = NULL;
4941 
4942         nxt_thread_mutex_unlock(&app->mutex);
4943 
4944         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4945                   &app->name, port->pid);
4946 
4947         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4948 
4949         nxt_port_use(task, port, -1);
4950 
4951         nxt_thread_mutex_lock(&app->mutex);
4952     }
4953 
4954     nxt_thread_mutex_unlock(&app->mutex);
4955 
4956     if (timeout > threshold) {
4957         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4958 
4959     } else {
4960         nxt_timer_disable(engine, &app->joint->idle_timer);
4961     }
4962 
4963     if (queued) {
4964         nxt_router_app_use(task, app, -1);
4965     }
4966 }
4967 
4968 
4969 static void
nxt_router_app_idle_timeout(nxt_task_t * task,void * obj,void * data)4970 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4971 {
4972     nxt_timer_t      *timer;
4973     nxt_app_joint_t  *app_joint;
4974 
4975     timer = obj;
4976     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4977 
4978     if (nxt_fast_path(app_joint->app != NULL)) {
4979         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4980     }
4981 }
4982 
4983 
4984 static void
nxt_router_app_joint_release_handler(nxt_task_t * task,void * obj,void * data)4985 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4986 {
4987     nxt_timer_t      *timer;
4988     nxt_app_joint_t  *app_joint;
4989 
4990     timer = obj;
4991     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4992 
4993     nxt_router_app_joint_use(task, app_joint, -1);
4994 }
4995 
4996 
4997 static void
nxt_router_free_app(nxt_task_t * task,void * obj,void * data)4998 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4999 {
5000     nxt_app_t        *app;
5001     nxt_port_t       *port, *proto_port;
5002     nxt_app_joint_t  *app_joint;
5003 
5004     app_joint = obj;
5005     app = app_joint->app;
5006 
5007     for ( ;; ) {
5008         port = nxt_router_app_get_port_for_quit(task, app);
5009         if (port == NULL) {
5010             break;
5011         }
5012 
5013         nxt_port_use(task, port, -1);
5014     }
5015 
5016     nxt_thread_mutex_lock(&app->mutex);
5017 
5018     for ( ;; ) {
5019         port = nxt_port_hash_retrieve(&app->port_hash);
5020         if (port == NULL) {
5021             break;
5022         }
5023 
5024         app->port_hash_count--;
5025 
5026         port->app = NULL;
5027 
5028         nxt_port_close(task, port);
5029 
5030         nxt_port_use(task, port, -1);
5031     }
5032 
5033     proto_port = app->proto_port;
5034 
5035     if (proto_port != NULL) {
5036         nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5037                   proto_port->pid);
5038 
5039         app->proto_port = NULL;
5040         proto_port->app = NULL;
5041     }
5042 
5043     nxt_thread_mutex_unlock(&app->mutex);
5044 
5045     if (proto_port != NULL) {
5046         nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5047                               -1, 0, 0, NULL);
5048 
5049         nxt_port_close(task, proto_port);
5050 
5051         nxt_port_use(task, proto_port, -1);
5052     }
5053 
5054     nxt_assert(app->proto_port == NULL);
5055     nxt_assert(app->processes == 0);
5056     nxt_assert(app->active_requests == 0);
5057     nxt_assert(app->port_hash_count == 0);
5058     nxt_assert(app->idle_processes == 0);
5059     nxt_assert(nxt_queue_is_empty(&app->ports));
5060     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5061     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5062 
5063     nxt_port_mmaps_destroy(&app->outgoing, 1);
5064 
5065     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5066 
5067     if (app->shared_port != NULL) {
5068         app->shared_port->app = NULL;
5069         nxt_port_close(task, app->shared_port);
5070         nxt_port_use(task, app->shared_port, -1);
5071 
5072         app->shared_port = NULL;
5073     }
5074 
5075     nxt_thread_mutex_destroy(&app->mutex);
5076     nxt_mp_destroy(app->mem_pool);
5077 
5078     app_joint->app = NULL;
5079 
5080     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5081         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5082         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5083 
5084     } else {
5085         nxt_router_app_joint_use(task, app_joint, -1);
5086     }
5087 }
5088 
5089 
5090 static void
nxt_router_app_port_get(nxt_task_t * task,nxt_app_t * app,nxt_request_rpc_data_t * req_rpc_data)5091 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5092     nxt_request_rpc_data_t *req_rpc_data)
5093 {
5094     nxt_bool_t          start_process;
5095     nxt_port_t          *port;
5096     nxt_http_request_t  *r;
5097 
5098     start_process = 0;
5099 
5100     nxt_thread_mutex_lock(&app->mutex);
5101 
5102     port = app->shared_port;
5103     nxt_port_inc_use(port);
5104 
5105     app->active_requests++;
5106 
5107     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5108         app->pending_processes++;
5109         start_process = 1;
5110     }
5111 
5112     r = req_rpc_data->request;
5113 
5114     /*
5115      * Put request into application-wide list to be able to cancel request
5116      * if something goes wrong with application processes.
5117      */
5118     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5119 
5120     nxt_thread_mutex_unlock(&app->mutex);
5121 
5122     /*
5123      * Retain request memory pool while request is linked in ack_waiting_req
5124      * to guarantee request structure memory is accessble.
5125      */
5126     nxt_mp_retain(r->mem_pool);
5127 
5128     req_rpc_data->app_port = port;
5129     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5130 
5131     if (start_process) {
5132         nxt_router_start_app_process(task, app);
5133     }
5134 }
5135 
5136 
5137 void
nxt_router_process_http_request(nxt_task_t * task,nxt_http_request_t * r,nxt_http_action_t * action)5138 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5139     nxt_http_action_t *action)
5140 {
5141     nxt_event_engine_t      *engine;
5142     nxt_http_app_conf_t     *conf;
5143     nxt_request_rpc_data_t  *req_rpc_data;
5144 
5145     conf = action->u.conf;
5146     engine = task->thread->engine;
5147 
5148     r->app_target = conf->target;
5149 
5150     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5151                                           nxt_router_response_ready_handler,
5152                                           nxt_router_response_error_handler,
5153                                           sizeof(nxt_request_rpc_data_t));
5154     if (nxt_slow_path(req_rpc_data == NULL)) {
5155         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5156         return;
5157     }
5158 
5159     /*
5160      * At this point we have request req_rpc_data allocated and registered
5161      * in port handlers.  Need to fixup request memory pool.  Counterpart
5162      * release will be called via following call chain:
5163      *    nxt_request_rpc_data_unlink() ->
5164      *        nxt_router_http_request_release_post() ->
5165      *            nxt_router_http_request_release()
5166      */
5167     nxt_mp_retain(r->mem_pool);
5168 
5169     r->timer.task = &engine->task;
5170     r->timer.work_queue = &engine->fast_work_queue;
5171     r->timer.log = engine->task.log;
5172     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5173 
5174     r->engine = engine;
5175     r->err_work.handler = nxt_router_http_request_error;
5176     r->err_work.task = task;
5177     r->err_work.obj = r;
5178 
5179     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5180     req_rpc_data->app = conf->app;
5181     req_rpc_data->msg_info.body_fd = -1;
5182     req_rpc_data->rpc_cancel = 1;
5183 
5184     nxt_router_app_use(task, conf->app, 1);
5185 
5186     req_rpc_data->request = r;
5187     r->req_rpc_data = req_rpc_data;
5188 
5189     if (r->last != NULL) {
5190         r->last->completion_handler = nxt_router_http_request_done;
5191     }
5192 
5193     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5194     nxt_router_app_prepare_request(task, req_rpc_data);
5195 }
5196 
5197 
5198 static void
nxt_router_http_request_error(nxt_task_t * task,void * obj,void * data)5199 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5200 {
5201     nxt_http_request_t  *r;
5202 
5203     r = obj;
5204 
5205     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5206 
5207     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5208 
5209     if (r->req_rpc_data != NULL) {
5210         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5211     }
5212 
5213     nxt_mp_release(r->mem_pool);
5214 }
5215 
5216 
5217 static void
nxt_router_http_request_done(nxt_task_t * task,void * obj,void * data)5218 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5219 {
5220     nxt_http_request_t  *r;
5221 
5222     r = data;
5223 
5224     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5225 
5226     if (r->req_rpc_data != NULL) {
5227         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5228     }
5229 
5230     nxt_http_request_close_handler(task, r, r->proto.any);
5231 }
5232 
5233 
5234 static void
nxt_router_app_prepare_request(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)5235 nxt_router_app_prepare_request(nxt_task_t *task,
5236     nxt_request_rpc_data_t *req_rpc_data)
5237 {
5238     nxt_app_t         *app;
5239     nxt_buf_t         *buf, *body;
5240     nxt_int_t         res;
5241     nxt_port_t        *port, *reply_port;
5242 
5243     int                   notify;
5244     struct {
5245         nxt_port_msg_t       pm;
5246         nxt_port_mmap_msg_t  mm;
5247     } msg;
5248 
5249 
5250     app = req_rpc_data->app;
5251 
5252     nxt_assert(app != NULL);
5253 
5254     port = req_rpc_data->app_port;
5255 
5256     nxt_assert(port != NULL);
5257     nxt_assert(port->queue != NULL);
5258 
5259     reply_port = task->thread->engine->port;
5260 
5261     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5262                                  nxt_app_msg_prefix[app->type]);
5263     if (nxt_slow_path(buf == NULL)) {
5264         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5265                   req_rpc_data->stream, &app->name);
5266 
5267         nxt_http_request_error(task, req_rpc_data->request,
5268                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5269 
5270         return;
5271     }
5272 
5273     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5274                     nxt_buf_used_size(buf),
5275                     port->socket.fd);
5276 
5277     req_rpc_data->msg_info.buf = buf;
5278 
5279     body = req_rpc_data->request->body;
5280 
5281     if (body != NULL && nxt_buf_is_file(body)) {
5282         req_rpc_data->msg_info.body_fd = body->file->fd;
5283 
5284         body->file->fd = -1;
5285 
5286     } else {
5287         req_rpc_data->msg_info.body_fd = -1;
5288     }
5289 
5290     msg.pm.stream = req_rpc_data->stream;
5291     msg.pm.pid = reply_port->pid;
5292     msg.pm.reply_port = reply_port->id;
5293     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5294     msg.pm.last = 0;
5295     msg.pm.mmap = 1;
5296     msg.pm.nf = 0;
5297     msg.pm.mf = 0;
5298 
5299     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5300     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5301 
5302     msg.mm.mmap_id = hdr->id;
5303     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5304     msg.mm.size = nxt_buf_used_size(buf);
5305 
5306     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5307                              req_rpc_data->stream, &notify,
5308                              &req_rpc_data->msg_info.tracking_cookie);
5309     if (nxt_fast_path(res == NXT_OK)) {
5310         if (notify != 0) {
5311             (void) nxt_port_socket_write(task, port,
5312                                          NXT_PORT_MSG_READ_QUEUE,
5313                                          -1, req_rpc_data->stream,
5314                                          reply_port->id, NULL);
5315 
5316         } else {
5317             nxt_debug(task, "queue is not empty");
5318         }
5319 
5320         buf->is_port_mmap_sent = 1;
5321         buf->mem.pos = buf->mem.free;
5322 
5323     } else {
5324         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5325                   req_rpc_data->stream, &app->name);
5326 
5327         nxt_http_request_error(task, req_rpc_data->request,
5328                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5329     }
5330 }
5331 
5332 
5333 struct nxt_fields_iter_s {
5334     nxt_list_part_t   *part;
5335     nxt_http_field_t  *field;
5336 };
5337 
5338 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5339 
5340 
5341 static nxt_http_field_t *
nxt_fields_part_first(nxt_list_part_t * part,nxt_fields_iter_t * i)5342 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5343 {
5344     if (part == NULL) {
5345         return NULL;
5346     }
5347 
5348     while (part->nelts == 0) {
5349         part = part->next;
5350         if (part == NULL) {
5351             return NULL;
5352         }
5353     }
5354 
5355     i->part = part;
5356     i->field = nxt_list_data(i->part);
5357 
5358     return i->field;
5359 }
5360 
5361 
5362 static nxt_http_field_t *
nxt_fields_first(nxt_list_t * fields,nxt_fields_iter_t * i)5363 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5364 {
5365     return nxt_fields_part_first(nxt_list_part(fields), i);
5366 }
5367 
5368 
5369 static nxt_http_field_t *
nxt_fields_next(nxt_fields_iter_t * i)5370 nxt_fields_next(nxt_fields_iter_t *i)
5371 {
5372     nxt_http_field_t  *end = nxt_list_data(i->part);
5373 
5374     end += i->part->nelts;
5375     i->field++;
5376 
5377     if (i->field < end) {
5378         return i->field;
5379     }
5380 
5381     return nxt_fields_part_first(i->part->next, i);
5382 }
5383 
5384 
5385 static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t * task,nxt_http_request_t * r,nxt_app_t * app,const nxt_str_t * prefix)5386 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5387     nxt_app_t *app, const nxt_str_t *prefix)
5388 {
5389     void                *target_pos, *query_pos;
5390     u_char              *pos, *end, *p, c;
5391     size_t              fields_count, req_size, size, free_size;
5392     size_t              copy_size;
5393     nxt_off_t           content_length;
5394     nxt_buf_t           *b, *buf, *out, **tail;
5395     nxt_http_field_t    *field, *dup;
5396     nxt_unit_field_t    *dst_field;
5397     nxt_fields_iter_t   iter, dup_iter;
5398     nxt_unit_request_t  *req;
5399 
5400     req_size = sizeof(nxt_unit_request_t)
5401                + r->method->length + 1
5402                + r->version.length + 1
5403                + r->remote->address_length + 1
5404                + r->local->address_length + 1
5405                + nxt_sockaddr_port_length(r->local) + 1
5406                + r->server_name.length + 1
5407                + r->target.length + 1
5408                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5409 
5410     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5411     fields_count = 0;
5412 
5413     nxt_list_each(field, r->fields) {
5414         fields_count++;
5415 
5416         req_size += field->name_length + prefix->length + 1
5417                     + field->value_length + 1;
5418     } nxt_list_loop;
5419 
5420     req_size += fields_count * sizeof(nxt_unit_field_t);
5421 
5422     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5423         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5424                   (int) req_size);
5425 
5426         return NULL;
5427     }
5428 
5429     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5430               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5431     if (nxt_slow_path(out == NULL)) {
5432         return NULL;
5433     }
5434 
5435     req = (nxt_unit_request_t *) out->mem.free;
5436     out->mem.free += req_size;
5437 
5438     req->app_target = r->app_target;
5439 
5440     req->content_length = content_length;
5441 
5442     p = (u_char *) (req->fields + fields_count);
5443 
5444     nxt_debug(task, "fields_count=%d", (int) fields_count);
5445 
5446     req->method_length = r->method->length;
5447     nxt_unit_sptr_set(&req->method, p);
5448     p = nxt_cpymem(p, r->method->start, r->method->length);
5449     *p++ = '\0';
5450 
5451     req->version_length = r->version.length;
5452     nxt_unit_sptr_set(&req->version, p);
5453     p = nxt_cpymem(p, r->version.start, r->version.length);
5454     *p++ = '\0';
5455 
5456     req->remote_length = r->remote->address_length;
5457     nxt_unit_sptr_set(&req->remote, p);
5458     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5459                    r->remote->address_length);
5460     *p++ = '\0';
5461 
5462     req->local_addr_length = r->local->address_length;
5463     nxt_unit_sptr_set(&req->local_addr, p);
5464     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5465     *p++ = '\0';
5466 
5467     req->local_port_length = nxt_sockaddr_port_length(r->local);
5468     nxt_unit_sptr_set(&req->local_port, p);
5469     p = nxt_cpymem(p, nxt_sockaddr_port(r->local),
5470                    nxt_sockaddr_port_length(r->local));
5471     *p++ = '\0';
5472 
5473     req->tls = r->tls;
5474     req->websocket_handshake = r->websocket_handshake;
5475 
5476     req->server_name_length = r->server_name.length;
5477     nxt_unit_sptr_set(&req->server_name, p);
5478     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5479     *p++ = '\0';
5480 
5481     target_pos = p;
5482     req->target_length = (uint32_t) r->target.length;
5483     nxt_unit_sptr_set(&req->target, p);
5484     p = nxt_cpymem(p, r->target.start, r->target.length);
5485     *p++ = '\0';
5486 
5487     req->path_length = (uint32_t) r->path->length;
5488     if (r->path->start == r->target.start) {
5489         nxt_unit_sptr_set(&req->path, target_pos);
5490 
5491     } else {
5492         nxt_unit_sptr_set(&req->path, p);
5493         p = nxt_cpymem(p, r->path->start, r->path->length);
5494         *p++ = '\0';
5495     }
5496 
5497     req->query_length = (uint32_t) r->args->length;
5498     if (r->args->start != NULL) {
5499         query_pos = nxt_pointer_to(target_pos,
5500                                    r->args->start - r->target.start);
5501 
5502         nxt_unit_sptr_set(&req->query, query_pos);
5503 
5504     } else {
5505         req->query.offset = 0;
5506     }
5507 
5508     req->content_length_field = NXT_UNIT_NONE_FIELD;
5509     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5510     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5511     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5512 
5513     dst_field = req->fields;
5514 
5515     for (field = nxt_fields_first(r->fields, &iter);
5516          field != NULL;
5517          field = nxt_fields_next(&iter))
5518     {
5519         if (field->skip) {
5520             continue;
5521         }
5522 
5523         dst_field->hash = field->hash;
5524         dst_field->skip = 0;
5525         dst_field->name_length = field->name_length + prefix->length;
5526         dst_field->value_length = field->value_length;
5527 
5528         if (field == r->content_length) {
5529             req->content_length_field = dst_field - req->fields;
5530 
5531         } else if (field == r->content_type) {
5532             req->content_type_field = dst_field - req->fields;
5533 
5534         } else if (field == r->cookie) {
5535             req->cookie_field = dst_field - req->fields;
5536 
5537         } else if (field == r->authorization) {
5538             req->authorization_field = dst_field - req->fields;
5539         }
5540 
5541         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5542                   (int) field->hash, (int) field->skip,
5543                   (int) field->name_length, field->name,
5544                   (int) field->value_length, field->value);
5545 
5546         if (prefix->length != 0) {
5547             nxt_unit_sptr_set(&dst_field->name, p);
5548             p = nxt_cpymem(p, prefix->start, prefix->length);
5549 
5550             end = field->name + field->name_length;
5551             for (pos = field->name; pos < end; pos++) {
5552                 c = *pos;
5553 
5554                 if (c >= 'a' && c <= 'z') {
5555                     *p++ = (c & ~0x20);
5556                     continue;
5557                 }
5558 
5559                 if (c == '-') {
5560                     *p++ = '_';
5561                     continue;
5562                 }
5563 
5564                 *p++ = c;
5565             }
5566 
5567         } else {
5568             nxt_unit_sptr_set(&dst_field->name, p);
5569             p = nxt_cpymem(p, field->name, field->name_length);
5570         }
5571 
5572         *p++ = '\0';
5573 
5574         nxt_unit_sptr_set(&dst_field->value, p);
5575         p = nxt_cpymem(p, field->value, field->value_length);
5576 
5577         if (prefix->length != 0) {
5578             dup_iter = iter;
5579 
5580             for (dup = nxt_fields_next(&dup_iter);
5581                  dup != NULL;
5582                  dup = nxt_fields_next(&dup_iter))
5583             {
5584                 if (dup->name_length != field->name_length
5585                     || dup->skip
5586                     || dup->hash != field->hash
5587                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5588                 {
5589                     continue;
5590                 }
5591 
5592                 p = nxt_cpymem(p, ", ", 2);
5593                 p = nxt_cpymem(p, dup->value, dup->value_length);
5594 
5595                 dst_field->value_length += 2 + dup->value_length;
5596 
5597                 dup->skip = 1;
5598             }
5599         }
5600 
5601         *p++ = '\0';
5602 
5603         dst_field++;
5604     }
5605 
5606     req->fields_count = (uint32_t) (dst_field - req->fields);
5607 
5608     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5609 
5610     buf = out;
5611     tail = &buf->next;
5612 
5613     for (b = r->body; b != NULL; b = b->next) {
5614         size = nxt_buf_mem_used_size(&b->mem);
5615         pos = b->mem.pos;
5616 
5617         while (size > 0) {
5618             if (buf == NULL) {
5619                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5620 
5621                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5622                 if (nxt_slow_path(buf == NULL)) {
5623                     while (out != NULL) {
5624                         buf = out->next;
5625                         out->next = NULL;
5626                         out->completion_handler(task, out, out->parent);
5627                         out = buf;
5628                     }
5629                     return NULL;
5630                 }
5631 
5632                 *tail = buf;
5633                 tail = &buf->next;
5634 
5635             } else {
5636                 free_size = nxt_buf_mem_free_size(&buf->mem);
5637                 if (free_size < size
5638                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5639                        == NXT_OK)
5640                 {
5641                     free_size = nxt_buf_mem_free_size(&buf->mem);
5642                 }
5643             }
5644 
5645             if (free_size > 0) {
5646                 copy_size = nxt_min(free_size, size);
5647 
5648                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5649 
5650                 size -= copy_size;
5651                 pos += copy_size;
5652 
5653                 if (size == 0) {
5654                     break;
5655                 }
5656             }
5657 
5658             buf = NULL;
5659         }
5660     }
5661 
5662     return out;
5663 }
5664 
5665 
5666 static void
nxt_router_app_timeout(nxt_task_t * task,void * obj,void * data)5667 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5668 {
5669     nxt_timer_t              *timer;
5670     nxt_http_request_t       *r;
5671     nxt_request_rpc_data_t   *req_rpc_data;
5672 
5673     timer = obj;
5674 
5675     nxt_debug(task, "router app timeout");
5676 
5677     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5678     req_rpc_data = r->timer_data;
5679 
5680     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5681 
5682     nxt_request_rpc_data_unlink(task, req_rpc_data);
5683 }
5684 
5685 
5686 static void
nxt_router_http_request_release_post(nxt_task_t * task,nxt_http_request_t * r)5687 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5688 {
5689     r->timer.handler = nxt_router_http_request_release;
5690     nxt_timer_add(task->thread->engine, &r->timer, 0);
5691 }
5692 
5693 
5694 static void
nxt_router_http_request_release(nxt_task_t * task,void * obj,void * data)5695 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5696 {
5697     nxt_http_request_t  *r;
5698 
5699     nxt_debug(task, "http request pool release");
5700 
5701     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5702 
5703     nxt_mp_release(r->mem_pool);
5704 }
5705 
5706 
5707 static void
nxt_router_oosm_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5708 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5709 {
5710     size_t                   mi;
5711     uint32_t                 i;
5712     nxt_bool_t               ack;
5713     nxt_process_t            *process;
5714     nxt_free_map_t           *m;
5715     nxt_port_mmap_handler_t  *mmap_handler;
5716 
5717     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5718 
5719     process = nxt_runtime_process_find(task->thread->runtime,
5720                                        msg->port_msg.pid);
5721     if (nxt_slow_path(process == NULL)) {
5722         return;
5723     }
5724 
5725     ack = 0;
5726 
5727     /*
5728      * To mitigate possible racing condition (when OOSM message received
5729      * after some of the memory was already freed), need to try to find
5730      * first free segment in shared memory and send ACK if found.
5731      */
5732 
5733     nxt_thread_mutex_lock(&process->incoming.mutex);
5734 
5735     for (i = 0; i < process->incoming.size; i++) {
5736         mmap_handler = process->incoming.elts[i].mmap_handler;
5737 
5738         if (nxt_slow_path(mmap_handler == NULL)) {
5739             continue;
5740         }
5741 
5742         m = mmap_handler->hdr->free_map;
5743 
5744         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5745             if (m[mi] != 0) {
5746                 ack = 1;
5747 
5748                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5749                           i, mi, m[mi]);
5750 
5751                 break;
5752             }
5753         }
5754     }
5755 
5756     nxt_thread_mutex_unlock(&process->incoming.mutex);
5757 
5758     if (ack) {
5759         nxt_process_broadcast_shm_ack(task, process);
5760     }
5761 }
5762 
5763 
5764 static void
nxt_router_get_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5765 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5766 {
5767     nxt_fd_t                 fd;
5768     nxt_port_t               *port;
5769     nxt_runtime_t            *rt;
5770     nxt_port_mmaps_t         *mmaps;
5771     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5772     nxt_port_mmap_handler_t  *mmap_handler;
5773 
5774     rt = task->thread->runtime;
5775 
5776     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5777                                  msg->port_msg.reply_port);
5778     if (nxt_slow_path(port == NULL)) {
5779         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5780                   msg->port_msg.pid, msg->port_msg.reply_port);
5781 
5782         return;
5783     }
5784 
5785     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5786                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5787     {
5788         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5789                   (int) nxt_buf_used_size(msg->buf));
5790 
5791         return;
5792     }
5793 
5794     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5795 
5796     nxt_assert(port->type == NXT_PROCESS_APP);
5797 
5798     if (nxt_slow_path(port->app == NULL)) {
5799         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5800                   port->pid, port->id);
5801 
5802         // FIXME
5803         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5804                               -1, msg->port_msg.stream, 0, NULL);
5805 
5806         return;
5807     }
5808 
5809     mmaps = &port->app->outgoing;
5810     nxt_thread_mutex_lock(&mmaps->mutex);
5811 
5812     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5813         nxt_thread_mutex_unlock(&mmaps->mutex);
5814 
5815         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5816                   (int) get_mmap_msg->id);
5817 
5818         // FIXME
5819         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5820                               -1, msg->port_msg.stream, 0, NULL);
5821         return;
5822     }
5823 
5824     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5825 
5826     fd = mmap_handler->fd;
5827 
5828     nxt_thread_mutex_unlock(&mmaps->mutex);
5829 
5830     nxt_debug(task, "get mmap %PI:%d found",
5831               msg->port_msg.pid, (int) get_mmap_msg->id);
5832 
5833     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5834 }
5835 
5836 
5837 static void
nxt_router_get_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5838 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5839 {
5840     nxt_port_t               *port, *reply_port;
5841     nxt_runtime_t            *rt;
5842     nxt_port_msg_get_port_t  *get_port_msg;
5843 
5844     rt = task->thread->runtime;
5845 
5846     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5847                                        msg->port_msg.reply_port);
5848     if (nxt_slow_path(reply_port == NULL)) {
5849         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5850                   msg->port_msg.pid, msg->port_msg.reply_port);
5851 
5852         return;
5853     }
5854 
5855     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5856                       < (int) sizeof(nxt_port_msg_get_port_t)))
5857     {
5858         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5859                   (int) nxt_buf_used_size(msg->buf));
5860 
5861         return;
5862     }
5863 
5864     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5865 
5866     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5867     if (nxt_slow_path(port == NULL)) {
5868         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5869                   get_port_msg->pid, get_port_msg->id);
5870 
5871         return;
5872     }
5873 
5874     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5875               get_port_msg->id);
5876 
5877     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5878 }
5879