1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #if (NXT_HAVE_NJS)
15 #include <nxt_script.h>
16 #endif
17 #include <nxt_http.h>
18 #include <nxt_port_memory_int.h>
19 #include <nxt_unit_request.h>
20 #include <nxt_unit_response.h>
21 #include <nxt_router_request.h>
22 #include <nxt_app_queue.h>
23 #include <nxt_port_queue.h>
24
25 #define NXT_SHARED_PORT_ID 0xFFFFu
26
27 typedef struct {
28 nxt_str_t type;
29 uint32_t processes;
30 uint32_t max_processes;
31 uint32_t spare_processes;
32 nxt_msec_t timeout;
33 nxt_msec_t idle_timeout;
34 nxt_conf_value_t *limits_value;
35 nxt_conf_value_t *processes_value;
36 nxt_conf_value_t *targets_value;
37 } nxt_router_app_conf_t;
38
39
40 typedef struct {
41 nxt_str_t pass;
42 nxt_str_t application;
43 } nxt_router_listener_conf_t;
44
45
46 #if (NXT_TLS)
47
48 typedef struct {
49 nxt_str_t name;
50 nxt_socket_conf_t *socket_conf;
51 nxt_router_temp_conf_t *temp_conf;
52 nxt_tls_init_t *tls_init;
53 nxt_bool_t last;
54
55 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
56 } nxt_router_tlssock_t;
57
58 #endif
59
60
61 #if (NXT_HAVE_NJS)
62
63 typedef struct {
64 nxt_str_t name;
65 nxt_router_temp_conf_t *temp_conf;
66 nxt_queue_link_t link;
67 } nxt_router_js_module_t;
68
69 #endif
70
71
72 typedef struct {
73 nxt_str_t *name;
74 nxt_socket_conf_t *socket_conf;
75 nxt_router_temp_conf_t *temp_conf;
76 nxt_bool_t last;
77 } nxt_socket_rpc_t;
78
79
80 typedef struct {
81 nxt_app_t *app;
82 nxt_router_temp_conf_t *temp_conf;
83 uint8_t proto; /* 1 bit */
84 } nxt_app_rpc_t;
85
86
87 typedef struct {
88 nxt_app_joint_t *app_joint;
89 uint32_t generation;
90 uint8_t proto; /* 1 bit */
91 } nxt_app_joint_rpc_t;
92
93
94 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
95 nxt_mp_t *mp);
96 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
97 static void nxt_router_greet_controller(nxt_task_t *task,
98 nxt_port_t *controller_port);
99
100 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
101
102 static void nxt_router_new_port_handler(nxt_task_t *task,
103 nxt_port_recv_msg_t *msg);
104 static void nxt_router_conf_data_handler(nxt_task_t *task,
105 nxt_port_recv_msg_t *msg);
106 static void nxt_router_app_restart_handler(nxt_task_t *task,
107 nxt_port_recv_msg_t *msg);
108 static void nxt_router_status_handler(nxt_task_t *task,
109 nxt_port_recv_msg_t *msg);
110 static void nxt_router_remove_pid_handler(nxt_task_t *task,
111 nxt_port_recv_msg_t *msg);
112
113 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
114 static void nxt_router_conf_ready(nxt_task_t *task,
115 nxt_router_temp_conf_t *tmcf);
116 static void nxt_router_conf_send(nxt_task_t *task,
117 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
118
119 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
120 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
121 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
122 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
123 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
124 nxt_mp_t *mp, nxt_conf_value_t *conf);
125 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
126 nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
127
128 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
129 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
130 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
131 nxt_app_t *app);
132 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
133 nxt_str_t *name);
134 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
135 int i);
136
137 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
138 nxt_port_t *port);
139 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
140 nxt_port_t *port);
141 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
142 nxt_port_t *port, nxt_fd_t fd);
143 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
144 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
145 static void nxt_router_listen_socket_ready(nxt_task_t *task,
146 nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_listen_socket_error(nxt_task_t *task,
148 nxt_port_recv_msg_t *msg, void *data);
149 #if (NXT_TLS)
150 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
151 nxt_port_recv_msg_t *msg, void *data);
152 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
153 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
154 nxt_bool_t last);
155 #endif
156 #if (NXT_HAVE_NJS)
157 static void nxt_router_js_module_rpc_handler(nxt_task_t *task,
158 nxt_port_recv_msg_t *msg, void *data);
159 static nxt_int_t nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
160 nxt_conf_value_t *value);
161 #endif
162 static void nxt_router_app_rpc_create(nxt_task_t *task,
163 nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
164 static void nxt_router_app_prefork_ready(nxt_task_t *task,
165 nxt_port_recv_msg_t *msg, void *data);
166 static void nxt_router_app_prefork_error(nxt_task_t *task,
167 nxt_port_recv_msg_t *msg, void *data);
168 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
169 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
170 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
171 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
172
173 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
174 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
175 const nxt_event_interface_t *interface);
176 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
177 nxt_router_engine_conf_t *recf);
178 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
179 nxt_router_engine_conf_t *recf);
180 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
181 nxt_router_engine_conf_t *recf);
182 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
183 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
184 nxt_work_handler_t handler);
185 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
186 nxt_router_engine_conf_t *recf);
187 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
188 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
189
190 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
191 nxt_router_temp_conf_t *tmcf);
192 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
193 nxt_event_engine_t *engine);
194 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
195 nxt_router_temp_conf_t *tmcf);
196
197 static void nxt_router_engines_post(nxt_router_t *router,
198 nxt_router_temp_conf_t *tmcf);
199 static void nxt_router_engine_post(nxt_event_engine_t *engine,
200 nxt_work_t *jobs);
201
202 static void nxt_router_thread_start(void *data);
203 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
204 void *data);
205 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
206 void *data);
207 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
208 void *data);
209 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
210 void *data);
211 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
212 void *data);
213 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
214 void *data);
215 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
216 void *data);
217 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
218 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
219 static void nxt_router_listen_socket_release(nxt_task_t *task,
220 nxt_socket_conf_t *skcf);
221
222 static void nxt_router_app_port_ready(nxt_task_t *task,
223 nxt_port_recv_msg_t *msg, void *data);
224 static void nxt_router_app_port_error(nxt_task_t *task,
225 nxt_port_recv_msg_t *msg, void *data);
226
227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
229
230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
231 nxt_port_t *port, nxt_apr_action_t action);
232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
233 nxt_request_rpc_data_t *req_rpc_data);
234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
235 void *data);
236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
237 void *data);
238
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240 nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246 void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248 void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250 void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252
253 static const nxt_http_request_state_t nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257 nxt_app_joint_t *app_joint, int i);
258
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260 nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262 void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265 nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267 nxt_port_recv_msg_t *msg);
268
269 extern const nxt_http_request_state_t nxt_http_websocket;
270
271 nxt_router_t *nxt_router;
272
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275
276 static const nxt_str_t *nxt_app_msg_prefix[] = {
277 &empty_prefix,
278 &empty_prefix,
279 &http_prefix,
280 &http_prefix,
281 &http_prefix,
282 &empty_prefix,
283 };
284
285
286 static const nxt_port_handlers_t nxt_router_process_port_handlers = {
287 .quit = nxt_signal_quit_handler,
288 .new_port = nxt_router_new_port_handler,
289 .get_port = nxt_router_get_port_handler,
290 .change_file = nxt_port_change_log_file_handler,
291 .mmap = nxt_port_mmap_handler,
292 .get_mmap = nxt_router_get_mmap_handler,
293 .data = nxt_router_conf_data_handler,
294 .app_restart = nxt_router_app_restart_handler,
295 .status = nxt_router_status_handler,
296 .remove_pid = nxt_router_remove_pid_handler,
297 .access_log = nxt_router_access_log_reopen_handler,
298 .rpc_ready = nxt_port_rpc_handler,
299 .rpc_error = nxt_port_rpc_handler,
300 .oosm = nxt_router_oosm_handler,
301 };
302
303
304 const nxt_process_init_t nxt_router_process = {
305 .name = "router",
306 .type = NXT_PROCESS_ROUTER,
307 .prefork = nxt_router_prefork,
308 .restart = 1,
309 .setup = nxt_process_core_setup,
310 .start = nxt_router_start,
311 .port_handlers = &nxt_router_process_port_handlers,
312 .signals = nxt_process_signals,
313 };
314
315
316 /* Queues of nxt_socket_conf_t */
317 nxt_queue_t creating_sockets;
318 nxt_queue_t pending_sockets;
319 nxt_queue_t updating_sockets;
320 nxt_queue_t keeping_sockets;
321 nxt_queue_t deleting_sockets;
322
323
324 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)325 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
326 {
327 nxt_runtime_stop_app_processes(task, task->thread->runtime);
328
329 return NXT_OK;
330 }
331
332
333 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)334 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
335 {
336 nxt_int_t ret;
337 nxt_port_t *controller_port;
338 nxt_router_t *router;
339 nxt_runtime_t *rt;
340
341 rt = task->thread->runtime;
342
343 nxt_log(task, NXT_LOG_INFO, "router started");
344
345 #if (NXT_TLS)
346 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
347 if (nxt_slow_path(rt->tls == NULL)) {
348 return NXT_ERROR;
349 }
350
351 ret = rt->tls->library_init(task);
352 if (nxt_slow_path(ret != NXT_OK)) {
353 return ret;
354 }
355 #endif
356
357 ret = nxt_http_init(task);
358 if (nxt_slow_path(ret != NXT_OK)) {
359 return ret;
360 }
361
362 router = nxt_zalloc(sizeof(nxt_router_t));
363 if (nxt_slow_path(router == NULL)) {
364 return NXT_ERROR;
365 }
366
367 nxt_queue_init(&router->engines);
368 nxt_queue_init(&router->sockets);
369 nxt_queue_init(&router->apps);
370
371 nxt_router = router;
372
373 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
374 if (controller_port != NULL) {
375 nxt_router_greet_controller(task, controller_port);
376 }
377
378 return NXT_OK;
379 }
380
381
382 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)383 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
384 {
385 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
386 -1, 0, 0, NULL);
387 }
388
389
390 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)391 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
392 void *data)
393 {
394 size_t size;
395 uint32_t stream;
396 nxt_fd_t port_fd, queue_fd;
397 nxt_int_t ret;
398 nxt_app_t *app;
399 nxt_buf_t *b;
400 nxt_port_t *dport;
401 nxt_runtime_t *rt;
402 nxt_app_joint_rpc_t *app_joint_rpc;
403
404 app = data;
405
406 nxt_thread_mutex_lock(&app->mutex);
407
408 dport = app->proto_port;
409
410 nxt_thread_mutex_unlock(&app->mutex);
411
412 if (dport != NULL) {
413 nxt_debug(task, "app '%V' %p start process", &app->name, app);
414
415 b = NULL;
416 port_fd = -1;
417 queue_fd = -1;
418
419 } else {
420 if (app->proto_port_requests > 0) {
421 nxt_debug(task, "app '%V' %p wait for prototype process",
422 &app->name, app);
423
424 app->proto_port_requests++;
425
426 goto skip;
427 }
428
429 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
430
431 rt = task->thread->runtime;
432 dport = rt->port_by_type[NXT_PROCESS_MAIN];
433
434 size = app->name.length + 1 + app->conf.length;
435
436 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
437 if (nxt_slow_path(b == NULL)) {
438 goto failed;
439 }
440
441 nxt_buf_cpystr(b, &app->name);
442 *b->mem.free++ = '\0';
443 nxt_buf_cpystr(b, &app->conf);
444
445 port_fd = app->shared_port->pair[0];
446 queue_fd = app->shared_port->queue_fd;
447 }
448
449 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
450 nxt_router_app_port_ready,
451 nxt_router_app_port_error,
452 sizeof(nxt_app_joint_rpc_t));
453 if (nxt_slow_path(app_joint_rpc == NULL)) {
454 goto failed;
455 }
456
457 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
458
459 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
460 port_fd, queue_fd, stream, port->id, b);
461 if (nxt_slow_path(ret != NXT_OK)) {
462 nxt_port_rpc_cancel(task, port, stream);
463
464 goto failed;
465 }
466
467 app_joint_rpc->app_joint = app->joint;
468 app_joint_rpc->generation = app->generation;
469 app_joint_rpc->proto = (b != NULL);
470
471 if (b != NULL) {
472 app->proto_port_requests++;
473
474 b = NULL;
475 }
476
477 nxt_router_app_joint_use(task, app->joint, 1);
478
479 failed:
480
481 if (b != NULL) {
482 nxt_mp_free(b->data, b);
483 }
484
485 skip:
486
487 nxt_router_app_use(task, app, -1);
488 }
489
490
491 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)492 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
493 {
494 app_joint->use_count += i;
495
496 if (app_joint->use_count == 0) {
497 nxt_assert(app_joint->app == NULL);
498
499 nxt_free(app_joint);
500 }
501 }
502
503
504 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)505 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
506 {
507 nxt_int_t res;
508 nxt_port_t *router_port;
509 nxt_runtime_t *rt;
510
511 nxt_debug(task, "app '%V' start process", &app->name);
512
513 rt = task->thread->runtime;
514 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
515
516 nxt_router_app_use(task, app, 1);
517
518 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
519 app);
520
521 if (res == NXT_OK) {
522 return res;
523 }
524
525 nxt_thread_mutex_lock(&app->mutex);
526
527 app->pending_processes--;
528
529 nxt_thread_mutex_unlock(&app->mutex);
530
531 nxt_router_app_use(task, app, -1);
532
533 return NXT_ERROR;
534 }
535
536
537 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)538 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
539 {
540 nxt_buf_t *b, *next;
541 nxt_bool_t cancelled;
542 nxt_port_t *app_port;
543 nxt_msg_info_t *msg_info;
544
545 msg_info = &req_rpc_data->msg_info;
546
547 if (msg_info->buf == NULL) {
548 return 0;
549 }
550
551 app_port = req_rpc_data->app_port;
552
553 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
554 cancelled = nxt_app_queue_cancel(app_port->queue,
555 msg_info->tracking_cookie,
556 req_rpc_data->stream);
557
558 if (cancelled) {
559 nxt_debug(task, "stream #%uD: cancelled by router",
560 req_rpc_data->stream);
561 }
562
563 } else {
564 cancelled = 0;
565 }
566
567 for (b = msg_info->buf; b != NULL; b = next) {
568 next = b->next;
569 b->next = NULL;
570
571 if (b->is_port_mmap_sent) {
572 b->is_port_mmap_sent = cancelled == 0;
573 }
574
575 b->completion_handler(task, b, b->parent);
576 }
577
578 msg_info->buf = NULL;
579
580 return cancelled;
581 }
582
583
584 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)585 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
586 {
587 if (lnk->next != NULL) {
588 nxt_queue_remove(lnk);
589
590 lnk->next = NULL;
591
592 return 1;
593 }
594
595 return 0;
596 }
597
598
599 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)600 nxt_request_rpc_data_unlink(nxt_task_t *task,
601 nxt_request_rpc_data_t *req_rpc_data)
602 {
603 nxt_app_t *app;
604 nxt_bool_t unlinked;
605 nxt_http_request_t *r;
606
607 nxt_router_msg_cancel(task, req_rpc_data);
608
609 app = req_rpc_data->app;
610
611 if (req_rpc_data->app_port != NULL) {
612 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
613 req_rpc_data->apr_action);
614
615 req_rpc_data->app_port = NULL;
616 }
617
618 r = req_rpc_data->request;
619
620 if (r != NULL) {
621 r->timer_data = NULL;
622
623 nxt_router_http_request_release_post(task, r);
624
625 r->req_rpc_data = NULL;
626 req_rpc_data->request = NULL;
627
628 if (app != NULL) {
629 unlinked = 0;
630
631 nxt_thread_mutex_lock(&app->mutex);
632
633 if (r->app_link.next != NULL) {
634 nxt_queue_remove(&r->app_link);
635 r->app_link.next = NULL;
636
637 unlinked = 1;
638 }
639
640 nxt_thread_mutex_unlock(&app->mutex);
641
642 if (unlinked) {
643 nxt_mp_release(r->mem_pool);
644 }
645 }
646 }
647
648 if (app != NULL) {
649 nxt_router_app_use(task, app, -1);
650
651 req_rpc_data->app = NULL;
652 }
653
654 if (req_rpc_data->msg_info.body_fd != -1) {
655 nxt_fd_close(req_rpc_data->msg_info.body_fd);
656
657 req_rpc_data->msg_info.body_fd = -1;
658 }
659
660 if (req_rpc_data->rpc_cancel) {
661 req_rpc_data->rpc_cancel = 0;
662
663 nxt_port_rpc_cancel(task, task->thread->engine->port,
664 req_rpc_data->stream);
665 }
666 }
667
668
669 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)670 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
671 {
672 nxt_int_t res;
673 nxt_app_t *app;
674 nxt_port_t *port, *main_app_port;
675 nxt_runtime_t *rt;
676
677 nxt_port_new_port_handler(task, msg);
678
679 port = msg->u.new_port;
680
681 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
682 nxt_router_greet_controller(task, msg->u.new_port);
683 }
684
685 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
686 nxt_port_rpc_handler(task, msg);
687
688 return;
689 }
690
691 if (port == NULL || port->type != NXT_PROCESS_APP) {
692
693 if (msg->port_msg.stream == 0) {
694 return;
695 }
696
697 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
698
699 } else {
700 if (msg->fd[1] != -1) {
701 res = nxt_router_port_queue_map(task, port, msg->fd[1]);
702 if (nxt_slow_path(res != NXT_OK)) {
703 return;
704 }
705
706 nxt_fd_close(msg->fd[1]);
707 msg->fd[1] = -1;
708 }
709 }
710
711 if (msg->port_msg.stream != 0) {
712 nxt_port_rpc_handler(task, msg);
713 return;
714 }
715
716 nxt_debug(task, "new port id %d (%d)", port->id, port->type);
717
718 /*
719 * Port with "id == 0" is application 'main' port and it always
720 * should come with non-zero stream.
721 */
722 nxt_assert(port->id != 0);
723
724 /* Find 'main' app port and get app reference. */
725 rt = task->thread->runtime;
726
727 /*
728 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
729 * sent to main port (with id == 0) and processed in main thread.
730 */
731 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
732 nxt_assert(main_app_port != NULL);
733
734 app = main_app_port->app;
735
736 if (nxt_fast_path(app != NULL)) {
737 nxt_thread_mutex_lock(&app->mutex);
738
739 /* TODO here should be find-and-add code because there can be
740 port waiters in port_hash */
741 nxt_port_hash_add(&app->port_hash, port);
742 app->port_hash_count++;
743
744 nxt_thread_mutex_unlock(&app->mutex);
745
746 port->app = app;
747 }
748
749 port->main_app_port = main_app_port;
750
751 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
752 }
753
754
755 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)756 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
757 {
758 void *p;
759 size_t size;
760 nxt_int_t ret;
761 nxt_port_t *port;
762 nxt_router_temp_conf_t *tmcf;
763
764 port = nxt_runtime_port_find(task->thread->runtime,
765 msg->port_msg.pid,
766 msg->port_msg.reply_port);
767 if (nxt_slow_path(port == NULL)) {
768 nxt_alert(task, "conf_data_handler: reply port not found");
769 return;
770 }
771
772 p = MAP_FAILED;
773
774 /*
775 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
776 * initialized in 'cleanup' section.
777 */
778 size = 0;
779
780 tmcf = nxt_router_temp_conf(task);
781 if (nxt_slow_path(tmcf == NULL)) {
782 goto fail;
783 }
784
785 if (nxt_slow_path(msg->fd[0] == -1)) {
786 nxt_alert(task, "conf_data_handler: invalid shm fd");
787 goto fail;
788 }
789
790 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
791 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
792 (int) nxt_buf_mem_used_size(&msg->buf->mem));
793 goto fail;
794 }
795
796 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
797
798 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
799
800 nxt_fd_close(msg->fd[0]);
801 msg->fd[0] = -1;
802
803 if (nxt_slow_path(p == MAP_FAILED)) {
804 goto fail;
805 }
806
807 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
808
809 tmcf->router_conf->router = nxt_router;
810 tmcf->stream = msg->port_msg.stream;
811 tmcf->port = port;
812
813 nxt_port_use(task, tmcf->port, 1);
814
815 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
816
817 if (nxt_fast_path(ret == NXT_OK)) {
818 nxt_router_conf_apply(task, tmcf, NULL);
819
820 } else {
821 nxt_router_conf_error(task, tmcf);
822 }
823
824 goto cleanup;
825
826 fail:
827
828 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
829 msg->port_msg.stream, 0, NULL);
830
831 if (tmcf != NULL) {
832 nxt_mp_release(tmcf->mem_pool);
833 }
834
835 cleanup:
836
837 if (p != MAP_FAILED) {
838 nxt_mem_munmap(p, size);
839 }
840
841 if (msg->fd[0] != -1) {
842 nxt_fd_close(msg->fd[0]);
843 msg->fd[0] = -1;
844 }
845 }
846
847
848 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)849 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
850 {
851 nxt_app_t *app;
852 nxt_int_t ret;
853 nxt_str_t app_name;
854 nxt_port_t *reply_port, *shared_port, *old_shared_port;
855 nxt_port_t *proto_port;
856 nxt_port_msg_type_t reply;
857
858 reply_port = nxt_runtime_port_find(task->thread->runtime,
859 msg->port_msg.pid,
860 msg->port_msg.reply_port);
861 if (nxt_slow_path(reply_port == NULL)) {
862 nxt_alert(task, "app_restart_handler: reply port not found");
863 return;
864 }
865
866 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
867 app_name.start = msg->buf->mem.pos;
868
869 nxt_debug(task, "app_restart_handler: %V", &app_name);
870
871 app = nxt_router_app_find(&nxt_router->apps, &app_name);
872
873 if (nxt_fast_path(app != NULL)) {
874 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
875 NXT_PROCESS_APP);
876 if (nxt_slow_path(shared_port == NULL)) {
877 goto fail;
878 }
879
880 ret = nxt_port_socket_init(task, shared_port, 0);
881 if (nxt_slow_path(ret != NXT_OK)) {
882 nxt_port_use(task, shared_port, -1);
883 goto fail;
884 }
885
886 ret = nxt_router_app_queue_init(task, shared_port);
887 if (nxt_slow_path(ret != NXT_OK)) {
888 nxt_port_write_close(shared_port);
889 nxt_port_read_close(shared_port);
890 nxt_port_use(task, shared_port, -1);
891 goto fail;
892 }
893
894 nxt_port_write_enable(task, shared_port);
895
896 nxt_thread_mutex_lock(&app->mutex);
897
898 proto_port = app->proto_port;
899
900 if (proto_port != NULL) {
901 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
902 proto_port->pid);
903
904 app->proto_port = NULL;
905 proto_port->app = NULL;
906 }
907
908 app->generation++;
909
910 shared_port->app = app;
911
912 old_shared_port = app->shared_port;
913 old_shared_port->app = NULL;
914
915 app->shared_port = shared_port;
916
917 nxt_thread_mutex_unlock(&app->mutex);
918
919 nxt_port_close(task, old_shared_port);
920 nxt_port_use(task, old_shared_port, -1);
921
922 if (proto_port != NULL) {
923 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
924 -1, 0, 0, NULL);
925
926 nxt_port_close(task, proto_port);
927
928 nxt_port_use(task, proto_port, -1);
929 }
930
931 reply = NXT_PORT_MSG_RPC_READY_LAST;
932
933 } else {
934
935 fail:
936
937 reply = NXT_PORT_MSG_RPC_ERROR;
938 }
939
940 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
941 0, NULL);
942 }
943
944
945 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)946 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
947 {
948 u_char *p;
949 size_t alloc;
950 nxt_app_t *app;
951 nxt_buf_t *b;
952 nxt_uint_t type;
953 nxt_port_t *port;
954 nxt_status_app_t *app_stat;
955 nxt_event_engine_t *engine;
956 nxt_status_report_t *report;
957
958 port = nxt_runtime_port_find(task->thread->runtime,
959 msg->port_msg.pid,
960 msg->port_msg.reply_port);
961 if (nxt_slow_path(port == NULL)) {
962 nxt_alert(task, "nxt_router_status_handler(): reply port not found");
963 return;
964 }
965
966 alloc = sizeof(nxt_status_report_t);
967
968 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
969
970 alloc += sizeof(nxt_status_app_t) + app->name.length;
971
972 } nxt_queue_loop;
973
974 b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
975 if (nxt_slow_path(b == NULL)) {
976 type = NXT_PORT_MSG_RPC_ERROR;
977 goto fail;
978 }
979
980 report = (nxt_status_report_t *) b->mem.free;
981 b->mem.free = b->mem.end;
982
983 nxt_memzero(report, sizeof(nxt_status_report_t));
984
985 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
986
987 report->accepted_conns += engine->accepted_conns_cnt;
988 report->idle_conns += engine->idle_conns_cnt;
989 report->closed_conns += engine->closed_conns_cnt;
990 report->requests += engine->requests_cnt;
991
992 } nxt_queue_loop;
993
994 report->apps_count = 0;
995 app_stat = report->apps;
996 p = b->mem.end;
997
998 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
999 p -= app->name.length;
1000
1001 nxt_memcpy(p, app->name.start, app->name.length);
1002
1003 app_stat->name.length = app->name.length;
1004 app_stat->name.start = (u_char *) (p - b->mem.pos);
1005
1006 app_stat->active_requests = app->active_requests;
1007 app_stat->pending_processes = app->pending_processes;
1008 app_stat->processes = app->processes;
1009 app_stat->idle_processes = app->idle_processes;
1010
1011 report->apps_count++;
1012 app_stat++;
1013 } nxt_queue_loop;
1014
1015 type = NXT_PORT_MSG_RPC_READY_LAST;
1016
1017 fail:
1018
1019 nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1020 }
1021
1022
1023 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1024 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1025 void *data)
1026 {
1027 union {
1028 nxt_pid_t removed_pid;
1029 void *data;
1030 } u;
1031
1032 u.data = data;
1033
1034 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1035 }
1036
1037
1038 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1039 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1040 {
1041 nxt_event_engine_t *engine;
1042
1043 nxt_port_remove_pid_handler(task, msg);
1044
1045 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1046 {
1047 if (nxt_fast_path(engine->port != NULL)) {
1048 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1049 msg->u.data);
1050 }
1051 }
1052 nxt_queue_loop;
1053
1054 if (msg->port_msg.stream == 0) {
1055 return;
1056 }
1057
1058 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1059
1060 nxt_port_rpc_handler(task, msg);
1061 }
1062
1063
1064 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1065 nxt_router_temp_conf(nxt_task_t *task)
1066 {
1067 nxt_mp_t *mp, *tmp;
1068 nxt_router_conf_t *rtcf;
1069 nxt_router_temp_conf_t *tmcf;
1070
1071 mp = nxt_mp_create(1024, 128, 256, 32);
1072 if (nxt_slow_path(mp == NULL)) {
1073 return NULL;
1074 }
1075
1076 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1077 if (nxt_slow_path(rtcf == NULL)) {
1078 goto fail;
1079 }
1080
1081 rtcf->mem_pool = mp;
1082
1083 rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1084 if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1085 goto fail;
1086 }
1087
1088 #if (NXT_HAVE_NJS)
1089 nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1090 #endif
1091
1092 tmp = nxt_mp_create(1024, 128, 256, 32);
1093 if (nxt_slow_path(tmp == NULL)) {
1094 goto fail;
1095 }
1096
1097 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1098 if (nxt_slow_path(tmcf == NULL)) {
1099 goto temp_fail;
1100 }
1101
1102 tmcf->mem_pool = tmp;
1103 tmcf->router_conf = rtcf;
1104 tmcf->count = 1;
1105 tmcf->engine = task->thread->engine;
1106
1107 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1108 sizeof(nxt_router_engine_conf_t));
1109 if (nxt_slow_path(tmcf->engines == NULL)) {
1110 goto temp_fail;
1111 }
1112
1113 nxt_queue_init(&creating_sockets);
1114 nxt_queue_init(&pending_sockets);
1115 nxt_queue_init(&updating_sockets);
1116 nxt_queue_init(&keeping_sockets);
1117 nxt_queue_init(&deleting_sockets);
1118
1119 #if (NXT_TLS)
1120 nxt_queue_init(&tmcf->tls);
1121 #endif
1122
1123 #if (NXT_HAVE_NJS)
1124 nxt_queue_init(&tmcf->js_modules);
1125 #endif
1126
1127 nxt_queue_init(&tmcf->apps);
1128 nxt_queue_init(&tmcf->previous);
1129
1130 return tmcf;
1131
1132 temp_fail:
1133
1134 nxt_mp_destroy(tmp);
1135
1136 fail:
1137
1138 if (rtcf->tstr_state != NULL) {
1139 nxt_tstr_state_release(rtcf->tstr_state);
1140 }
1141
1142 nxt_mp_destroy(mp);
1143
1144 return NULL;
1145 }
1146
1147
1148 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1149 nxt_router_app_can_start(nxt_app_t *app)
1150 {
1151 return app->processes + app->pending_processes < app->max_processes
1152 && app->pending_processes < app->max_pending_processes;
1153 }
1154
1155
1156 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1157 nxt_router_app_need_start(nxt_app_t *app)
1158 {
1159 return (app->active_requests
1160 > app->port_hash_count + app->pending_processes)
1161 || (app->spare_processes
1162 > app->idle_processes + app->pending_processes);
1163 }
1164
1165
1166 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1167 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1168 {
1169 nxt_int_t ret;
1170 nxt_app_t *app;
1171 nxt_router_t *router;
1172 nxt_runtime_t *rt;
1173 nxt_queue_link_t *qlk;
1174 nxt_socket_conf_t *skcf;
1175 nxt_router_conf_t *rtcf;
1176 nxt_router_temp_conf_t *tmcf;
1177 const nxt_event_interface_t *interface;
1178 #if (NXT_TLS)
1179 nxt_router_tlssock_t *tls;
1180 #endif
1181 #if (NXT_HAVE_NJS)
1182 nxt_router_js_module_t *js_module;
1183 #endif
1184
1185 tmcf = obj;
1186
1187 qlk = nxt_queue_first(&pending_sockets);
1188
1189 if (qlk != nxt_queue_tail(&pending_sockets)) {
1190 nxt_queue_remove(qlk);
1191 nxt_queue_insert_tail(&creating_sockets, qlk);
1192
1193 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1194
1195 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1196
1197 return;
1198 }
1199
1200 #if (NXT_TLS)
1201 qlk = nxt_queue_last(&tmcf->tls);
1202
1203 if (qlk != nxt_queue_head(&tmcf->tls)) {
1204 nxt_queue_remove(qlk);
1205
1206 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1207
1208 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1209 nxt_router_tls_rpc_handler, tls);
1210 return;
1211 }
1212 #endif
1213
1214 #if (NXT_HAVE_NJS)
1215 qlk = nxt_queue_last(&tmcf->js_modules);
1216
1217 if (qlk != nxt_queue_head(&tmcf->js_modules)) {
1218 nxt_queue_remove(qlk);
1219
1220 js_module = nxt_queue_link_data(qlk, nxt_router_js_module_t, link);
1221
1222 nxt_script_store_get(task, &js_module->name, tmcf->mem_pool,
1223 nxt_router_js_module_rpc_handler, js_module);
1224 return;
1225 }
1226 #endif
1227
1228 rtcf = tmcf->router_conf;
1229
1230 ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
1231 if (nxt_slow_path(ret != NXT_OK)) {
1232 goto fail;
1233 }
1234
1235 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1236
1237 if (nxt_router_app_need_start(app)) {
1238 nxt_router_app_rpc_create(task, tmcf, app);
1239 return;
1240 }
1241
1242 } nxt_queue_loop;
1243
1244 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1245 nxt_router_access_log_open(task, tmcf);
1246 return;
1247 }
1248
1249 rt = task->thread->runtime;
1250
1251 interface = nxt_service_get(rt->services, "engine", NULL);
1252
1253 router = rtcf->router;
1254
1255 ret = nxt_router_engines_create(task, router, tmcf, interface);
1256 if (nxt_slow_path(ret != NXT_OK)) {
1257 goto fail;
1258 }
1259
1260 ret = nxt_router_threads_create(task, rt, tmcf);
1261 if (nxt_slow_path(ret != NXT_OK)) {
1262 goto fail;
1263 }
1264
1265 nxt_router_apps_sort(task, router, tmcf);
1266
1267 nxt_router_apps_hash_use(task, rtcf, 1);
1268
1269 nxt_router_engines_post(router, tmcf);
1270
1271 nxt_queue_add(&router->sockets, &updating_sockets);
1272 nxt_queue_add(&router->sockets, &creating_sockets);
1273
1274 if (router->access_log != rtcf->access_log) {
1275 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1276
1277 nxt_router_access_log_release(task, &router->lock, router->access_log);
1278
1279 router->access_log = rtcf->access_log;
1280 }
1281
1282 nxt_router_conf_ready(task, tmcf);
1283
1284 return;
1285
1286 fail:
1287
1288 nxt_router_conf_error(task, tmcf);
1289
1290 return;
1291 }
1292
1293
1294 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1295 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1296 {
1297 nxt_joint_job_t *job;
1298
1299 job = obj;
1300
1301 nxt_router_conf_ready(task, job->tmcf);
1302 }
1303
1304
1305 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1306 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1307 {
1308 uint32_t count;
1309 nxt_router_conf_t *rtcf;
1310 nxt_thread_spinlock_t *lock;
1311
1312 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1313
1314 if (--tmcf->count > 0) {
1315 return;
1316 }
1317
1318 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1319
1320 rtcf = tmcf->router_conf;
1321
1322 lock = &rtcf->router->lock;
1323
1324 nxt_thread_spin_lock(lock);
1325
1326 count = rtcf->count;
1327
1328 nxt_thread_spin_unlock(lock);
1329
1330 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1331
1332 if (count == 0) {
1333 nxt_router_apps_hash_use(task, rtcf, -1);
1334
1335 nxt_router_access_log_release(task, lock, rtcf->access_log);
1336
1337 nxt_mp_destroy(rtcf->mem_pool);
1338 }
1339
1340 nxt_mp_release(tmcf->mem_pool);
1341 }
1342
1343
1344 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1345 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1346 {
1347 nxt_app_t *app;
1348 nxt_socket_t s;
1349 nxt_router_t *router;
1350 nxt_queue_link_t *qlk;
1351 nxt_socket_conf_t *skcf;
1352 nxt_router_conf_t *rtcf;
1353
1354 nxt_alert(task, "failed to apply new conf");
1355
1356 for (qlk = nxt_queue_first(&creating_sockets);
1357 qlk != nxt_queue_tail(&creating_sockets);
1358 qlk = nxt_queue_next(qlk))
1359 {
1360 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1361 s = skcf->listen->socket;
1362
1363 if (s != -1) {
1364 nxt_socket_close(task, s);
1365 }
1366
1367 nxt_free(skcf->listen);
1368 }
1369
1370 rtcf = tmcf->router_conf;
1371
1372 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1373
1374 nxt_router_app_unlink(task, app);
1375
1376 } nxt_queue_loop;
1377
1378 router = rtcf->router;
1379
1380 nxt_queue_add(&router->sockets, &keeping_sockets);
1381 nxt_queue_add(&router->sockets, &deleting_sockets);
1382
1383 nxt_queue_add(&router->apps, &tmcf->previous);
1384
1385 // TODO: new engines and threads
1386
1387 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1388
1389 nxt_mp_destroy(rtcf->mem_pool);
1390
1391 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1392
1393 nxt_mp_release(tmcf->mem_pool);
1394 }
1395
1396
1397 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1398 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1399 nxt_port_msg_type_t type)
1400 {
1401 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1402
1403 nxt_port_use(task, tmcf->port, -1);
1404
1405 tmcf->port = NULL;
1406 }
1407
1408
1409 static nxt_conf_map_t nxt_router_conf[] = {
1410 {
1411 nxt_string("listeners_threads"),
1412 NXT_CONF_MAP_INT32,
1413 offsetof(nxt_router_conf_t, threads),
1414 },
1415 };
1416
1417
1418 static nxt_conf_map_t nxt_router_app_conf[] = {
1419 {
1420 nxt_string("type"),
1421 NXT_CONF_MAP_STR,
1422 offsetof(nxt_router_app_conf_t, type),
1423 },
1424
1425 {
1426 nxt_string("limits"),
1427 NXT_CONF_MAP_PTR,
1428 offsetof(nxt_router_app_conf_t, limits_value),
1429 },
1430
1431 {
1432 nxt_string("processes"),
1433 NXT_CONF_MAP_INT32,
1434 offsetof(nxt_router_app_conf_t, processes),
1435 },
1436
1437 {
1438 nxt_string("processes"),
1439 NXT_CONF_MAP_PTR,
1440 offsetof(nxt_router_app_conf_t, processes_value),
1441 },
1442
1443 {
1444 nxt_string("targets"),
1445 NXT_CONF_MAP_PTR,
1446 offsetof(nxt_router_app_conf_t, targets_value),
1447 },
1448 };
1449
1450
1451 static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1452 {
1453 nxt_string("timeout"),
1454 NXT_CONF_MAP_MSEC,
1455 offsetof(nxt_router_app_conf_t, timeout),
1456 },
1457 };
1458
1459
1460 static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1461 {
1462 nxt_string("spare"),
1463 NXT_CONF_MAP_INT32,
1464 offsetof(nxt_router_app_conf_t, spare_processes),
1465 },
1466
1467 {
1468 nxt_string("max"),
1469 NXT_CONF_MAP_INT32,
1470 offsetof(nxt_router_app_conf_t, max_processes),
1471 },
1472
1473 {
1474 nxt_string("idle_timeout"),
1475 NXT_CONF_MAP_MSEC,
1476 offsetof(nxt_router_app_conf_t, idle_timeout),
1477 },
1478 };
1479
1480
1481 static nxt_conf_map_t nxt_router_listener_conf[] = {
1482 {
1483 nxt_string("pass"),
1484 NXT_CONF_MAP_STR_COPY,
1485 offsetof(nxt_router_listener_conf_t, pass),
1486 },
1487
1488 {
1489 nxt_string("application"),
1490 NXT_CONF_MAP_STR_COPY,
1491 offsetof(nxt_router_listener_conf_t, application),
1492 },
1493 };
1494
1495
1496 static nxt_conf_map_t nxt_router_http_conf[] = {
1497 {
1498 nxt_string("header_buffer_size"),
1499 NXT_CONF_MAP_SIZE,
1500 offsetof(nxt_socket_conf_t, header_buffer_size),
1501 },
1502
1503 {
1504 nxt_string("large_header_buffer_size"),
1505 NXT_CONF_MAP_SIZE,
1506 offsetof(nxt_socket_conf_t, large_header_buffer_size),
1507 },
1508
1509 {
1510 nxt_string("large_header_buffers"),
1511 NXT_CONF_MAP_SIZE,
1512 offsetof(nxt_socket_conf_t, large_header_buffers),
1513 },
1514
1515 {
1516 nxt_string("body_buffer_size"),
1517 NXT_CONF_MAP_SIZE,
1518 offsetof(nxt_socket_conf_t, body_buffer_size),
1519 },
1520
1521 {
1522 nxt_string("max_body_size"),
1523 NXT_CONF_MAP_SIZE,
1524 offsetof(nxt_socket_conf_t, max_body_size),
1525 },
1526
1527 {
1528 nxt_string("idle_timeout"),
1529 NXT_CONF_MAP_MSEC,
1530 offsetof(nxt_socket_conf_t, idle_timeout),
1531 },
1532
1533 {
1534 nxt_string("header_read_timeout"),
1535 NXT_CONF_MAP_MSEC,
1536 offsetof(nxt_socket_conf_t, header_read_timeout),
1537 },
1538
1539 {
1540 nxt_string("body_read_timeout"),
1541 NXT_CONF_MAP_MSEC,
1542 offsetof(nxt_socket_conf_t, body_read_timeout),
1543 },
1544
1545 {
1546 nxt_string("send_timeout"),
1547 NXT_CONF_MAP_MSEC,
1548 offsetof(nxt_socket_conf_t, send_timeout),
1549 },
1550
1551 {
1552 nxt_string("body_temp_path"),
1553 NXT_CONF_MAP_STR,
1554 offsetof(nxt_socket_conf_t, body_temp_path),
1555 },
1556
1557 {
1558 nxt_string("discard_unsafe_fields"),
1559 NXT_CONF_MAP_INT8,
1560 offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1561 },
1562
1563 {
1564 nxt_string("log_route"),
1565 NXT_CONF_MAP_INT8,
1566 offsetof(nxt_socket_conf_t, log_route),
1567 },
1568
1569 {
1570 nxt_string("server_version"),
1571 NXT_CONF_MAP_INT8,
1572 offsetof(nxt_socket_conf_t, server_version),
1573 },
1574 };
1575
1576
1577 static nxt_conf_map_t nxt_router_websocket_conf[] = {
1578 {
1579 nxt_string("max_frame_size"),
1580 NXT_CONF_MAP_SIZE,
1581 offsetof(nxt_websocket_conf_t, max_frame_size),
1582 },
1583
1584 {
1585 nxt_string("read_timeout"),
1586 NXT_CONF_MAP_MSEC,
1587 offsetof(nxt_websocket_conf_t, read_timeout),
1588 },
1589
1590 {
1591 nxt_string("keepalive_interval"),
1592 NXT_CONF_MAP_MSEC,
1593 offsetof(nxt_websocket_conf_t, keepalive_interval),
1594 },
1595
1596 };
1597
1598
1599 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1600 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1601 u_char *start, u_char *end)
1602 {
1603 u_char *p;
1604 size_t size;
1605 nxt_mp_t *mp, *app_mp;
1606 uint32_t next, next_target;
1607 nxt_int_t ret;
1608 nxt_str_t name, target;
1609 nxt_app_t *app, *prev;
1610 nxt_str_t *t, *s, *targets;
1611 nxt_uint_t n, i;
1612 nxt_port_t *port;
1613 nxt_router_t *router;
1614 nxt_app_joint_t *app_joint;
1615 #if (NXT_TLS)
1616 nxt_tls_init_t *tls_init;
1617 nxt_conf_value_t *certificate;
1618 #endif
1619 #if (NXT_HAVE_NJS)
1620 nxt_conf_value_t *js_module;
1621 #endif
1622 nxt_conf_value_t *root, *conf, *http, *value, *websocket;
1623 nxt_conf_value_t *applications, *application;
1624 nxt_conf_value_t *listeners, *listener;
1625 nxt_socket_conf_t *skcf;
1626 nxt_router_conf_t *rtcf;
1627 nxt_http_routes_t *routes;
1628 nxt_event_engine_t *engine;
1629 nxt_app_lang_module_t *lang;
1630 nxt_router_app_conf_t apcf;
1631 nxt_router_listener_conf_t lscf;
1632
1633 static nxt_str_t http_path = nxt_string("/settings/http");
1634 static nxt_str_t applications_path = nxt_string("/applications");
1635 static nxt_str_t listeners_path = nxt_string("/listeners");
1636 static nxt_str_t routes_path = nxt_string("/routes");
1637 static nxt_str_t access_log_path = nxt_string("/access_log");
1638 #if (NXT_TLS)
1639 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1640 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1641 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1642 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1643 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1644 #endif
1645 #if (NXT_HAVE_NJS)
1646 static nxt_str_t js_module_path = nxt_string("/settings/js_module");
1647 #endif
1648 static nxt_str_t static_path = nxt_string("/settings/http/static");
1649 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1650 static nxt_str_t forwarded_path = nxt_string("/forwarded");
1651 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1652
1653 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1654 if (root == NULL) {
1655 nxt_alert(task, "configuration parsing error");
1656 return NXT_ERROR;
1657 }
1658
1659 rtcf = tmcf->router_conf;
1660 mp = rtcf->mem_pool;
1661
1662 ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1663 nxt_nitems(nxt_router_conf), rtcf);
1664 if (ret != NXT_OK) {
1665 nxt_alert(task, "root map error");
1666 return NXT_ERROR;
1667 }
1668
1669 if (rtcf->threads == 0) {
1670 rtcf->threads = nxt_ncpu;
1671 }
1672
1673 conf = nxt_conf_get_path(root, &static_path);
1674
1675 ret = nxt_router_conf_process_static(task, rtcf, conf);
1676 if (nxt_slow_path(ret != NXT_OK)) {
1677 return NXT_ERROR;
1678 }
1679
1680 router = rtcf->router;
1681
1682 applications = nxt_conf_get_path(root, &applications_path);
1683
1684 if (applications != NULL) {
1685 next = 0;
1686
1687 for ( ;; ) {
1688 application = nxt_conf_next_object_member(applications,
1689 &name, &next);
1690 if (application == NULL) {
1691 break;
1692 }
1693
1694 nxt_debug(task, "application \"%V\"", &name);
1695
1696 size = nxt_conf_json_length(application, NULL);
1697
1698 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1699 if (nxt_slow_path(app_mp == NULL)) {
1700 goto fail;
1701 }
1702
1703 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1704 if (app == NULL) {
1705 goto app_fail;
1706 }
1707
1708 nxt_memzero(app, sizeof(nxt_app_t));
1709
1710 app->mem_pool = app_mp;
1711
1712 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1713 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1714 + name.length);
1715
1716 p = nxt_conf_json_print(app->conf.start, application, NULL);
1717 app->conf.length = p - app->conf.start;
1718
1719 nxt_assert(app->conf.length <= size);
1720
1721 nxt_debug(task, "application conf \"%V\"", &app->conf);
1722
1723 prev = nxt_router_app_find(&router->apps, &name);
1724
1725 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1726 nxt_mp_destroy(app_mp);
1727
1728 nxt_queue_remove(&prev->link);
1729 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1730
1731 ret = nxt_router_apps_hash_add(rtcf, prev);
1732 if (nxt_slow_path(ret != NXT_OK)) {
1733 goto fail;
1734 }
1735
1736 continue;
1737 }
1738
1739 apcf.processes = 1;
1740 apcf.max_processes = 1;
1741 apcf.spare_processes = 0;
1742 apcf.timeout = 0;
1743 apcf.idle_timeout = 15000;
1744 apcf.limits_value = NULL;
1745 apcf.processes_value = NULL;
1746 apcf.targets_value = NULL;
1747
1748 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1749 if (nxt_slow_path(app_joint == NULL)) {
1750 goto app_fail;
1751 }
1752
1753 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1754
1755 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1756 nxt_nitems(nxt_router_app_conf), &apcf);
1757 if (ret != NXT_OK) {
1758 nxt_alert(task, "application map error");
1759 goto app_fail;
1760 }
1761
1762 if (apcf.limits_value != NULL) {
1763
1764 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1765 nxt_alert(task, "application limits is not object");
1766 goto app_fail;
1767 }
1768
1769 ret = nxt_conf_map_object(mp, apcf.limits_value,
1770 nxt_router_app_limits_conf,
1771 nxt_nitems(nxt_router_app_limits_conf),
1772 &apcf);
1773 if (ret != NXT_OK) {
1774 nxt_alert(task, "application limits map error");
1775 goto app_fail;
1776 }
1777 }
1778
1779 if (apcf.processes_value != NULL
1780 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1781 {
1782 ret = nxt_conf_map_object(mp, apcf.processes_value,
1783 nxt_router_app_processes_conf,
1784 nxt_nitems(nxt_router_app_processes_conf),
1785 &apcf);
1786 if (ret != NXT_OK) {
1787 nxt_alert(task, "application processes map error");
1788 goto app_fail;
1789 }
1790
1791 } else {
1792 apcf.max_processes = apcf.processes;
1793 apcf.spare_processes = apcf.processes;
1794 }
1795
1796 if (apcf.targets_value != NULL) {
1797 n = nxt_conf_object_members_count(apcf.targets_value);
1798
1799 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1800 if (nxt_slow_path(targets == NULL)) {
1801 goto app_fail;
1802 }
1803
1804 next_target = 0;
1805
1806 for (i = 0; i < n; i++) {
1807 (void) nxt_conf_next_object_member(apcf.targets_value,
1808 &target, &next_target);
1809
1810 s = nxt_str_dup(app_mp, &targets[i], &target);
1811 if (nxt_slow_path(s == NULL)) {
1812 goto app_fail;
1813 }
1814 }
1815
1816 } else {
1817 targets = NULL;
1818 }
1819
1820 nxt_debug(task, "application type: %V", &apcf.type);
1821 nxt_debug(task, "application processes: %D", apcf.processes);
1822 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1823
1824 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1825
1826 if (lang == NULL) {
1827 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1828 goto app_fail;
1829 }
1830
1831 nxt_debug(task, "application language module: \"%s\"", lang->file);
1832
1833 ret = nxt_thread_mutex_create(&app->mutex);
1834 if (ret != NXT_OK) {
1835 goto app_fail;
1836 }
1837
1838 nxt_queue_init(&app->ports);
1839 nxt_queue_init(&app->spare_ports);
1840 nxt_queue_init(&app->idle_ports);
1841 nxt_queue_init(&app->ack_waiting_req);
1842
1843 app->name.length = name.length;
1844 nxt_memcpy(app->name.start, name.start, name.length);
1845
1846 app->type = lang->type;
1847 app->max_processes = apcf.max_processes;
1848 app->spare_processes = apcf.spare_processes;
1849 app->max_pending_processes = apcf.spare_processes
1850 ? apcf.spare_processes : 1;
1851 app->timeout = apcf.timeout;
1852 app->idle_timeout = apcf.idle_timeout;
1853
1854 app->targets = targets;
1855
1856 engine = task->thread->engine;
1857
1858 app->engine = engine;
1859
1860 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1861 app->adjust_idle_work.task = &engine->task;
1862 app->adjust_idle_work.obj = app;
1863
1864 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1865
1866 ret = nxt_router_apps_hash_add(rtcf, app);
1867 if (nxt_slow_path(ret != NXT_OK)) {
1868 goto app_fail;
1869 }
1870
1871 nxt_router_app_use(task, app, 1);
1872
1873 app->joint = app_joint;
1874
1875 app_joint->use_count = 1;
1876 app_joint->app = app;
1877
1878 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1879 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1880 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1881 app_joint->idle_timer.task = &engine->task;
1882 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1883
1884 app_joint->free_app_work.handler = nxt_router_free_app;
1885 app_joint->free_app_work.task = &engine->task;
1886 app_joint->free_app_work.obj = app_joint;
1887
1888 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1889 NXT_PROCESS_APP);
1890 if (nxt_slow_path(port == NULL)) {
1891 return NXT_ERROR;
1892 }
1893
1894 ret = nxt_port_socket_init(task, port, 0);
1895 if (nxt_slow_path(ret != NXT_OK)) {
1896 nxt_port_use(task, port, -1);
1897 return NXT_ERROR;
1898 }
1899
1900 ret = nxt_router_app_queue_init(task, port);
1901 if (nxt_slow_path(ret != NXT_OK)) {
1902 nxt_port_write_close(port);
1903 nxt_port_read_close(port);
1904 nxt_port_use(task, port, -1);
1905 return NXT_ERROR;
1906 }
1907
1908 nxt_port_write_enable(task, port);
1909 port->app = app;
1910
1911 app->shared_port = port;
1912
1913 nxt_thread_mutex_create(&app->outgoing.mutex);
1914 }
1915 }
1916
1917 conf = nxt_conf_get_path(root, &routes_path);
1918 if (nxt_fast_path(conf != NULL)) {
1919 routes = nxt_http_routes_create(task, tmcf, conf);
1920 if (nxt_slow_path(routes == NULL)) {
1921 return NXT_ERROR;
1922 }
1923
1924 rtcf->routes = routes;
1925 }
1926
1927 ret = nxt_upstreams_create(task, tmcf, root);
1928 if (nxt_slow_path(ret != NXT_OK)) {
1929 return ret;
1930 }
1931
1932 http = nxt_conf_get_path(root, &http_path);
1933 #if 0
1934 if (http == NULL) {
1935 nxt_alert(task, "no \"http\" block");
1936 return NXT_ERROR;
1937 }
1938 #endif
1939
1940 websocket = nxt_conf_get_path(root, &websocket_path);
1941
1942 listeners = nxt_conf_get_path(root, &listeners_path);
1943
1944 if (listeners != NULL) {
1945 next = 0;
1946
1947 for ( ;; ) {
1948 listener = nxt_conf_next_object_member(listeners, &name, &next);
1949 if (listener == NULL) {
1950 break;
1951 }
1952
1953 skcf = nxt_router_socket_conf(task, tmcf, &name);
1954 if (skcf == NULL) {
1955 goto fail;
1956 }
1957
1958 nxt_memzero(&lscf, sizeof(lscf));
1959
1960 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1961 nxt_nitems(nxt_router_listener_conf),
1962 &lscf);
1963 if (ret != NXT_OK) {
1964 nxt_alert(task, "listener map error");
1965 goto fail;
1966 }
1967
1968 nxt_debug(task, "application: %V", &lscf.application);
1969
1970 // STUB, default values if http block is not defined.
1971 skcf->header_buffer_size = 2048;
1972 skcf->large_header_buffer_size = 8192;
1973 skcf->large_header_buffers = 4;
1974 skcf->discard_unsafe_fields = 1;
1975 skcf->body_buffer_size = 16 * 1024;
1976 skcf->max_body_size = 8 * 1024 * 1024;
1977 skcf->proxy_header_buffer_size = 64 * 1024;
1978 skcf->proxy_buffer_size = 4096;
1979 skcf->proxy_buffers = 256;
1980 skcf->idle_timeout = 180 * 1000;
1981 skcf->header_read_timeout = 30 * 1000;
1982 skcf->body_read_timeout = 30 * 1000;
1983 skcf->send_timeout = 30 * 1000;
1984 skcf->proxy_timeout = 60 * 1000;
1985 skcf->proxy_send_timeout = 30 * 1000;
1986 skcf->proxy_read_timeout = 30 * 1000;
1987
1988 skcf->server_version = 1;
1989
1990 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1991 skcf->websocket_conf.read_timeout = 60 * 1000;
1992 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1993
1994 nxt_str_null(&skcf->body_temp_path);
1995
1996 if (http != NULL) {
1997 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1998 nxt_nitems(nxt_router_http_conf),
1999 skcf);
2000 if (ret != NXT_OK) {
2001 nxt_alert(task, "http map error");
2002 goto fail;
2003 }
2004 }
2005
2006 if (websocket != NULL) {
2007 ret = nxt_conf_map_object(mp, websocket,
2008 nxt_router_websocket_conf,
2009 nxt_nitems(nxt_router_websocket_conf),
2010 &skcf->websocket_conf);
2011 if (ret != NXT_OK) {
2012 nxt_alert(task, "websocket map error");
2013 goto fail;
2014 }
2015 }
2016
2017 t = &skcf->body_temp_path;
2018
2019 if (t->length == 0) {
2020 t->start = (u_char *) task->thread->runtime->tmp;
2021 t->length = nxt_strlen(t->start);
2022 }
2023
2024 conf = nxt_conf_get_path(listener, &forwarded_path);
2025
2026 if (conf != NULL) {
2027 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
2028 if (nxt_slow_path(skcf->forwarded == NULL)) {
2029 return NXT_ERROR;
2030 }
2031 }
2032
2033 conf = nxt_conf_get_path(listener, &client_ip_path);
2034
2035 if (conf != NULL) {
2036 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
2037 if (nxt_slow_path(skcf->client_ip == NULL)) {
2038 return NXT_ERROR;
2039 }
2040 }
2041
2042 #if (NXT_TLS)
2043 certificate = nxt_conf_get_path(listener, &certificate_path);
2044
2045 if (certificate != NULL) {
2046 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
2047 if (nxt_slow_path(tls_init == NULL)) {
2048 return NXT_ERROR;
2049 }
2050
2051 tls_init->cache_size = 0;
2052 tls_init->timeout = 300;
2053
2054 value = nxt_conf_get_path(listener, &conf_cache_path);
2055 if (value != NULL) {
2056 tls_init->cache_size = nxt_conf_get_number(value);
2057 }
2058
2059 value = nxt_conf_get_path(listener, &conf_timeout_path);
2060 if (value != NULL) {
2061 tls_init->timeout = nxt_conf_get_number(value);
2062 }
2063
2064 tls_init->conf_cmds = nxt_conf_get_path(listener,
2065 &conf_commands_path);
2066
2067 tls_init->tickets_conf = nxt_conf_get_path(listener,
2068 &conf_tickets);
2069
2070 n = nxt_conf_array_elements_count_or_1(certificate);
2071
2072 for (i = 0; i < n; i++) {
2073 value = nxt_conf_get_array_element_or_itself(certificate,
2074 i);
2075 nxt_assert(value != NULL);
2076
2077 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2078 tls_init, i == 0);
2079 if (nxt_slow_path(ret != NXT_OK)) {
2080 goto fail;
2081 }
2082 }
2083 }
2084 #endif
2085
2086 skcf->listen->handler = nxt_http_conn_init;
2087 skcf->router_conf = rtcf;
2088 skcf->router_conf->count++;
2089
2090 if (lscf.pass.length != 0) {
2091 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2092
2093 /* COMPATIBILITY: listener application. */
2094 } else if (lscf.application.length > 0) {
2095 skcf->action = nxt_http_pass_application(task, rtcf,
2096 &lscf.application);
2097 }
2098
2099 if (nxt_slow_path(skcf->action == NULL)) {
2100 goto fail;
2101 }
2102 }
2103 }
2104
2105 ret = nxt_http_routes_resolve(task, tmcf);
2106 if (nxt_slow_path(ret != NXT_OK)) {
2107 goto fail;
2108 }
2109
2110 value = nxt_conf_get_path(root, &access_log_path);
2111
2112 if (value != NULL) {
2113 ret = nxt_router_access_log_create(task, rtcf, value);
2114 if (nxt_slow_path(ret != NXT_OK)) {
2115 goto fail;
2116 }
2117 }
2118
2119 #if (NXT_HAVE_NJS)
2120 js_module = nxt_conf_get_path(root, &js_module_path);
2121
2122 if (js_module != NULL) {
2123 if (nxt_conf_type(js_module) == NXT_CONF_ARRAY) {
2124 n = nxt_conf_array_elements_count(js_module);
2125
2126 for (i = 0; i < n; i++) {
2127 value = nxt_conf_get_array_element(js_module, i);
2128
2129 ret = nxt_router_js_module_insert(tmcf, value);
2130 if (nxt_slow_path(ret != NXT_OK)) {
2131 goto fail;
2132 }
2133 }
2134
2135 } else {
2136 /* NXT_CONF_STRING */
2137
2138 ret = nxt_router_js_module_insert(tmcf, js_module);
2139 if (nxt_slow_path(ret != NXT_OK)) {
2140 goto fail;
2141 }
2142 }
2143 }
2144
2145 #endif
2146
2147 nxt_queue_add(&deleting_sockets, &router->sockets);
2148 nxt_queue_init(&router->sockets);
2149
2150 return NXT_OK;
2151
2152 app_fail:
2153
2154 nxt_mp_destroy(app_mp);
2155
2156 fail:
2157
2158 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2159
2160 nxt_queue_remove(&app->link);
2161 nxt_thread_mutex_destroy(&app->mutex);
2162 nxt_mp_destroy(app->mem_pool);
2163
2164 } nxt_queue_loop;
2165
2166 return NXT_ERROR;
2167 }
2168
2169
2170 #if (NXT_TLS)
2171
2172 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2173 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2174 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2175 nxt_tls_init_t *tls_init, nxt_bool_t last)
2176 {
2177 nxt_router_tlssock_t *tls;
2178
2179 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2180 if (nxt_slow_path(tls == NULL)) {
2181 return NXT_ERROR;
2182 }
2183
2184 tls->tls_init = tls_init;
2185 tls->socket_conf = skcf;
2186 tls->temp_conf = tmcf;
2187 tls->last = last;
2188 nxt_conf_get_string(value, &tls->name);
2189
2190 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2191
2192 return NXT_OK;
2193 }
2194
2195 #endif
2196
2197
2198 #if (NXT_HAVE_NJS)
2199
2200 static void
nxt_router_js_module_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2201 nxt_router_js_module_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2202 void *data)
2203 {
2204 nxt_int_t ret;
2205 nxt_str_t text;
2206 nxt_router_conf_t *rtcf;
2207 nxt_router_temp_conf_t *tmcf;
2208 nxt_router_js_module_t *js_module;
2209
2210 nxt_debug(task, "auto module rpc handler");
2211
2212 js_module = data;
2213 tmcf = js_module->temp_conf;
2214
2215 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2216 goto fail;
2217 }
2218
2219 rtcf = tmcf->router_conf;
2220
2221 ret = nxt_script_file_read(msg->fd[0], &text);
2222
2223 nxt_fd_close(msg->fd[0]);
2224
2225 if (nxt_slow_path(ret == NXT_ERROR)) {
2226 goto fail;
2227 }
2228
2229 if (text.length > 0) {
2230 ret = nxt_js_add_module(rtcf->tstr_state->jcf, &js_module->name, &text);
2231
2232 nxt_free(text.start);
2233
2234 if (nxt_slow_path(ret == NXT_ERROR)) {
2235 goto fail;
2236 }
2237 }
2238
2239 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2240 nxt_router_conf_apply, task, tmcf, NULL);
2241 return;
2242
2243 fail:
2244
2245 nxt_router_conf_error(task, tmcf);
2246 }
2247
2248
2249 static nxt_int_t
nxt_router_js_module_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value)2250 nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
2251 nxt_conf_value_t *value)
2252 {
2253 nxt_router_js_module_t *js_module;
2254
2255 js_module = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_js_module_t));
2256 if (nxt_slow_path(js_module == NULL)) {
2257 return NXT_ERROR;
2258 }
2259
2260 js_module->temp_conf = tmcf;
2261 nxt_conf_get_string(value, &js_module->name);
2262
2263 nxt_queue_insert_tail(&tmcf->js_modules, &js_module->link);
2264
2265 return NXT_OK;
2266 }
2267
2268 #endif
2269
2270
2271 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2272 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2273 nxt_conf_value_t *conf)
2274 {
2275 uint32_t next, i;
2276 nxt_mp_t *mp;
2277 nxt_str_t *type, exten, str;
2278 nxt_int_t ret;
2279 nxt_uint_t exts;
2280 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2281
2282 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2283
2284 mp = rtcf->mem_pool;
2285
2286 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2287 if (nxt_slow_path(ret != NXT_OK)) {
2288 return NXT_ERROR;
2289 }
2290
2291 if (conf == NULL) {
2292 return NXT_OK;
2293 }
2294
2295 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2296
2297 if (mtypes_conf != NULL) {
2298 next = 0;
2299
2300 for ( ;; ) {
2301 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2302
2303 if (ext_conf == NULL) {
2304 break;
2305 }
2306
2307 type = nxt_str_dup(mp, NULL, &str);
2308 if (nxt_slow_path(type == NULL)) {
2309 return NXT_ERROR;
2310 }
2311
2312 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2313 nxt_conf_get_string(ext_conf, &str);
2314
2315 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2316 return NXT_ERROR;
2317 }
2318
2319 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2320 &exten, type);
2321 if (nxt_slow_path(ret != NXT_OK)) {
2322 return NXT_ERROR;
2323 }
2324
2325 continue;
2326 }
2327
2328 exts = nxt_conf_array_elements_count(ext_conf);
2329
2330 for (i = 0; i < exts; i++) {
2331 value = nxt_conf_get_array_element(ext_conf, i);
2332
2333 nxt_conf_get_string(value, &str);
2334
2335 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2336 return NXT_ERROR;
2337 }
2338
2339 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2340 &exten, type);
2341 if (nxt_slow_path(ret != NXT_OK)) {
2342 return NXT_ERROR;
2343 }
2344 }
2345 }
2346 }
2347
2348 return NXT_OK;
2349 }
2350
2351
2352 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2353 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2354 {
2355 nxt_int_t ret;
2356 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf;
2357 nxt_conf_value_t *source_conf, *recursive_conf;
2358 nxt_http_forward_t *forward;
2359 nxt_http_route_addr_rule_t *source;
2360
2361 static nxt_str_t header_path = nxt_string("/header");
2362 static nxt_str_t client_ip_path = nxt_string("/client_ip");
2363 static nxt_str_t protocol_path = nxt_string("/protocol");
2364 static nxt_str_t source_path = nxt_string("/source");
2365 static nxt_str_t recursive_path = nxt_string("/recursive");
2366
2367 header_conf = nxt_conf_get_path(conf, &header_path);
2368
2369 if (header_conf != NULL) {
2370 client_ip_conf = nxt_conf_get_path(conf, &header_path);
2371 protocol_conf = NULL;
2372
2373 } else {
2374 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2375 protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2376 }
2377
2378 source_conf = nxt_conf_get_path(conf, &source_path);
2379 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2380
2381 if (source_conf == NULL
2382 || (protocol_conf == NULL && client_ip_conf == NULL))
2383 {
2384 return NULL;
2385 }
2386
2387 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2388 if (nxt_slow_path(forward == NULL)) {
2389 return NULL;
2390 }
2391
2392 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2393 if (nxt_slow_path(source == NULL)) {
2394 return NULL;
2395 }
2396
2397 forward->source = source;
2398
2399 if (recursive_conf != NULL) {
2400 forward->recursive = nxt_conf_get_boolean(recursive_conf);
2401 }
2402
2403 if (client_ip_conf != NULL) {
2404 ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2405 &forward->client_ip);
2406 if (nxt_slow_path(ret != NXT_OK)) {
2407 return NULL;
2408 }
2409 }
2410
2411 if (protocol_conf != NULL) {
2412 ret = nxt_router_conf_forward_header(mp, protocol_conf,
2413 &forward->protocol);
2414 if (nxt_slow_path(ret != NXT_OK)) {
2415 return NULL;
2416 }
2417 }
2418
2419 return forward;
2420 }
2421
2422
2423 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2424 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2425 nxt_http_forward_header_t *fh)
2426 {
2427 char c;
2428 size_t i;
2429 uint32_t hash;
2430 nxt_str_t header;
2431
2432 nxt_conf_get_string(conf, &header);
2433
2434 fh->header = nxt_str_dup(mp, NULL, &header);
2435 if (nxt_slow_path(fh->header == NULL)) {
2436 return NXT_ERROR;
2437 }
2438
2439 hash = NXT_HTTP_FIELD_HASH_INIT;
2440
2441 for (i = 0; i < fh->header->length; i++) {
2442 c = fh->header->start[i];
2443 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2444 }
2445
2446 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2447
2448 fh->header_hash = hash;
2449
2450 return NXT_OK;
2451 }
2452
2453
2454 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2455 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2456 {
2457 nxt_app_t *app;
2458
2459 nxt_queue_each(app, queue, nxt_app_t, link) {
2460
2461 if (nxt_strstr_eq(name, &app->name)) {
2462 return app;
2463 }
2464
2465 } nxt_queue_loop;
2466
2467 return NULL;
2468 }
2469
2470
2471 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2472 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2473 {
2474 void *mem;
2475 nxt_int_t fd;
2476
2477 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2478 if (nxt_slow_path(fd == -1)) {
2479 return NXT_ERROR;
2480 }
2481
2482 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2483 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2484 if (nxt_slow_path(mem == MAP_FAILED)) {
2485 nxt_fd_close(fd);
2486
2487 return NXT_ERROR;
2488 }
2489
2490 nxt_app_queue_init(mem);
2491
2492 port->queue_fd = fd;
2493 port->queue = mem;
2494
2495 return NXT_OK;
2496 }
2497
2498
2499 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2500 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2501 {
2502 void *mem;
2503 nxt_int_t fd;
2504
2505 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2506 if (nxt_slow_path(fd == -1)) {
2507 return NXT_ERROR;
2508 }
2509
2510 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2511 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2512 if (nxt_slow_path(mem == MAP_FAILED)) {
2513 nxt_fd_close(fd);
2514
2515 return NXT_ERROR;
2516 }
2517
2518 nxt_port_queue_init(mem);
2519
2520 port->queue_fd = fd;
2521 port->queue = mem;
2522
2523 return NXT_OK;
2524 }
2525
2526
2527 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2528 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2529 {
2530 void *mem;
2531
2532 nxt_assert(fd != -1);
2533
2534 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2535 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2536 if (nxt_slow_path(mem == MAP_FAILED)) {
2537
2538 return NXT_ERROR;
2539 }
2540
2541 port->queue = mem;
2542
2543 return NXT_OK;
2544 }
2545
2546
2547 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
2548 NXT_LVLHSH_DEFAULT,
2549 nxt_router_apps_hash_test,
2550 nxt_mp_lvlhsh_alloc,
2551 nxt_mp_lvlhsh_free,
2552 };
2553
2554
2555 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2556 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2557 {
2558 nxt_app_t *app;
2559
2560 app = data;
2561
2562 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2563 }
2564
2565
2566 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2567 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2568 {
2569 nxt_lvlhsh_query_t lhq;
2570
2571 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2572 lhq.replace = 0;
2573 lhq.key = app->name;
2574 lhq.value = app;
2575 lhq.proto = &nxt_router_apps_hash_proto;
2576 lhq.pool = rtcf->mem_pool;
2577
2578 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2579
2580 case NXT_OK:
2581 return NXT_OK;
2582
2583 case NXT_DECLINED:
2584 nxt_thread_log_alert("router app hash adding failed: "
2585 "\"%V\" is already in hash", &lhq.key);
2586 /* Fall through. */
2587 default:
2588 return NXT_ERROR;
2589 }
2590 }
2591
2592
2593 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2594 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2595 {
2596 nxt_lvlhsh_query_t lhq;
2597
2598 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2599 lhq.key = *name;
2600 lhq.proto = &nxt_router_apps_hash_proto;
2601
2602 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2603 return NULL;
2604 }
2605
2606 return lhq.value;
2607 }
2608
2609
2610 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2611 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2612 {
2613 nxt_app_t *app;
2614 nxt_lvlhsh_each_t lhe;
2615
2616 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2617
2618 for ( ;; ) {
2619 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2620
2621 if (app == NULL) {
2622 break;
2623 }
2624
2625 nxt_router_app_use(task, app, i);
2626 }
2627 }
2628
2629
2630 typedef struct {
2631 nxt_app_t *app;
2632 nxt_int_t target;
2633 } nxt_http_app_conf_t;
2634
2635
2636 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2637 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2638 nxt_str_t *target, nxt_http_action_t *action)
2639 {
2640 nxt_app_t *app;
2641 nxt_str_t *targets;
2642 nxt_uint_t i;
2643 nxt_http_app_conf_t *conf;
2644
2645 app = nxt_router_apps_hash_get(rtcf, name);
2646 if (app == NULL) {
2647 return NXT_DECLINED;
2648 }
2649
2650 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2651 if (nxt_slow_path(conf == NULL)) {
2652 return NXT_ERROR;
2653 }
2654
2655 action->handler = nxt_http_application_handler;
2656 action->u.conf = conf;
2657
2658 conf->app = app;
2659
2660 if (target != NULL && target->length != 0) {
2661 targets = app->targets;
2662
2663 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2664
2665 conf->target = i;
2666
2667 } else {
2668 conf->target = 0;
2669 }
2670
2671 return NXT_OK;
2672 }
2673
2674
2675 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2676 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2677 nxt_str_t *name)
2678 {
2679 size_t size;
2680 nxt_int_t ret;
2681 nxt_bool_t wildcard;
2682 nxt_sockaddr_t *sa;
2683 nxt_socket_conf_t *skcf;
2684 nxt_listen_socket_t *ls;
2685
2686 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2687 if (nxt_slow_path(sa == NULL)) {
2688 nxt_alert(task, "invalid listener \"%V\"", name);
2689 return NULL;
2690 }
2691
2692 sa->type = SOCK_STREAM;
2693
2694 nxt_debug(task, "router listener: \"%*s\"",
2695 (size_t) sa->length, nxt_sockaddr_start(sa));
2696
2697 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2698 if (nxt_slow_path(skcf == NULL)) {
2699 return NULL;
2700 }
2701
2702 size = nxt_sockaddr_size(sa);
2703
2704 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2705
2706 if (ret != NXT_OK) {
2707
2708 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2709 if (nxt_slow_path(ls == NULL)) {
2710 return NULL;
2711 }
2712
2713 skcf->listen = ls;
2714
2715 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2716 nxt_memcpy(ls->sockaddr, sa, size);
2717
2718 nxt_listen_socket_remote_size(ls);
2719
2720 ls->socket = -1;
2721 ls->backlog = NXT_LISTEN_BACKLOG;
2722 ls->flags = NXT_NONBLOCK;
2723 ls->read_after_accept = 1;
2724 }
2725
2726 switch (sa->u.sockaddr.sa_family) {
2727 #if (NXT_HAVE_UNIX_DOMAIN)
2728 case AF_UNIX:
2729 wildcard = 0;
2730 break;
2731 #endif
2732 #if (NXT_INET6)
2733 case AF_INET6:
2734 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2735 break;
2736 #endif
2737 case AF_INET:
2738 default:
2739 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2740 break;
2741 }
2742
2743 if (!wildcard) {
2744 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2745 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2746 return NULL;
2747 }
2748
2749 nxt_memcpy(skcf->sockaddr, sa, size);
2750 }
2751
2752 return skcf;
2753 }
2754
2755
2756 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2757 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2758 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2759 {
2760 nxt_router_t *router;
2761 nxt_queue_link_t *qlk;
2762 nxt_socket_conf_t *skcf;
2763
2764 router = tmcf->router_conf->router;
2765
2766 for (qlk = nxt_queue_first(&router->sockets);
2767 qlk != nxt_queue_tail(&router->sockets);
2768 qlk = nxt_queue_next(qlk))
2769 {
2770 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2771
2772 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2773 nskcf->listen = skcf->listen;
2774
2775 nxt_queue_remove(qlk);
2776 nxt_queue_insert_tail(&keeping_sockets, qlk);
2777
2778 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2779
2780 return NXT_OK;
2781 }
2782 }
2783
2784 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2785
2786 return NXT_DECLINED;
2787 }
2788
2789
2790 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2791 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2792 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2793 {
2794 size_t size;
2795 uint32_t stream;
2796 nxt_int_t ret;
2797 nxt_buf_t *b;
2798 nxt_port_t *main_port, *router_port;
2799 nxt_runtime_t *rt;
2800 nxt_socket_rpc_t *rpc;
2801
2802 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2803 if (rpc == NULL) {
2804 goto fail;
2805 }
2806
2807 rpc->socket_conf = skcf;
2808 rpc->temp_conf = tmcf;
2809
2810 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2811
2812 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2813 if (b == NULL) {
2814 goto fail;
2815 }
2816
2817 b->completion_handler = nxt_buf_dummy_completion;
2818
2819 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2820
2821 rt = task->thread->runtime;
2822 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2823 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2824
2825 stream = nxt_port_rpc_register_handler(task, router_port,
2826 nxt_router_listen_socket_ready,
2827 nxt_router_listen_socket_error,
2828 main_port->pid, rpc);
2829 if (nxt_slow_path(stream == 0)) {
2830 goto fail;
2831 }
2832
2833 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2834 stream, router_port->id, b);
2835
2836 if (nxt_slow_path(ret != NXT_OK)) {
2837 nxt_port_rpc_cancel(task, router_port, stream);
2838 goto fail;
2839 }
2840
2841 return;
2842
2843 fail:
2844
2845 nxt_router_conf_error(task, tmcf);
2846 }
2847
2848
2849 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2850 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2851 void *data)
2852 {
2853 nxt_int_t ret;
2854 nxt_socket_t s;
2855 nxt_socket_rpc_t *rpc;
2856
2857 rpc = data;
2858
2859 s = msg->fd[0];
2860
2861 ret = nxt_socket_nonblocking(task, s);
2862 if (nxt_slow_path(ret != NXT_OK)) {
2863 goto fail;
2864 }
2865
2866 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2867
2868 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2869 if (nxt_slow_path(ret != NXT_OK)) {
2870 goto fail;
2871 }
2872
2873 rpc->socket_conf->listen->socket = s;
2874
2875 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2876 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2877
2878 return;
2879
2880 fail:
2881
2882 nxt_socket_close(task, s);
2883
2884 nxt_router_conf_error(task, rpc->temp_conf);
2885 }
2886
2887
2888 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2889 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2890 void *data)
2891 {
2892 nxt_socket_rpc_t *rpc;
2893 nxt_router_temp_conf_t *tmcf;
2894
2895 rpc = data;
2896 tmcf = rpc->temp_conf;
2897
2898 #if 0
2899 u_char *p;
2900 size_t size;
2901 uint8_t error;
2902 nxt_buf_t *in, *out;
2903 nxt_sockaddr_t *sa;
2904
2905 static nxt_str_t socket_errors[] = {
2906 nxt_string("ListenerSystem"),
2907 nxt_string("ListenerNoIPv6"),
2908 nxt_string("ListenerPort"),
2909 nxt_string("ListenerInUse"),
2910 nxt_string("ListenerNoAddress"),
2911 nxt_string("ListenerNoAccess"),
2912 nxt_string("ListenerPath"),
2913 };
2914
2915 sa = rpc->socket_conf->listen->sockaddr;
2916
2917 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2918
2919 if (nxt_slow_path(in == NULL)) {
2920 return;
2921 }
2922
2923 p = in->mem.pos;
2924
2925 error = *p++;
2926
2927 size = nxt_length("listen socket error: ")
2928 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2929 + sa->length + socket_errors[error].length + (in->mem.free - p);
2930
2931 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2932 if (nxt_slow_path(out == NULL)) {
2933 return;
2934 }
2935
2936 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2937 "listen socket error: "
2938 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2939 (size_t) sa->length, nxt_sockaddr_start(sa),
2940 &socket_errors[error], in->mem.free - p, p);
2941
2942 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2943 #endif
2944
2945 nxt_router_conf_error(task, tmcf);
2946 }
2947
2948
2949 #if (NXT_TLS)
2950
2951 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2952 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2953 void *data)
2954 {
2955 nxt_mp_t *mp;
2956 nxt_int_t ret;
2957 nxt_tls_conf_t *tlscf;
2958 nxt_router_tlssock_t *tls;
2959 nxt_tls_bundle_conf_t *bundle;
2960 nxt_router_temp_conf_t *tmcf;
2961
2962 nxt_debug(task, "tls rpc handler");
2963
2964 tls = data;
2965 tmcf = tls->temp_conf;
2966
2967 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2968 goto fail;
2969 }
2970
2971 mp = tmcf->router_conf->mem_pool;
2972
2973 if (tls->socket_conf->tls == NULL){
2974 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2975 if (nxt_slow_path(tlscf == NULL)) {
2976 goto fail;
2977 }
2978
2979 tlscf->no_wait_shutdown = 1;
2980 tls->socket_conf->tls = tlscf;
2981
2982 } else {
2983 tlscf = tls->socket_conf->tls;
2984 }
2985
2986 tls->tls_init->conf = tlscf;
2987
2988 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2989 if (nxt_slow_path(bundle == NULL)) {
2990 goto fail;
2991 }
2992
2993 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2994 goto fail;
2995 }
2996
2997 bundle->chain_file = msg->fd[0];
2998 bundle->next = tlscf->bundle;
2999 tlscf->bundle = bundle;
3000
3001 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
3002 tls->last);
3003 if (nxt_slow_path(ret != NXT_OK)) {
3004 goto fail;
3005 }
3006
3007 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3008 nxt_router_conf_apply, task, tmcf, NULL);
3009 return;
3010
3011 fail:
3012
3013 nxt_router_conf_error(task, tmcf);
3014 }
3015
3016 #endif
3017
3018
3019 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)3020 nxt_router_app_rpc_create(nxt_task_t *task,
3021 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
3022 {
3023 size_t size;
3024 uint32_t stream;
3025 nxt_fd_t port_fd, queue_fd;
3026 nxt_int_t ret;
3027 nxt_buf_t *b;
3028 nxt_port_t *router_port, *dport;
3029 nxt_runtime_t *rt;
3030 nxt_app_rpc_t *rpc;
3031
3032 rt = task->thread->runtime;
3033
3034 dport = app->proto_port;
3035
3036 if (dport == NULL) {
3037 nxt_debug(task, "app '%V' prototype prefork", &app->name);
3038
3039 size = app->name.length + 1 + app->conf.length;
3040
3041 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
3042 if (nxt_slow_path(b == NULL)) {
3043 goto fail;
3044 }
3045
3046 b->completion_handler = nxt_buf_dummy_completion;
3047
3048 nxt_buf_cpystr(b, &app->name);
3049 *b->mem.free++ = '\0';
3050 nxt_buf_cpystr(b, &app->conf);
3051
3052 dport = rt->port_by_type[NXT_PROCESS_MAIN];
3053
3054 port_fd = app->shared_port->pair[0];
3055 queue_fd = app->shared_port->queue_fd;
3056
3057 } else {
3058 nxt_debug(task, "app '%V' prefork", &app->name);
3059
3060 b = NULL;
3061 port_fd = -1;
3062 queue_fd = -1;
3063 }
3064
3065 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3066
3067 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
3068 nxt_router_app_prefork_ready,
3069 nxt_router_app_prefork_error,
3070 sizeof(nxt_app_rpc_t));
3071 if (nxt_slow_path(rpc == NULL)) {
3072 goto fail;
3073 }
3074
3075 rpc->app = app;
3076 rpc->temp_conf = tmcf;
3077 rpc->proto = (b != NULL);
3078
3079 stream = nxt_port_rpc_ex_stream(rpc);
3080
3081 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
3082 port_fd, queue_fd, stream, router_port->id, b);
3083 if (nxt_slow_path(ret != NXT_OK)) {
3084 nxt_port_rpc_cancel(task, router_port, stream);
3085 goto fail;
3086 }
3087
3088 if (b == NULL) {
3089 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
3090
3091 app->pending_processes++;
3092 }
3093
3094 return;
3095
3096 fail:
3097
3098 nxt_router_conf_error(task, tmcf);
3099 }
3100
3101
3102 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3103 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3104 void *data)
3105 {
3106 nxt_app_t *app;
3107 nxt_port_t *port;
3108 nxt_app_rpc_t *rpc;
3109 nxt_event_engine_t *engine;
3110
3111 rpc = data;
3112 app = rpc->app;
3113
3114 port = msg->u.new_port;
3115
3116 nxt_assert(port != NULL);
3117 nxt_assert(port->id == 0);
3118
3119 if (rpc->proto) {
3120 nxt_assert(app->proto_port == NULL);
3121 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
3122
3123 nxt_port_inc_use(port);
3124
3125 app->proto_port = port;
3126 port->app = app;
3127
3128 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
3129
3130 return;
3131 }
3132
3133 nxt_assert(port->type == NXT_PROCESS_APP);
3134
3135 port->app = app;
3136 port->main_app_port = port;
3137
3138 app->pending_processes--;
3139 app->processes++;
3140 app->idle_processes++;
3141
3142 engine = task->thread->engine;
3143
3144 nxt_queue_insert_tail(&app->ports, &port->app_link);
3145 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3146
3147 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
3148 &app->name, port->pid, port->id);
3149
3150 nxt_port_hash_add(&app->port_hash, port);
3151 app->port_hash_count++;
3152
3153 port->idle_start = 0;
3154
3155 nxt_port_inc_use(port);
3156
3157 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
3158
3159 nxt_work_queue_add(&engine->fast_work_queue,
3160 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
3161 }
3162
3163
3164 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3165 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3166 void *