1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20
21 #define NXT_SHARED_PORT_ID 0xFFFFu
22
23 typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
30 nxt_conf_value_t *limits_value;
31 nxt_conf_value_t *processes_value;
32 nxt_conf_value_t *targets_value;
33 } nxt_router_app_conf_t;
34
35
36 typedef struct {
37 nxt_str_t pass;
38 nxt_str_t application;
39 } nxt_router_listener_conf_t;
40
41
42 #if (NXT_TLS)
43
44 typedef struct {
45 nxt_str_t name;
46 nxt_socket_conf_t *socket_conf;
47 nxt_router_temp_conf_t *temp_conf;
48 nxt_tls_init_t *tls_init;
49 nxt_bool_t last;
50
51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53
54 #endif
55
56
57 typedef struct {
58 nxt_str_t *name;
59 nxt_socket_conf_t *socket_conf;
60 nxt_router_temp_conf_t *temp_conf;
61 nxt_bool_t last;
62 } nxt_socket_rpc_t;
63
64
65 typedef struct {
66 nxt_app_t *app;
67 nxt_router_temp_conf_t *temp_conf;
68 uint8_t proto; /* 1 bit */
69 } nxt_app_rpc_t;
70
71
72 typedef struct {
73 nxt_app_joint_t *app_joint;
74 uint32_t generation;
75 uint8_t proto; /* 1 bit */
76 } nxt_app_joint_rpc_t;
77
78
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80 nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83 nxt_port_t *controller_port);
84
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88 nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90 nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92 nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94 nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96 nxt_port_recv_msg_t *msg);
97
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101 nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103 nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
112 nxt_mp_t *mp, nxt_conf_value_t *conf);
113 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
114 nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
115
116 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
117 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
118 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
119 nxt_app_t *app);
120 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
121 nxt_str_t *name);
122 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
123 int i);
124
125 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
126 nxt_port_t *port);
127 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
128 nxt_port_t *port);
129 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
130 nxt_port_t *port, nxt_fd_t fd);
131 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
132 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
133 static void nxt_router_listen_socket_ready(nxt_task_t *task,
134 nxt_port_recv_msg_t *msg, void *data);
135 static void nxt_router_listen_socket_error(nxt_task_t *task,
136 nxt_port_recv_msg_t *msg, void *data);
137 #if (NXT_TLS)
138 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
139 nxt_port_recv_msg_t *msg, void *data);
140 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
141 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
142 nxt_bool_t last);
143 #endif
144 static void nxt_router_app_rpc_create(nxt_task_t *task,
145 nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
146 static void nxt_router_app_prefork_ready(nxt_task_t *task,
147 nxt_port_recv_msg_t *msg, void *data);
148 static void nxt_router_app_prefork_error(nxt_task_t *task,
149 nxt_port_recv_msg_t *msg, void *data);
150 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
151 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
152 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
153 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
154
155 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
156 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
157 const nxt_event_interface_t *interface);
158 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
159 nxt_router_engine_conf_t *recf);
160 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
161 nxt_router_engine_conf_t *recf);
162 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
163 nxt_router_engine_conf_t *recf);
164 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
165 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
166 nxt_work_handler_t handler);
167 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
168 nxt_router_engine_conf_t *recf);
169 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
170 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
171
172 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
173 nxt_router_temp_conf_t *tmcf);
174 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
175 nxt_event_engine_t *engine);
176 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
177 nxt_router_temp_conf_t *tmcf);
178
179 static void nxt_router_engines_post(nxt_router_t *router,
180 nxt_router_temp_conf_t *tmcf);
181 static void nxt_router_engine_post(nxt_event_engine_t *engine,
182 nxt_work_t *jobs);
183
184 static void nxt_router_thread_start(void *data);
185 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
186 void *data);
187 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
188 void *data);
189 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
190 void *data);
191 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
192 void *data);
193 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
194 void *data);
195 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
196 void *data);
197 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
198 void *data);
199 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
200 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
201 static void nxt_router_listen_socket_release(nxt_task_t *task,
202 nxt_socket_conf_t *skcf);
203
204 static void nxt_router_access_log_writer(nxt_task_t *task,
205 nxt_http_request_t *r, nxt_router_access_log_t *access_log);
206 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
207 struct tm *tm, size_t size, const char *format);
208 static void nxt_router_access_log_open(nxt_task_t *task,
209 nxt_router_temp_conf_t *tmcf);
210 static void nxt_router_access_log_ready(nxt_task_t *task,
211 nxt_port_recv_msg_t *msg, void *data);
212 static void nxt_router_access_log_error(nxt_task_t *task,
213 nxt_port_recv_msg_t *msg, void *data);
214 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
215 nxt_router_access_log_t *access_log);
216 static void nxt_router_access_log_release(nxt_task_t *task,
217 nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
218 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
219 void *data);
220 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
221 nxt_port_recv_msg_t *msg, void *data);
222 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
223 nxt_port_recv_msg_t *msg, void *data);
224
225 static void nxt_router_app_port_ready(nxt_task_t *task,
226 nxt_port_recv_msg_t *msg, void *data);
227 static void nxt_router_app_port_error(nxt_task_t *task,
228 nxt_port_recv_msg_t *msg, void *data);
229
230 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
231 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
232
233 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
234 nxt_port_t *port, nxt_apr_action_t action);
235 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
236 nxt_request_rpc_data_t *req_rpc_data);
237 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
238 void *data);
239 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
240 void *data);
241
242 static void nxt_router_app_prepare_request(nxt_task_t *task,
243 nxt_request_rpc_data_t *req_rpc_data);
244 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
245 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
246
247 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
248 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
249 void *data);
250 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
251 void *data);
252 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
253 void *data);
254 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
255
256 static const nxt_http_request_state_t nxt_http_request_send_state;
257 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
258
259 static void nxt_router_app_joint_use(nxt_task_t *task,
260 nxt_app_joint_t *app_joint, int i);
261
262 static void nxt_router_http_request_release_post(nxt_task_t *task,
263 nxt_http_request_t *r);
264 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
265 void *data);
266 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
267 static void nxt_router_get_port_handler(nxt_task_t *task,
268 nxt_port_recv_msg_t *msg);
269 static void nxt_router_get_mmap_handler(nxt_task_t *task,
270 nxt_port_recv_msg_t *msg);
271
272 extern const nxt_http_request_state_t nxt_http_websocket;
273
274 static nxt_router_t *nxt_router;
275
276 static const nxt_str_t http_prefix = nxt_string("HTTP_");
277 static const nxt_str_t empty_prefix = nxt_string("");
278
279 static const nxt_str_t *nxt_app_msg_prefix[] = {
280 &empty_prefix,
281 &empty_prefix,
282 &http_prefix,
283 &http_prefix,
284 &http_prefix,
285 &empty_prefix,
286 };
287
288
289 static const nxt_port_handlers_t nxt_router_process_port_handlers = {
290 .quit = nxt_signal_quit_handler,
291 .new_port = nxt_router_new_port_handler,
292 .get_port = nxt_router_get_port_handler,
293 .change_file = nxt_port_change_log_file_handler,
294 .mmap = nxt_port_mmap_handler,
295 .get_mmap = nxt_router_get_mmap_handler,
296 .data = nxt_router_conf_data_handler,
297 .app_restart = nxt_router_app_restart_handler,
298 .remove_pid = nxt_router_remove_pid_handler,
299 .access_log = nxt_router_access_log_reopen_handler,
300 .rpc_ready = nxt_port_rpc_handler,
301 .rpc_error = nxt_port_rpc_handler,
302 .oosm = nxt_router_oosm_handler,
303 };
304
305
306 const nxt_process_init_t nxt_router_process = {
307 .name = "router",
308 .type = NXT_PROCESS_ROUTER,
309 .prefork = nxt_router_prefork,
310 .restart = 1,
311 .setup = nxt_process_core_setup,
312 .start = nxt_router_start,
313 .port_handlers = &nxt_router_process_port_handlers,
314 .signals = nxt_process_signals,
315 };
316
317
318 /* Queues of nxt_socket_conf_t */
319 nxt_queue_t creating_sockets;
320 nxt_queue_t pending_sockets;
321 nxt_queue_t updating_sockets;
322 nxt_queue_t keeping_sockets;
323 nxt_queue_t deleting_sockets;
324
325
326 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
328 {
329 nxt_runtime_stop_app_processes(task, task->thread->runtime);
330
331 return NXT_OK;
332 }
333
334
335 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
337 {
338 nxt_int_t ret;
339 nxt_port_t *controller_port;
340 nxt_router_t *router;
341 nxt_runtime_t *rt;
342
343 rt = task->thread->runtime;
344
345 nxt_log(task, NXT_LOG_INFO, "router started");
346
347 #if (NXT_TLS)
348 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
349 if (nxt_slow_path(rt->tls == NULL)) {
350 return NXT_ERROR;
351 }
352
353 ret = rt->tls->library_init(task);
354 if (nxt_slow_path(ret != NXT_OK)) {
355 return ret;
356 }
357 #endif
358
359 ret = nxt_http_init(task);
360 if (nxt_slow_path(ret != NXT_OK)) {
361 return ret;
362 }
363
364 router = nxt_zalloc(sizeof(nxt_router_t));
365 if (nxt_slow_path(router == NULL)) {
366 return NXT_ERROR;
367 }
368
369 nxt_queue_init(&router->engines);
370 nxt_queue_init(&router->sockets);
371 nxt_queue_init(&router->apps);
372
373 nxt_router = router;
374
375 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
376 if (controller_port != NULL) {
377 nxt_router_greet_controller(task, controller_port);
378 }
379
380 return NXT_OK;
381 }
382
383
384 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
386 {
387 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
388 -1, 0, 0, NULL);
389 }
390
391
392 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
394 void *data)
395 {
396 size_t size;
397 uint32_t stream;
398 nxt_fd_t port_fd, queue_fd;
399 nxt_int_t ret;
400 nxt_app_t *app;
401 nxt_buf_t *b;
402 nxt_port_t *dport;
403 nxt_runtime_t *rt;
404 nxt_app_joint_rpc_t *app_joint_rpc;
405
406 app = data;
407
408 nxt_thread_mutex_lock(&app->mutex);
409
410 dport = app->proto_port;
411
412 nxt_thread_mutex_unlock(&app->mutex);
413
414 if (dport != NULL) {
415 nxt_debug(task, "app '%V' %p start process", &app->name, app);
416
417 b = NULL;
418 port_fd = -1;
419 queue_fd = -1;
420
421 } else {
422 if (app->proto_port_requests > 0) {
423 nxt_debug(task, "app '%V' %p wait for prototype process",
424 &app->name, app);
425
426 app->proto_port_requests++;
427
428 goto skip;
429 }
430
431 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
432
433 rt = task->thread->runtime;
434 dport = rt->port_by_type[NXT_PROCESS_MAIN];
435
436 size = app->name.length + 1 + app->conf.length;
437
438 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
439 if (nxt_slow_path(b == NULL)) {
440 goto failed;
441 }
442
443 nxt_buf_cpystr(b, &app->name);
444 *b->mem.free++ = '\0';
445 nxt_buf_cpystr(b, &app->conf);
446
447 port_fd = app->shared_port->pair[0];
448 queue_fd = app->shared_port->queue_fd;
449 }
450
451 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
452 nxt_router_app_port_ready,
453 nxt_router_app_port_error,
454 sizeof(nxt_app_joint_rpc_t));
455 if (nxt_slow_path(app_joint_rpc == NULL)) {
456 goto failed;
457 }
458
459 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
460
461 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
462 port_fd, queue_fd, stream, port->id, b);
463 if (nxt_slow_path(ret != NXT_OK)) {
464 nxt_port_rpc_cancel(task, port, stream);
465
466 goto failed;
467 }
468
469 app_joint_rpc->app_joint = app->joint;
470 app_joint_rpc->generation = app->generation;
471 app_joint_rpc->proto = (b != NULL);
472
473 if (b != NULL) {
474 app->proto_port_requests++;
475
476 b = NULL;
477 }
478
479 nxt_router_app_joint_use(task, app->joint, 1);
480
481 failed:
482
483 if (b != NULL) {
484 nxt_mp_free(b->data, b);
485 }
486
487 skip:
488
489 nxt_router_app_use(task, app, -1);
490 }
491
492
493 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)494 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
495 {
496 app_joint->use_count += i;
497
498 if (app_joint->use_count == 0) {
499 nxt_assert(app_joint->app == NULL);
500
501 nxt_free(app_joint);
502 }
503 }
504
505
506 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)507 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
508 {
509 nxt_int_t res;
510 nxt_port_t *router_port;
511 nxt_runtime_t *rt;
512
513 nxt_debug(task, "app '%V' start process", &app->name);
514
515 rt = task->thread->runtime;
516 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
517
518 nxt_router_app_use(task, app, 1);
519
520 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
521 app);
522
523 if (res == NXT_OK) {
524 return res;
525 }
526
527 nxt_thread_mutex_lock(&app->mutex);
528
529 app->pending_processes--;
530
531 nxt_thread_mutex_unlock(&app->mutex);
532
533 nxt_router_app_use(task, app, -1);
534
535 return NXT_ERROR;
536 }
537
538
539 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)540 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
541 {
542 nxt_buf_t *b, *next;
543 nxt_bool_t cancelled;
544 nxt_port_t *app_port;
545 nxt_msg_info_t *msg_info;
546
547 msg_info = &req_rpc_data->msg_info;
548
549 if (msg_info->buf == NULL) {
550 return 0;
551 }
552
553 app_port = req_rpc_data->app_port;
554
555 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
556 cancelled = nxt_app_queue_cancel(app_port->queue,
557 msg_info->tracking_cookie,
558 req_rpc_data->stream);
559
560 if (cancelled) {
561 nxt_debug(task, "stream #%uD: cancelled by router",
562 req_rpc_data->stream);
563 }
564
565 } else {
566 cancelled = 0;
567 }
568
569 for (b = msg_info->buf; b != NULL; b = next) {
570 next = b->next;
571 b->next = NULL;
572
573 if (b->is_port_mmap_sent) {
574 b->is_port_mmap_sent = cancelled == 0;
575 }
576
577 b->completion_handler(task, b, b->parent);
578 }
579
580 msg_info->buf = NULL;
581
582 return cancelled;
583 }
584
585
586 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)587 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
588 {
589 if (lnk->next != NULL) {
590 nxt_queue_remove(lnk);
591
592 lnk->next = NULL;
593
594 return 1;
595 }
596
597 return 0;
598 }
599
600
601 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)602 nxt_request_rpc_data_unlink(nxt_task_t *task,
603 nxt_request_rpc_data_t *req_rpc_data)
604 {
605 nxt_app_t *app;
606 nxt_bool_t unlinked;
607 nxt_http_request_t *r;
608
609 nxt_router_msg_cancel(task, req_rpc_data);
610
611 app = req_rpc_data->app;
612
613 if (req_rpc_data->app_port != NULL) {
614 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
615 req_rpc_data->apr_action);
616
617 req_rpc_data->app_port = NULL;
618 }
619
620 r = req_rpc_data->request;
621
622 if (r != NULL) {
623 r->timer_data = NULL;
624
625 nxt_router_http_request_release_post(task, r);
626
627 r->req_rpc_data = NULL;
628 req_rpc_data->request = NULL;
629
630 if (app != NULL) {
631 unlinked = 0;
632
633 nxt_thread_mutex_lock(&app->mutex);
634
635 if (r->app_link.next != NULL) {
636 nxt_queue_remove(&r->app_link);
637 r->app_link.next = NULL;
638
639 unlinked = 1;
640 }
641
642 nxt_thread_mutex_unlock(&app->mutex);
643
644 if (unlinked) {
645 nxt_mp_release(r->mem_pool);
646 }
647 }
648 }
649
650 if (app != NULL) {
651 nxt_router_app_use(task, app, -1);
652
653 req_rpc_data->app = NULL;
654 }
655
656 if (req_rpc_data->msg_info.body_fd != -1) {
657 nxt_fd_close(req_rpc_data->msg_info.body_fd);
658
659 req_rpc_data->msg_info.body_fd = -1;
660 }
661
662 if (req_rpc_data->rpc_cancel) {
663 req_rpc_data->rpc_cancel = 0;
664
665 nxt_port_rpc_cancel(task, task->thread->engine->port,
666 req_rpc_data->stream);
667 }
668 }
669
670
671 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674 nxt_int_t res;
675 nxt_app_t *app;
676 nxt_port_t *port, *main_app_port;
677 nxt_runtime_t *rt;
678
679 nxt_port_new_port_handler(task, msg);
680
681 port = msg->u.new_port;
682
683 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
684 nxt_router_greet_controller(task, msg->u.new_port);
685 }
686
687 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
688 nxt_port_rpc_handler(task, msg);
689
690 return;
691 }
692
693 if (port == NULL || port->type != NXT_PROCESS_APP) {
694
695 if (msg->port_msg.stream == 0) {
696 return;
697 }
698
699 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
700
701 } else {
702 if (msg->fd[1] != -1) {
703 res = nxt_router_port_queue_map(task, port, msg->fd[1]);
704 if (nxt_slow_path(res != NXT_OK)) {
705 return;
706 }
707
708 nxt_fd_close(msg->fd[1]);
709 msg->fd[1] = -1;
710 }
711 }
712
713 if (msg->port_msg.stream != 0) {
714 nxt_port_rpc_handler(task, msg);
715 return;
716 }
717
718 nxt_debug(task, "new port id %d (%d)", port->id, port->type);
719
720 /*
721 * Port with "id == 0" is application 'main' port and it always
722 * should come with non-zero stream.
723 */
724 nxt_assert(port->id != 0);
725
726 /* Find 'main' app port and get app reference. */
727 rt = task->thread->runtime;
728
729 /*
730 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
731 * sent to main port (with id == 0) and processed in main thread.
732 */
733 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
734 nxt_assert(main_app_port != NULL);
735
736 app = main_app_port->app;
737
738 if (nxt_fast_path(app != NULL)) {
739 nxt_thread_mutex_lock(&app->mutex);
740
741 /* TODO here should be find-and-add code because there can be
742 port waiters in port_hash */
743 nxt_port_hash_add(&app->port_hash, port);
744 app->port_hash_count++;
745
746 nxt_thread_mutex_unlock(&app->mutex);
747
748 port->app = app;
749 }
750
751 port->main_app_port = main_app_port;
752
753 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
754 }
755
756
757 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)758 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
759 {
760 void *p;
761 size_t size;
762 nxt_int_t ret;
763 nxt_port_t *port;
764 nxt_router_temp_conf_t *tmcf;
765
766 port = nxt_runtime_port_find(task->thread->runtime,
767 msg->port_msg.pid,
768 msg->port_msg.reply_port);
769 if (nxt_slow_path(port == NULL)) {
770 nxt_alert(task, "conf_data_handler: reply port not found");
771 return;
772 }
773
774 p = MAP_FAILED;
775
776 /*
777 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
778 * initialized in 'cleanup' section.
779 */
780 size = 0;
781
782 tmcf = nxt_router_temp_conf(task);
783 if (nxt_slow_path(tmcf == NULL)) {
784 goto fail;
785 }
786
787 if (nxt_slow_path(msg->fd[0] == -1)) {
788 nxt_alert(task, "conf_data_handler: invalid shm fd");
789 goto fail;
790 }
791
792 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
793 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
794 (int) nxt_buf_mem_used_size(&msg->buf->mem));
795 goto fail;
796 }
797
798 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
799
800 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
801
802 nxt_fd_close(msg->fd[0]);
803 msg->fd[0] = -1;
804
805 if (nxt_slow_path(p == MAP_FAILED)) {
806 goto fail;
807 }
808
809 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
810
811 tmcf->router_conf->router = nxt_router;
812 tmcf->stream = msg->port_msg.stream;
813 tmcf->port = port;
814
815 nxt_port_use(task, tmcf->port, 1);
816
817 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
818
819 if (nxt_fast_path(ret == NXT_OK)) {
820 nxt_router_conf_apply(task, tmcf, NULL);
821
822 } else {
823 nxt_router_conf_error(task, tmcf);
824 }
825
826 goto cleanup;
827
828 fail:
829
830 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
831 msg->port_msg.stream, 0, NULL);
832
833 if (tmcf != NULL) {
834 nxt_mp_release(tmcf->mem_pool);
835 }
836
837 cleanup:
838
839 if (p != MAP_FAILED) {
840 nxt_mem_munmap(p, size);
841 }
842
843 if (msg->fd[0] != -1) {
844 nxt_fd_close(msg->fd[0]);
845 msg->fd[0] = -1;
846 }
847 }
848
849
850 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)851 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
852 {
853 nxt_app_t *app;
854 nxt_int_t ret;
855 nxt_str_t app_name;
856 nxt_port_t *reply_port, *shared_port, *old_shared_port;
857 nxt_port_t *proto_port;
858 nxt_port_msg_type_t reply;
859
860 reply_port = nxt_runtime_port_find(task->thread->runtime,
861 msg->port_msg.pid,
862 msg->port_msg.reply_port);
863 if (nxt_slow_path(reply_port == NULL)) {
864 nxt_alert(task, "app_restart_handler: reply port not found");
865 return;
866 }
867
868 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
869 app_name.start = msg->buf->mem.pos;
870
871 nxt_debug(task, "app_restart_handler: %V", &app_name);
872
873 app = nxt_router_app_find(&nxt_router->apps, &app_name);
874
875 if (nxt_fast_path(app != NULL)) {
876 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
877 NXT_PROCESS_APP);
878 if (nxt_slow_path(shared_port == NULL)) {
879 goto fail;
880 }
881
882 ret = nxt_port_socket_init(task, shared_port, 0);
883 if (nxt_slow_path(ret != NXT_OK)) {
884 nxt_port_use(task, shared_port, -1);
885 goto fail;
886 }
887
888 ret = nxt_router_app_queue_init(task, shared_port);
889 if (nxt_slow_path(ret != NXT_OK)) {
890 nxt_port_write_close(shared_port);
891 nxt_port_read_close(shared_port);
892 nxt_port_use(task, shared_port, -1);
893 goto fail;
894 }
895
896 nxt_port_write_enable(task, shared_port);
897
898 nxt_thread_mutex_lock(&app->mutex);
899
900 proto_port = app->proto_port;
901
902 if (proto_port != NULL) {
903 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
904 proto_port->pid);
905
906 app->proto_port = NULL;
907 proto_port->app = NULL;
908 }
909
910 app->generation++;
911
912 shared_port->app = app;
913
914 old_shared_port = app->shared_port;
915 old_shared_port->app = NULL;
916
917 app->shared_port = shared_port;
918
919 nxt_thread_mutex_unlock(&app->mutex);
920
921 nxt_port_close(task, old_shared_port);
922 nxt_port_use(task, old_shared_port, -1);
923
924 if (proto_port != NULL) {
925 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
926 -1, 0, 0, NULL);
927
928 nxt_port_close(task, proto_port);
929
930 nxt_port_use(task, proto_port, -1);
931 }
932
933 reply = NXT_PORT_MSG_RPC_READY_LAST;
934
935 } else {
936
937 fail:
938
939 reply = NXT_PORT_MSG_RPC_ERROR;
940 }
941
942 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
943 0, NULL);
944 }
945
946
947 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)948 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
949 void *data)
950 {
951 union {
952 nxt_pid_t removed_pid;
953 void *data;
954 } u;
955
956 u.data = data;
957
958 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
959 }
960
961
962 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)963 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
964 {
965 nxt_event_engine_t *engine;
966
967 nxt_port_remove_pid_handler(task, msg);
968
969 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
970 {
971 if (nxt_fast_path(engine->port != NULL)) {
972 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
973 msg->u.data);
974 }
975 }
976 nxt_queue_loop;
977
978 if (msg->port_msg.stream == 0) {
979 return;
980 }
981
982 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
983
984 nxt_port_rpc_handler(task, msg);
985 }
986
987
988 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)989 nxt_router_temp_conf(nxt_task_t *task)
990 {
991 nxt_mp_t *mp, *tmp;
992 nxt_router_conf_t *rtcf;
993 nxt_router_temp_conf_t *tmcf;
994
995 mp = nxt_mp_create(1024, 128, 256, 32);
996 if (nxt_slow_path(mp == NULL)) {
997 return NULL;
998 }
999
1000 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1001 if (nxt_slow_path(rtcf == NULL)) {
1002 goto fail;
1003 }
1004
1005 rtcf->mem_pool = mp;
1006
1007 tmp = nxt_mp_create(1024, 128, 256, 32);
1008 if (nxt_slow_path(tmp == NULL)) {
1009 goto fail;
1010 }
1011
1012 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1013 if (nxt_slow_path(tmcf == NULL)) {
1014 goto temp_fail;
1015 }
1016
1017 tmcf->mem_pool = tmp;
1018 tmcf->router_conf = rtcf;
1019 tmcf->count = 1;
1020 tmcf->engine = task->thread->engine;
1021
1022 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1023 sizeof(nxt_router_engine_conf_t));
1024 if (nxt_slow_path(tmcf->engines == NULL)) {
1025 goto temp_fail;
1026 }
1027
1028 nxt_queue_init(&creating_sockets);
1029 nxt_queue_init(&pending_sockets);
1030 nxt_queue_init(&updating_sockets);
1031 nxt_queue_init(&keeping_sockets);
1032 nxt_queue_init(&deleting_sockets);
1033
1034 #if (NXT_TLS)
1035 nxt_queue_init(&tmcf->tls);
1036 #endif
1037
1038 nxt_queue_init(&tmcf->apps);
1039 nxt_queue_init(&tmcf->previous);
1040
1041 return tmcf;
1042
1043 temp_fail:
1044
1045 nxt_mp_destroy(tmp);
1046
1047 fail:
1048
1049 nxt_mp_destroy(mp);
1050
1051 return NULL;
1052 }
1053
1054
1055 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1056 nxt_router_app_can_start(nxt_app_t *app)
1057 {
1058 return app->processes + app->pending_processes < app->max_processes
1059 && app->pending_processes < app->max_pending_processes;
1060 }
1061
1062
1063 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1064 nxt_router_app_need_start(nxt_app_t *app)
1065 {
1066 return (app->active_requests
1067 > app->port_hash_count + app->pending_processes)
1068 || (app->spare_processes
1069 > app->idle_processes + app->pending_processes);
1070 }
1071
1072
1073 static void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1074 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1075 {
1076 nxt_int_t ret;
1077 nxt_app_t *app;
1078 nxt_router_t *router;
1079 nxt_runtime_t *rt;
1080 nxt_queue_link_t *qlk;
1081 nxt_socket_conf_t *skcf;
1082 nxt_router_conf_t *rtcf;
1083 nxt_router_temp_conf_t *tmcf;
1084 const nxt_event_interface_t *interface;
1085 #if (NXT_TLS)
1086 nxt_router_tlssock_t *tls;
1087 #endif
1088
1089 tmcf = obj;
1090
1091 qlk = nxt_queue_first(&pending_sockets);
1092
1093 if (qlk != nxt_queue_tail(&pending_sockets)) {
1094 nxt_queue_remove(qlk);
1095 nxt_queue_insert_tail(&creating_sockets, qlk);
1096
1097 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1098
1099 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1100
1101 return;
1102 }
1103
1104 #if (NXT_TLS)
1105 qlk = nxt_queue_last(&tmcf->tls);
1106
1107 if (qlk != nxt_queue_head(&tmcf->tls)) {
1108 nxt_queue_remove(qlk);
1109
1110 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1111
1112 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1113 nxt_router_tls_rpc_handler, tls);
1114 return;
1115 }
1116 #endif
1117
1118 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1119
1120 if (nxt_router_app_need_start(app)) {
1121 nxt_router_app_rpc_create(task, tmcf, app);
1122 return;
1123 }
1124
1125 } nxt_queue_loop;
1126
1127 rtcf = tmcf->router_conf;
1128
1129 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1130 nxt_router_access_log_open(task, tmcf);
1131 return;
1132 }
1133
1134 rt = task->thread->runtime;
1135
1136 interface = nxt_service_get(rt->services, "engine", NULL);
1137
1138 router = rtcf->router;
1139
1140 ret = nxt_router_engines_create(task, router, tmcf, interface);
1141 if (nxt_slow_path(ret != NXT_OK)) {
1142 goto fail;
1143 }
1144
1145 ret = nxt_router_threads_create(task, rt, tmcf);
1146 if (nxt_slow_path(ret != NXT_OK)) {
1147 goto fail;
1148 }
1149
1150 nxt_router_apps_sort(task, router, tmcf);
1151
1152 nxt_router_apps_hash_use(task, rtcf, 1);
1153
1154 nxt_router_engines_post(router, tmcf);
1155
1156 nxt_queue_add(&router->sockets, &updating_sockets);
1157 nxt_queue_add(&router->sockets, &creating_sockets);
1158
1159 if (router->access_log != rtcf->access_log) {
1160 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1161
1162 nxt_router_access_log_release(task, &router->lock, router->access_log);
1163
1164 router->access_log = rtcf->access_log;
1165 }
1166
1167 nxt_router_conf_ready(task, tmcf);
1168
1169 return;
1170
1171 fail:
1172
1173 nxt_router_conf_error(task, tmcf);
1174
1175 return;
1176 }
1177
1178
1179 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1180 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1181 {
1182 nxt_joint_job_t *job;
1183
1184 job = obj;
1185
1186 nxt_router_conf_ready(task, job->tmcf);
1187 }
1188
1189
1190 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1191 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1192 {
1193 uint32_t count;
1194 nxt_router_conf_t *rtcf;
1195 nxt_thread_spinlock_t *lock;
1196
1197 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1198
1199 if (--tmcf->count > 0) {
1200 return;
1201 }
1202
1203 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1204
1205 rtcf = tmcf->router_conf;
1206
1207 lock = &rtcf->router->lock;
1208
1209 nxt_thread_spin_lock(lock);
1210
1211 count = rtcf->count;
1212
1213 nxt_thread_spin_unlock(lock);
1214
1215 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1216
1217 if (count == 0) {
1218 nxt_router_apps_hash_use(task, rtcf, -1);
1219
1220 nxt_router_access_log_release(task, lock, rtcf->access_log);
1221
1222 nxt_mp_destroy(rtcf->mem_pool);
1223 }
1224
1225 nxt_mp_release(tmcf->mem_pool);
1226 }
1227
1228
1229 static void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1230 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1231 {
1232 nxt_app_t *app;
1233 nxt_socket_t s;
1234 nxt_router_t *router;
1235 nxt_queue_link_t *qlk;
1236 nxt_socket_conf_t *skcf;
1237 nxt_router_conf_t *rtcf;
1238
1239 nxt_alert(task, "failed to apply new conf");
1240
1241 for (qlk = nxt_queue_first(&creating_sockets);
1242 qlk != nxt_queue_tail(&creating_sockets);
1243 qlk = nxt_queue_next(qlk))
1244 {
1245 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246 s = skcf->listen->socket;
1247
1248 if (s != -1) {
1249 nxt_socket_close(task, s);
1250 }
1251
1252 nxt_free(skcf->listen);
1253 }
1254
1255 rtcf = tmcf->router_conf;
1256
1257 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1258
1259 nxt_router_app_unlink(task, app);
1260
1261 } nxt_queue_loop;
1262
1263 router = rtcf->router;
1264
1265 nxt_queue_add(&router->sockets, &keeping_sockets);
1266 nxt_queue_add(&router->sockets, &deleting_sockets);
1267
1268 nxt_queue_add(&router->apps, &tmcf->previous);
1269
1270 // TODO: new engines and threads
1271
1272 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1273
1274 nxt_mp_destroy(rtcf->mem_pool);
1275
1276 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1277
1278 nxt_mp_release(tmcf->mem_pool);
1279 }
1280
1281
1282 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1283 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1284 nxt_port_msg_type_t type)
1285 {
1286 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1287
1288 nxt_port_use(task, tmcf->port, -1);
1289
1290 tmcf->port = NULL;
1291 }
1292
1293
/*
 * Mapping of root-level configuration members onto nxt_router_conf_t;
 * applied to the configuration root in nxt_router_conf_create().
 */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};
1301
1302
/*
 * Mapping of an application object onto nxt_router_app_conf_t.
 *
 * "processes" is listed twice on purpose: once as an integer and once as
 * a raw value pointer.  nxt_router_conf_create() inspects the pointer to
 * tell the object form (mapped with nxt_router_app_processes_conf) from
 * the plain number form.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};
1334
1335
/*
 * Mapping of an application's "limits" object onto nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },
};
1343
1344
/*
 * Mapping of an application's "processes" object (the form used when
 * "processes" is not a plain number) onto nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};
1364
1365
/*
 * Mapping of a listener object onto nxt_router_listener_conf_t.
 * nxt_router_conf_create() prefers "pass" and falls back to the
 * compatibility member "application" when "pass" is empty.
 */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};
1379
1380
/*
 * Mapping of the "/settings/http" object onto nxt_socket_conf_t.  It is
 * applied to every listener's socket configuration after the built-in
 * defaults have been set in nxt_router_conf_create().
 */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },
};
1448
1449
/*
 * Mapping of the "/settings/http/websocket" object onto
 * nxt_websocket_conf_t (the websocket_conf member of a socket
 * configuration in nxt_router_conf_create()).
 */
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
    {
        nxt_string("max_frame_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_websocket_conf_t, max_frame_size),
    },

    {
        nxt_string("read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, read_timeout),
    },

    {
        nxt_string("keepalive_interval"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, keepalive_interval),
    },

};
1470
1471
1472 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1473 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1474 u_char *start, u_char *end)
1475 {
1476 u_char *p;
1477 size_t size;
1478 nxt_mp_t *mp, *app_mp;
1479 uint32_t next, next_target;
1480 nxt_int_t ret;
1481 nxt_str_t name, path, target;
1482 nxt_app_t *app, *prev;
1483 nxt_str_t *t, *s, *targets;
1484 nxt_uint_t n, i;
1485 nxt_port_t *port;
1486 nxt_router_t *router;
1487 nxt_app_joint_t *app_joint;
1488 #if (NXT_TLS)
1489 nxt_tls_init_t *tls_init;
1490 nxt_conf_value_t *certificate;
1491 #endif
1492 nxt_conf_value_t *root, *conf, *http, *value, *websocket;
1493 nxt_conf_value_t *applications, *application;
1494 nxt_conf_value_t *listeners, *listener;
1495 nxt_socket_conf_t *skcf;
1496 nxt_router_conf_t *rtcf;
1497 nxt_http_routes_t *routes;
1498 nxt_event_engine_t *engine;
1499 nxt_app_lang_module_t *lang;
1500 nxt_router_app_conf_t apcf;
1501 nxt_router_access_log_t *access_log;
1502 nxt_router_listener_conf_t lscf;
1503
1504 static nxt_str_t http_path = nxt_string("/settings/http");
1505 static nxt_str_t applications_path = nxt_string("/applications");
1506 static nxt_str_t listeners_path = nxt_string("/listeners");
1507 static nxt_str_t routes_path = nxt_string("/routes");
1508 static nxt_str_t access_log_path = nxt_string("/access_log");
1509 #if (NXT_TLS)
1510 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1511 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1512 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1513 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1514 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1515 #endif
1516 static nxt_str_t static_path = nxt_string("/settings/http/static");
1517 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1518 static nxt_str_t forwarded_path = nxt_string("/forwarded");
1519 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1520
1521 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1522 if (root == NULL) {
1523 nxt_alert(task, "configuration parsing error");
1524 return NXT_ERROR;
1525 }
1526
1527 rtcf = tmcf->router_conf;
1528 mp = rtcf->mem_pool;
1529
1530 ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1531 nxt_nitems(nxt_router_conf), rtcf);
1532 if (ret != NXT_OK) {
1533 nxt_alert(task, "root map error");
1534 return NXT_ERROR;
1535 }
1536
1537 if (rtcf->threads == 0) {
1538 rtcf->threads = nxt_ncpu;
1539 }
1540
1541 conf = nxt_conf_get_path(root, &static_path);
1542
1543 ret = nxt_router_conf_process_static(task, rtcf, conf);
1544 if (nxt_slow_path(ret != NXT_OK)) {
1545 return NXT_ERROR;
1546 }
1547
1548 router = rtcf->router;
1549
1550 applications = nxt_conf_get_path(root, &applications_path);
1551
1552 if (applications != NULL) {
1553 next = 0;
1554
1555 for ( ;; ) {
1556 application = nxt_conf_next_object_member(applications,
1557 &name, &next);
1558 if (application == NULL) {
1559 break;
1560 }
1561
1562 nxt_debug(task, "application \"%V\"", &name);
1563
1564 size = nxt_conf_json_length(application, NULL);
1565
1566 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1567 if (nxt_slow_path(app_mp == NULL)) {
1568 goto fail;
1569 }
1570
1571 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1572 if (app == NULL) {
1573 goto app_fail;
1574 }
1575
1576 nxt_memzero(app, sizeof(nxt_app_t));
1577
1578 app->mem_pool = app_mp;
1579
1580 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1581 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1582 + name.length);
1583
1584 p = nxt_conf_json_print(app->conf.start, application, NULL);
1585 app->conf.length = p - app->conf.start;
1586
1587 nxt_assert(app->conf.length <= size);
1588
1589 nxt_debug(task, "application conf \"%V\"", &app->conf);
1590
1591 prev = nxt_router_app_find(&router->apps, &name);
1592
1593 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1594 nxt_mp_destroy(app_mp);
1595
1596 nxt_queue_remove(&prev->link);
1597 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1598
1599 ret = nxt_router_apps_hash_add(rtcf, prev);
1600 if (nxt_slow_path(ret != NXT_OK)) {
1601 goto fail;
1602 }
1603
1604 continue;
1605 }
1606
1607 apcf.processes = 1;
1608 apcf.max_processes = 1;
1609 apcf.spare_processes = 0;
1610 apcf.timeout = 0;
1611 apcf.idle_timeout = 15000;
1612 apcf.limits_value = NULL;
1613 apcf.processes_value = NULL;
1614 apcf.targets_value = NULL;
1615
1616 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1617 if (nxt_slow_path(app_joint == NULL)) {
1618 goto app_fail;
1619 }
1620
1621 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1622
1623 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1624 nxt_nitems(nxt_router_app_conf), &apcf);
1625 if (ret != NXT_OK) {
1626 nxt_alert(task, "application map error");
1627 goto app_fail;
1628 }
1629
1630 if (apcf.limits_value != NULL) {
1631
1632 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1633 nxt_alert(task, "application limits is not object");
1634 goto app_fail;
1635 }
1636
1637 ret = nxt_conf_map_object(mp, apcf.limits_value,
1638 nxt_router_app_limits_conf,
1639 nxt_nitems(nxt_router_app_limits_conf),
1640 &apcf);
1641 if (ret != NXT_OK) {
1642 nxt_alert(task, "application limits map error");
1643 goto app_fail;
1644 }
1645 }
1646
1647 if (apcf.processes_value != NULL
1648 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1649 {
1650 ret = nxt_conf_map_object(mp, apcf.processes_value,
1651 nxt_router_app_processes_conf,
1652 nxt_nitems(nxt_router_app_processes_conf),
1653 &apcf);
1654 if (ret != NXT_OK) {
1655 nxt_alert(task, "application processes map error");
1656 goto app_fail;
1657 }
1658
1659 } else {
1660 apcf.max_processes = apcf.processes;
1661 apcf.spare_processes = apcf.processes;
1662 }
1663
1664 if (apcf.targets_value != NULL) {
1665 n = nxt_conf_object_members_count(apcf.targets_value);
1666
1667 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1668 if (nxt_slow_path(targets == NULL)) {
1669 goto app_fail;
1670 }
1671
1672 next_target = 0;
1673
1674 for (i = 0; i < n; i++) {
1675 (void) nxt_conf_next_object_member(apcf.targets_value,
1676 &target, &next_target);
1677
1678 s = nxt_str_dup(app_mp, &targets[i], &target);
1679 if (nxt_slow_path(s == NULL)) {
1680 goto app_fail;
1681 }
1682 }
1683
1684 } else {
1685 targets = NULL;
1686 }
1687
1688 nxt_debug(task, "application type: %V", &apcf.type);
1689 nxt_debug(task, "application processes: %D", apcf.processes);
1690 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1691
1692 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1693
1694 if (lang == NULL) {
1695 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1696 goto app_fail;
1697 }
1698
1699 nxt_debug(task, "application language module: \"%s\"", lang->file);
1700
1701 ret = nxt_thread_mutex_create(&app->mutex);
1702 if (ret != NXT_OK) {
1703 goto app_fail;
1704 }
1705
1706 nxt_queue_init(&app->ports);
1707 nxt_queue_init(&app->spare_ports);
1708 nxt_queue_init(&app->idle_ports);
1709 nxt_queue_init(&app->ack_waiting_req);
1710
1711 app->name.length = name.length;
1712 nxt_memcpy(app->name.start, name.start, name.length);
1713
1714 app->type = lang->type;
1715 app->max_processes = apcf.max_processes;
1716 app->spare_processes = apcf.spare_processes;
1717 app->max_pending_processes = apcf.spare_processes
1718 ? apcf.spare_processes : 1;
1719 app->timeout = apcf.timeout;
1720 app->idle_timeout = apcf.idle_timeout;
1721
1722 app->targets = targets;
1723
1724 engine = task->thread->engine;
1725
1726 app->engine = engine;
1727
1728 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1729 app->adjust_idle_work.task = &engine->task;
1730 app->adjust_idle_work.obj = app;
1731
1732 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1733
1734 ret = nxt_router_apps_hash_add(rtcf, app);
1735 if (nxt_slow_path(ret != NXT_OK)) {
1736 goto app_fail;
1737 }
1738
1739 nxt_router_app_use(task, app, 1);
1740
1741 app->joint = app_joint;
1742
1743 app_joint->use_count = 1;
1744 app_joint->app = app;
1745
1746 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1747 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1748 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1749 app_joint->idle_timer.task = &engine->task;
1750 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1751
1752 app_joint->free_app_work.handler = nxt_router_free_app;
1753 app_joint->free_app_work.task = &engine->task;
1754 app_joint->free_app_work.obj = app_joint;
1755
1756 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1757 NXT_PROCESS_APP);
1758 if (nxt_slow_path(port == NULL)) {
1759 return NXT_ERROR;
1760 }
1761
1762 ret = nxt_port_socket_init(task, port, 0);
1763 if (nxt_slow_path(ret != NXT_OK)) {
1764 nxt_port_use(task, port, -1);
1765 return NXT_ERROR;
1766 }
1767
1768 ret = nxt_router_app_queue_init(task, port);
1769 if (nxt_slow_path(ret != NXT_OK)) {
1770 nxt_port_write_close(port);
1771 nxt_port_read_close(port);
1772 nxt_port_use(task, port, -1);
1773 return NXT_ERROR;
1774 }
1775
1776 nxt_port_write_enable(task, port);
1777 port->app = app;
1778
1779 app->shared_port = port;
1780
1781 nxt_thread_mutex_create(&app->outgoing.mutex);
1782 }
1783 }
1784
1785 conf = nxt_conf_get_path(root, &routes_path);
1786 if (nxt_fast_path(conf != NULL)) {
1787 routes = nxt_http_routes_create(task, tmcf, conf);
1788 if (nxt_slow_path(routes == NULL)) {
1789 return NXT_ERROR;
1790 }
1791 rtcf->routes = routes;
1792 }
1793
1794 ret = nxt_upstreams_create(task, tmcf, root);
1795 if (nxt_slow_path(ret != NXT_OK)) {
1796 return ret;
1797 }
1798
1799 http = nxt_conf_get_path(root, &http_path);
1800 #if 0
1801 if (http == NULL) {
1802 nxt_alert(task, "no \"http\" block");
1803 return NXT_ERROR;
1804 }
1805 #endif
1806
1807 websocket = nxt_conf_get_path(root, &websocket_path);
1808
1809 listeners = nxt_conf_get_path(root, &listeners_path);
1810
1811 if (listeners != NULL) {
1812 next = 0;
1813
1814 for ( ;; ) {
1815 listener = nxt_conf_next_object_member(listeners, &name, &next);
1816 if (listener == NULL) {
1817 break;
1818 }
1819
1820 skcf = nxt_router_socket_conf(task, tmcf, &name);
1821 if (skcf == NULL) {
1822 goto fail;
1823 }
1824
1825 nxt_memzero(&lscf, sizeof(lscf));
1826
1827 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1828 nxt_nitems(nxt_router_listener_conf),
1829 &lscf);
1830 if (ret != NXT_OK) {
1831 nxt_alert(task, "listener map error");
1832 goto fail;
1833 }
1834
1835 nxt_debug(task, "application: %V", &lscf.application);
1836
1837 // STUB, default values if http block is not defined.
1838 skcf->header_buffer_size = 2048;
1839 skcf->large_header_buffer_size = 8192;
1840 skcf->large_header_buffers = 4;
1841 skcf->discard_unsafe_fields = 1;
1842 skcf->body_buffer_size = 16 * 1024;
1843 skcf->max_body_size = 8 * 1024 * 1024;
1844 skcf->proxy_header_buffer_size = 64 * 1024;
1845 skcf->proxy_buffer_size = 4096;
1846 skcf->proxy_buffers = 256;
1847 skcf->idle_timeout = 180 * 1000;
1848 skcf->header_read_timeout = 30 * 1000;
1849 skcf->body_read_timeout = 30 * 1000;
1850 skcf->send_timeout = 30 * 1000;
1851 skcf->proxy_timeout = 60 * 1000;
1852 skcf->proxy_send_timeout = 30 * 1000;
1853 skcf->proxy_read_timeout = 30 * 1000;
1854
1855 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1856 skcf->websocket_conf.read_timeout = 60 * 1000;
1857 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1858
1859 nxt_str_null(&skcf->body_temp_path);
1860
1861 if (http != NULL) {
1862 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1863 nxt_nitems(nxt_router_http_conf),
1864 skcf);
1865 if (ret != NXT_OK) {
1866 nxt_alert(task, "http map error");
1867 goto fail;
1868 }
1869 }
1870
1871 if (websocket != NULL) {
1872 ret = nxt_conf_map_object(mp, websocket,
1873 nxt_router_websocket_conf,
1874 nxt_nitems(nxt_router_websocket_conf),
1875 &skcf->websocket_conf);
1876 if (ret != NXT_OK) {
1877 nxt_alert(task, "websocket map error");
1878 goto fail;
1879 }
1880 }
1881
1882 t = &skcf->body_temp_path;
1883
1884 if (t->length == 0) {
1885 t->start = (u_char *) task->thread->runtime->tmp;
1886 t->length = nxt_strlen(t->start);
1887 }
1888
1889 conf = nxt_conf_get_path(listener, &forwarded_path);
1890
1891 if (conf != NULL) {
1892 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1893 if (nxt_slow_path(skcf->forwarded == NULL)) {
1894 return NXT_ERROR;
1895 }
1896 }
1897
1898 conf = nxt_conf_get_path(listener, &client_ip_path);
1899
1900 if (conf != NULL) {
1901 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1902 if (nxt_slow_path(skcf->client_ip == NULL)) {
1903 return NXT_ERROR;
1904 }
1905 }
1906
1907 #if (NXT_TLS)
1908 certificate = nxt_conf_get_path(listener, &certificate_path);
1909
1910 if (certificate != NULL) {
1911 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1912 if (nxt_slow_path(tls_init == NULL)) {
1913 return NXT_ERROR;
1914 }
1915
1916 tls_init->cache_size = 0;
1917 tls_init->timeout = 300;
1918
1919 value = nxt_conf_get_path(listener, &conf_cache_path);
1920 if (value != NULL) {
1921 tls_init->cache_size = nxt_conf_get_number(value);
1922 }
1923
1924 value = nxt_conf_get_path(listener, &conf_timeout_path);
1925 if (value != NULL) {
1926 tls_init->timeout = nxt_conf_get_number(value);
1927 }
1928
1929 tls_init->conf_cmds = nxt_conf_get_path(listener,
1930 &conf_commands_path);
1931
1932 tls_init->tickets_conf = nxt_conf_get_path(listener,
1933 &conf_tickets);
1934
1935 n = nxt_conf_array_elements_count_or_1(certificate);
1936
1937 for (i = 0; i < n; i++) {
1938 value = nxt_conf_get_array_element_or_itself(certificate,
1939 i);
1940 nxt_assert(value != NULL);
1941
1942 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1943 tls_init, i == 0);
1944 if (nxt_slow_path(ret != NXT_OK)) {
1945 goto fail;
1946 }
1947 }
1948 }
1949 #endif
1950
1951 skcf->listen->handler = nxt_http_conn_init;
1952 skcf->router_conf = rtcf;
1953 skcf->router_conf->count++;
1954
1955 if (lscf.pass.length != 0) {
1956 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1957
1958 /* COMPATIBILITY: listener application. */
1959 } else if (lscf.application.length > 0) {
1960 skcf->action = nxt_http_pass_application(task, rtcf,
1961 &lscf.application);
1962 }
1963
1964 if (nxt_slow_path(skcf->action == NULL)) {
1965 goto fail;
1966 }
1967 }
1968 }
1969
1970 ret = nxt_http_routes_resolve(task, tmcf);
1971 if (nxt_slow_path(ret != NXT_OK)) {
1972 goto fail;
1973 }
1974
1975 value = nxt_conf_get_path(root, &access_log_path);
1976
1977 if (value != NULL) {
1978 nxt_conf_get_string(value, &path);
1979
1980 access_log = router->access_log;
1981
1982 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1983 nxt_router_access_log_use(&router->lock, access_log);
1984
1985 } else {
1986 access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1987 + path.length);
1988 if (access_log == NULL) {
1989 nxt_alert(task, "failed to allocate access log structure");
1990 goto fail;
1991 }
1992
1993 access_log->fd = -1;
1994 access_log->handler = &nxt_router_access_log_writer;
1995 access_log->count = 1;
1996
1997 access_log->path.length = path.length;
1998 access_log->path.start = (u_char *) access_log
1999 + sizeof(nxt_router_access_log_t);
2000
2001 nxt_memcpy(access_log->path.start, path.start, path.length);
2002 }
2003
2004 rtcf->access_log = access_log;
2005 }
2006
2007 nxt_queue_add(&deleting_sockets, &router->sockets);
2008 nxt_queue_init(&router->sockets);
2009
2010 return NXT_OK;
2011
2012 app_fail:
2013
2014 nxt_mp_destroy(app_mp);
2015
2016 fail:
2017
2018 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2019
2020 nxt_queue_remove(&app->link);
2021 nxt_thread_mutex_destroy(&app->mutex);
2022 nxt_mp_destroy(app->mem_pool);
2023
2024 } nxt_queue_loop;
2025
2026 return NXT_ERROR;
2027 }
2028
2029
2030 #if (NXT_TLS)
2031
2032 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2033 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2034 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2035 nxt_tls_init_t *tls_init, nxt_bool_t last)
2036 {
2037 nxt_router_tlssock_t *tls;
2038
2039 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2040 if (nxt_slow_path(tls == NULL)) {
2041 return NXT_ERROR;
2042 }
2043
2044 tls->tls_init = tls_init;
2045 tls->socket_conf = skcf;
2046 tls->temp_conf = tmcf;
2047 tls->last = last;
2048 nxt_conf_get_string(value, &tls->name);
2049
2050 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2051
2052 return NXT_OK;
2053 }
2054
2055 #endif
2056
2057
2058 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2059 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2060 nxt_conf_value_t *conf)
2061 {
2062 uint32_t next, i;
2063 nxt_mp_t *mp;
2064 nxt_str_t *type, exten, str;
2065 nxt_int_t ret;
2066 nxt_uint_t exts;
2067 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2068
2069 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2070
2071 mp = rtcf->mem_pool;
2072
2073 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2074 if (nxt_slow_path(ret != NXT_OK)) {
2075 return NXT_ERROR;
2076 }
2077
2078 if (conf == NULL) {
2079 return NXT_OK;
2080 }
2081
2082 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2083
2084 if (mtypes_conf != NULL) {
2085 next = 0;
2086
2087 for ( ;; ) {
2088 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2089
2090 if (ext_conf == NULL) {
2091 break;
2092 }
2093
2094 type = nxt_str_dup(mp, NULL, &str);
2095 if (nxt_slow_path(type == NULL)) {
2096 return NXT_ERROR;
2097 }
2098
2099 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2100 nxt_conf_get_string(ext_conf, &str);
2101
2102 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2103 return NXT_ERROR;
2104 }
2105
2106 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2107 &exten, type);
2108 if (nxt_slow_path(ret != NXT_OK)) {
2109 return NXT_ERROR;
2110 }
2111
2112 continue;
2113 }
2114
2115 exts = nxt_conf_array_elements_count(ext_conf);
2116
2117 for (i = 0; i < exts; i++) {
2118 value = nxt_conf_get_array_element(ext_conf, i);
2119
2120 nxt_conf_get_string(value, &str);
2121
2122 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2123 return NXT_ERROR;
2124 }
2125
2126 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2127 &exten, type);
2128 if (nxt_slow_path(ret != NXT_OK)) {
2129 return NXT_ERROR;
2130 }
2131 }
2132 }
2133 }
2134
2135 return NXT_OK;
2136 }
2137
2138
2139 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2140 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2141 {
2142 nxt_int_t ret;
2143 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf;
2144 nxt_conf_value_t *source_conf, *recursive_conf;
2145 nxt_http_forward_t *forward;
2146 nxt_http_route_addr_rule_t *source;
2147
2148 static nxt_str_t header_path = nxt_string("/header");
2149 static nxt_str_t client_ip_path = nxt_string("/client_ip");
2150 static nxt_str_t protocol_path = nxt_string("/protocol");
2151 static nxt_str_t source_path = nxt_string("/source");
2152 static nxt_str_t recursive_path = nxt_string("/recursive");
2153
2154 header_conf = nxt_conf_get_path(conf, &header_path);
2155
2156 if (header_conf != NULL) {
2157 client_ip_conf = nxt_conf_get_path(conf, &header_path);
2158 protocol_conf = NULL;
2159
2160 } else {
2161 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2162 protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2163 }
2164
2165 source_conf = nxt_conf_get_path(conf, &source_path);
2166 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2167
2168 if (source_conf == NULL
2169 || (protocol_conf == NULL && client_ip_conf == NULL))
2170 {
2171 return NULL;
2172 }
2173
2174 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2175 if (nxt_slow_path(forward == NULL)) {
2176 return NULL;
2177 }
2178
2179 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2180 if (nxt_slow_path(source == NULL)) {
2181 return NULL;
2182 }
2183
2184 forward->source = source;
2185
2186 if (recursive_conf != NULL) {
2187 forward->recursive = nxt_conf_get_boolean(recursive_conf);
2188 }
2189
2190 if (client_ip_conf != NULL) {
2191 ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2192 &forward->client_ip);
2193 if (nxt_slow_path(ret != NXT_OK)) {
2194 return NULL;
2195 }
2196 }
2197
2198 if (protocol_conf != NULL) {
2199 ret = nxt_router_conf_forward_header(mp, protocol_conf,
2200 &forward->protocol);
2201 if (nxt_slow_path(ret != NXT_OK)) {
2202 return NULL;
2203 }
2204 }
2205
2206 return forward;
2207 }
2208
2209
2210 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2211 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2212 nxt_http_forward_header_t *fh)
2213 {
2214 char c;
2215 size_t i;
2216 uint32_t hash;
2217 nxt_str_t header;
2218
2219 nxt_conf_get_string(conf, &header);
2220
2221 fh->header = nxt_str_dup(mp, NULL, &header);
2222 if (nxt_slow_path(fh->header == NULL)) {
2223 return NXT_ERROR;
2224 }
2225
2226 hash = NXT_HTTP_FIELD_HASH_INIT;
2227
2228 for (i = 0; i < fh->header->length; i++) {
2229 c = fh->header->start[i];
2230 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2231 }
2232
2233 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2234
2235 fh->header_hash = hash;
2236
2237 return NXT_OK;
2238 }
2239
2240
2241 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2242 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2243 {
2244 nxt_app_t *app;
2245
2246 nxt_queue_each(app, queue, nxt_app_t, link) {
2247
2248 if (nxt_strstr_eq(name, &app->name)) {
2249 return app;
2250 }
2251
2252 } nxt_queue_loop;
2253
2254 return NULL;
2255 }
2256
2257
2258 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2259 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2260 {
2261 void *mem;
2262 nxt_int_t fd;
2263
2264 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2265 if (nxt_slow_path(fd == -1)) {
2266 return NXT_ERROR;
2267 }
2268
2269 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2270 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2271 if (nxt_slow_path(mem == MAP_FAILED)) {
2272 nxt_fd_close(fd);
2273
2274 return NXT_ERROR;
2275 }
2276
2277 nxt_app_queue_init(mem);
2278
2279 port->queue_fd = fd;
2280 port->queue = mem;
2281
2282 return NXT_OK;
2283 }
2284
2285
2286 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2287 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2288 {
2289 void *mem;
2290 nxt_int_t fd;
2291
2292 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2293 if (nxt_slow_path(fd == -1)) {
2294 return NXT_ERROR;
2295 }
2296
2297 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2298 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2299 if (nxt_slow_path(mem == MAP_FAILED)) {
2300 nxt_fd_close(fd);
2301
2302 return NXT_ERROR;
2303 }
2304
2305 nxt_port_queue_init(mem);
2306
2307 port->queue_fd = fd;
2308 port->queue = mem;
2309
2310 return NXT_OK;
2311 }
2312
2313
2314 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2315 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2316 {
2317 void *mem;
2318
2319 nxt_assert(fd != -1);
2320
2321 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2322 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2323 if (nxt_slow_path(mem == MAP_FAILED)) {
2324
2325 return NXT_ERROR;
2326 }
2327
2328 port->queue = mem;
2329
2330 return NXT_OK;
2331 }
2332
2333
2334 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
2335 NXT_LVLHSH_DEFAULT,
2336 nxt_router_apps_hash_test,
2337 nxt_mp_lvlhsh_alloc,
2338 nxt_mp_lvlhsh_free,
2339 };
2340
2341
2342 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2343 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2344 {
2345 nxt_app_t *app;
2346
2347 app = data;
2348
2349 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2350 }
2351
2352
2353 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2354 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2355 {
2356 nxt_lvlhsh_query_t lhq;
2357
2358 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2359 lhq.replace = 0;
2360 lhq.key = app->name;
2361 lhq.value = app;
2362 lhq.proto = &nxt_router_apps_hash_proto;
2363 lhq.pool = rtcf->mem_pool;
2364
2365 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2366
2367 case NXT_OK:
2368 return NXT_OK;
2369
2370 case NXT_DECLINED:
2371 nxt_thread_log_alert("router app hash adding failed: "
2372 "\"%V\" is already in hash", &lhq.key);
2373 /* Fall through. */
2374 default:
2375 return NXT_ERROR;
2376 }
2377 }
2378
2379
2380 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2381 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2382 {
2383 nxt_lvlhsh_query_t lhq;
2384
2385 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2386 lhq.key = *name;
2387 lhq.proto = &nxt_router_apps_hash_proto;
2388
2389 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2390 return NULL;
2391 }
2392
2393 return lhq.value;
2394 }
2395
2396
2397 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2398 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2399 {
2400 nxt_app_t *app;
2401 nxt_lvlhsh_each_t lhe;
2402
2403 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2404
2405 for ( ;; ) {
2406 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2407
2408 if (app == NULL) {
2409 break;
2410 }
2411
2412 nxt_router_app_use(task, app, i);
2413 }
2414 }
2415
2416
2417 typedef struct {
2418 nxt_app_t *app;
2419 nxt_int_t target;
2420 } nxt_http_app_conf_t;
2421
2422
2423 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2424 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2425 nxt_str_t *target, nxt_http_action_t *action)
2426 {
2427 nxt_app_t *app;
2428 nxt_str_t *targets;
2429 nxt_uint_t i;
2430 nxt_http_app_conf_t *conf;
2431
2432 app = nxt_router_apps_hash_get(rtcf, name);
2433 if (app == NULL) {
2434 return NXT_DECLINED;
2435 }
2436
2437 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2438 if (nxt_slow_path(conf == NULL)) {
2439 return NXT_ERROR;
2440 }
2441
2442 action->handler = nxt_http_application_handler;
2443 action->u.conf = conf;
2444
2445 conf->app = app;
2446
2447 if (target != NULL && target->length != 0) {
2448 targets = app->targets;
2449
2450 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2451
2452 conf->target = i;
2453
2454 } else {
2455 conf->target = 0;
2456 }
2457
2458 return NXT_OK;
2459 }
2460
2461
2462 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2463 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2464 nxt_str_t *name)
2465 {
2466 size_t size;
2467 nxt_int_t ret;
2468 nxt_bool_t wildcard;
2469 nxt_sockaddr_t *sa;
2470 nxt_socket_conf_t *skcf;
2471 nxt_listen_socket_t *ls;
2472
2473 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2474 if (nxt_slow_path(sa == NULL)) {
2475 nxt_alert(task, "invalid listener \"%V\"", name);
2476 return NULL;
2477 }
2478
2479 sa->type = SOCK_STREAM;
2480
2481 nxt_debug(task, "router listener: \"%*s\"",
2482 (size_t) sa->length, nxt_sockaddr_start(sa));
2483
2484 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2485 if (nxt_slow_path(skcf == NULL)) {
2486 return NULL;
2487 }
2488
2489 size = nxt_sockaddr_size(sa);
2490
2491 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2492
2493 if (ret != NXT_OK) {
2494
2495 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2496 if (nxt_slow_path(ls == NULL)) {
2497 return NULL;
2498 }
2499
2500 skcf->listen = ls;
2501
2502 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2503 nxt_memcpy(ls->sockaddr, sa, size);
2504
2505 nxt_listen_socket_remote_size(ls);
2506
2507 ls->socket = -1;
2508 ls->backlog = NXT_LISTEN_BACKLOG;
2509 ls->flags = NXT_NONBLOCK;
2510 ls->read_after_accept = 1;
2511 }
2512
2513 switch (sa->u.sockaddr.sa_family) {
2514 #if (NXT_HAVE_UNIX_DOMAIN)
2515 case AF_UNIX:
2516 wildcard = 0;
2517 break;
2518 #endif
2519 #if (NXT_INET6)
2520 case AF_INET6:
2521 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2522 break;
2523 #endif
2524 case AF_INET:
2525 default:
2526 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2527 break;
2528 }
2529
2530 if (!wildcard) {
2531 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2532 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2533 return NULL;
2534 }
2535
2536 nxt_memcpy(skcf->sockaddr, sa, size);
2537 }
2538
2539 return skcf;
2540 }
2541
2542
2543 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2544 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2545 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2546 {
2547 nxt_router_t *router;
2548 nxt_queue_link_t *qlk;
2549 nxt_socket_conf_t *skcf;
2550
2551 router = tmcf->router_conf->router;
2552
2553 for (qlk = nxt_queue_first(&router->sockets);
2554 qlk != nxt_queue_tail(&router->sockets);
2555 qlk = nxt_queue_next(qlk))
2556 {
2557 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2558
2559 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2560 nskcf->listen = skcf->listen;
2561
2562 nxt_queue_remove(qlk);
2563 nxt_queue_insert_tail(&keeping_sockets, qlk);
2564
2565 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2566
2567 return NXT_OK;
2568 }
2569 }
2570
2571 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2572
2573 return NXT_DECLINED;
2574 }
2575
2576
2577 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2578 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2579 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2580 {
2581 size_t size;
2582 uint32_t stream;
2583 nxt_int_t ret;
2584 nxt_buf_t *b;
2585 nxt_port_t *main_port, *router_port;
2586 nxt_runtime_t *rt;
2587 nxt_socket_rpc_t *rpc;
2588
2589 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2590 if (rpc == NULL) {
2591 goto fail;
2592 }
2593
2594 rpc->socket_conf = skcf;
2595 rpc->temp_conf = tmcf;
2596
2597 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2598
2599 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2600 if (b == NULL) {
2601 goto fail;
2602 }
2603
2604 b->completion_handler = nxt_buf_dummy_completion;
2605
2606 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2607
2608 rt = task->thread->runtime;
2609 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2610 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2611
2612 stream = nxt_port_rpc_register_handler(task, router_port,
2613 nxt_router_listen_socket_ready,
2614 nxt_router_listen_socket_error,
2615 main_port->pid, rpc);
2616 if (nxt_slow_path(stream == 0)) {
2617 goto fail;
2618 }
2619
2620 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2621 stream, router_port->id, b);
2622
2623 if (nxt_slow_path(ret != NXT_OK)) {
2624 nxt_port_rpc_cancel(task, router_port, stream);
2625 goto fail;
2626 }
2627
2628 return;
2629
2630 fail:
2631
2632 nxt_router_conf_error(task, tmcf);
2633 }
2634
2635
2636 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2637 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2638 void *data)
2639 {
2640 nxt_int_t ret;
2641 nxt_socket_t s;
2642 nxt_socket_rpc_t *rpc;
2643
2644 rpc = data;
2645
2646 s = msg->fd[0];
2647
2648 ret = nxt_socket_nonblocking(task, s);
2649 if (nxt_slow_path(ret != NXT_OK)) {
2650 goto fail;
2651 }
2652
2653 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2654
2655 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2656 if (nxt_slow_path(ret != NXT_OK)) {
2657 goto fail;
2658 }
2659
2660 rpc->socket_conf->listen->socket = s;
2661
2662 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2663 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2664
2665 return;
2666
2667 fail:
2668
2669 nxt_socket_close(task, s);
2670
2671 nxt_router_conf_error(task, rpc->temp_conf);
2672 }
2673
2674
2675 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2676 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2677 void *data)
2678 {
2679 nxt_socket_rpc_t *rpc;
2680 nxt_router_temp_conf_t *tmcf;
2681
2682 rpc = data;
2683 tmcf = rpc->temp_conf;
2684
2685 #if 0
2686 u_char *p;
2687 size_t size;
2688 uint8_t error;
2689 nxt_buf_t *in, *out;
2690 nxt_sockaddr_t *sa;
2691
2692 static nxt_str_t socket_errors[] = {
2693 nxt_string("ListenerSystem"),
2694 nxt_string("ListenerNoIPv6"),
2695 nxt_string("ListenerPort"),
2696 nxt_string("ListenerInUse"),
2697 nxt_string("ListenerNoAddress"),
2698 nxt_string("ListenerNoAccess"),
2699 nxt_string("ListenerPath"),
2700 };
2701
2702 sa = rpc->socket_conf->listen->sockaddr;
2703
2704 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2705
2706 if (nxt_slow_path(in == NULL)) {
2707 return;
2708 }
2709
2710 p = in->mem.pos;
2711
2712 error = *p++;
2713
2714 size = nxt_length("listen socket error: ")
2715 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2716 + sa->length + socket_errors[error].length + (in->mem.free - p);
2717
2718 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2719 if (nxt_slow_path(out == NULL)) {
2720 return;
2721 }
2722
2723 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2724 "listen socket error: "
2725 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2726 (size_t) sa->length, nxt_sockaddr_start(sa),
2727 &socket_errors[error], in->mem.free - p, p);
2728
2729 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2730 #endif
2731
2732 nxt_router_conf_error(task, tmcf);
2733 }
2734
2735
2736 #if (NXT_TLS)
2737
2738 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2739 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2740 void *data)
2741 {
2742 nxt_mp_t *mp;
2743 nxt_int_t ret;
2744 nxt_tls_conf_t *tlscf;
2745 nxt_router_tlssock_t *tls;
2746 nxt_tls_bundle_conf_t *bundle;
2747 nxt_router_temp_conf_t *tmcf;
2748
2749 nxt_debug(task, "tls rpc handler");
2750
2751 tls = data;
2752 tmcf = tls->temp_conf;
2753
2754 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2755 goto fail;
2756 }
2757
2758 mp = tmcf->router_conf->mem_pool;
2759
2760 if (tls->socket_conf->tls == NULL){
2761 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2762 if (nxt_slow_path(tlscf == NULL)) {
2763 goto fail;
2764 }
2765
2766 tlscf->no_wait_shutdown = 1;
2767 tls->socket_conf->tls = tlscf;
2768
2769 } else {
2770 tlscf = tls->socket_conf->tls;
2771 }
2772
2773 tls->tls_init->conf = tlscf;
2774
2775 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2776 if (nxt_slow_path(bundle == NULL)) {
2777 goto fail;
2778 }
2779
2780 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2781 goto fail;
2782 }
2783
2784 bundle->chain_file = msg->fd[0];
2785 bundle->next = tlscf->bundle;
2786 tlscf->bundle = bundle;
2787
2788 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2789 tls->last);
2790 if (nxt_slow_path(ret != NXT_OK)) {
2791 goto fail;
2792 }
2793
2794 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2795 nxt_router_conf_apply, task, tmcf, NULL);
2796 return;
2797
2798 fail:
2799
2800 nxt_router_conf_error(task, tmcf);
2801 }
2802
2803 #endif
2804
2805
2806 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2807 nxt_router_app_rpc_create(nxt_task_t *task,
2808 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2809 {
2810 size_t size;
2811 uint32_t stream;
2812 nxt_fd_t port_fd, queue_fd;
2813 nxt_int_t ret;
2814 nxt_buf_t *b;
2815 nxt_port_t *router_port, *dport;
2816 nxt_runtime_t *rt;
2817 nxt_app_rpc_t *rpc;
2818
2819 rt = task->thread->runtime;
2820
2821 dport = app->proto_port;
2822
2823 if (dport == NULL) {
2824 nxt_debug(task, "app '%V' prototype prefork", &app->name);
2825
2826 size = app->name.length + 1 + app->conf.length;
2827
2828 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2829 if (nxt_slow_path(b == NULL)) {
2830 goto fail;
2831 }
2832
2833 b->completion_handler = nxt_buf_dummy_completion;
2834
2835 nxt_buf_cpystr(b, &app->name);
2836 *b->mem.free++ = '\0';
2837 nxt_buf_cpystr(b, &app->conf);
2838
2839 dport = rt->port_by_type[NXT_PROCESS_MAIN];
2840
2841 port_fd = app->shared_port->pair[0];
2842 queue_fd = app->shared_port->queue_fd;
2843
2844 } else {
2845 nxt_debug(task, "app '%V' prefork", &app->name);
2846
2847 b = NULL;
2848 port_fd = -1;
2849 queue_fd = -1;
2850 }
2851
2852 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2853
2854 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2855 nxt_router_app_prefork_ready,
2856 nxt_router_app_prefork_error,
2857 sizeof(nxt_app_rpc_t));
2858 if (nxt_slow_path(rpc == NULL)) {
2859 goto fail;
2860 }
2861
2862 rpc->app = app;
2863 rpc->temp_conf = tmcf;
2864 rpc->proto = (b != NULL);
2865
2866 stream = nxt_port_rpc_ex_stream(rpc);
2867
2868 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2869 port_fd, queue_fd, stream, router_port->id, b);
2870 if (nxt_slow_path(ret != NXT_OK)) {
2871 nxt_port_rpc_cancel(task, router_port, stream);
2872 goto fail;
2873 }
2874
2875 if (b == NULL) {
2876 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2877
2878 app->pending_processes++;
2879 }
2880
2881 return;
2882
2883 fail:
2884
2885 nxt_router_conf_error(task, tmcf);
2886 }
2887
2888
2889 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2890 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2891 void *data)
2892 {
2893 nxt_app_t *app;
2894 nxt_port_t *port;
2895 nxt_app_rpc_t *rpc;
2896 nxt_event_engine_t *engine;
2897
2898 rpc = data;
2899 app = rpc->app;
2900
2901 port = msg->u.new_port;
2902
2903 nxt_assert(port != NULL);
2904 nxt_assert(port->id == 0);
2905
2906 if (rpc->proto) {
2907 nxt_assert(app->proto_port == NULL);
2908 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2909
2910 nxt_port_inc_use(port);
2911
2912 app->proto_port = port;
2913 port->app = app;
2914
2915 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2916
2917 return;
2918 }
2919
2920 nxt_assert(port->type == NXT_PROCESS_APP);
2921
2922 port->app = app;
2923 port->main_app_port = port;
2924
2925 app->pending_processes--;
2926 app->processes++;
2927 app->idle_processes++;
2928
2929 engine = task->thread->engine;
2930
2931 nxt_queue_insert_tail(&app->ports, &port->app_link);
2932 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2933
2934 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2935 &app->name, port->pid, port->id);
2936
2937 nxt_port_hash_add(&app->port_hash, port);
2938 app->port_hash_count++;
2939
2940 port->idle_start = 0;
2941
2942 nxt_port_inc_use(port);
2943
2944 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2945
2946 nxt_work_queue_add(&engine->fast_work_queue,
2947 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2948 }
2949
2950
2951 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2952 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2953 void *data)
2954 {
2955 nxt_app_t *app;
2956 nxt_app_rpc_t *rpc;
2957 nxt_router_temp_conf_t *tmcf;
2958
2959 rpc = data;
2960 app = rpc->app;
2961 tmcf = rpc->temp_conf;
2962
2963 if (rpc->proto) {
2964 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2965 &app->name);
2966
2967 } else {
2968 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2969 &app->name);
2970
2971 app->pending_processes--;
2972 }
2973
2974 nxt_router_conf_error(task, tmcf);
2975 }
2976
2977
2978 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2979 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2980 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2981 {
2982 nxt_int_t ret;
2983 nxt_uint_t n, threads;
2984 nxt_queue_link_t *qlk;
2985 nxt_router_engine_conf_t *recf;
2986
2987 threads = tmcf->router_conf->threads;
2988
2989 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2990 sizeof(nxt_router_engine_conf_t));
2991 if (nxt_slow_path(tmcf->engines == NULL)) {
2992 return NXT_ERROR;
2993 }
2994
2995 n = 0;
2996
2997 for (qlk = nxt_queue_first(&router->engines);
2998 qlk != nxt_queue_tail(&router->engines);
2999 qlk = nxt_queue_next(qlk))
3000 {
3001 recf = nxt_array_zero_add(tmcf->engines);
3002 if (nxt_slow_path(recf == NULL)) {
3003 return NXT_ERROR;
3004 }
3005
3006 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3007
3008 if (n < threads) {
3009 recf->action = NXT_ROUTER_ENGINE_KEEP;
3010 ret = nxt_router_engine_conf_update(tmcf, recf);
3011
3012 } else {
3013 recf->action = NXT_ROUTER_ENGINE_DELETE;
3014 ret = nxt_router_engine_conf_delete(tmcf, recf);
3015 }
3016
3017 if (nxt_slow_path(ret != NXT_OK)) {
3018 return ret;
3019 }
3020
3021 n++;
3022 }
3023
3024 tmcf->new_threads = n;
3025
3026 while (n < threads) {
3027 recf = nxt_array_zero_add(tmcf->engines);
3028 if (nxt_slow_path(recf == NULL)) {
3029 return NXT_ERROR;
3030 }
3031
3032 recf->action = NXT_ROUTER_ENGINE_ADD;
3033
3034 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3035 if (nxt_slow_path(recf->engine == NULL)) {
3036 return NXT_ERROR;
3037 }
3038
3039 ret = nxt_router_engine_conf_create(tmcf, recf);
3040 if (nxt_slow_path(ret != NXT_OK)) {
3041 return ret;
3042 }
3043
3044 n++;
3045 }
3046
3047 return NXT_OK;
3048 }
3049
3050
3051 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3052 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3053 nxt_router_engine_conf_t *recf)
3054 {
3055 nxt_int_t ret;
3056
3057 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3058 nxt_router_listen_socket_create);
3059 if (nxt_slow_path(ret != NXT_OK)) {
3060 return ret;
3061 }
3062
3063 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3064 nxt_router_listen_socket_create);
3065 if (nxt_slow_path(ret != NXT_OK)) {
3066 return ret;
3067 }
3068
3069 return ret;
3070 }
3071
3072
3073 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3074 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3075 nxt_router_engine_conf_t *recf)
3076 {
3077 nxt_int_t ret;
3078
3079 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3080 nxt_router_listen_socket_create);
3081 if (nxt_slow_path(ret != NXT_OK)) {
3082 return ret;
3083 }
3084
3085 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3086 nxt_router_listen_socket_update);
3087 if (nxt_slow_path(ret != NXT_OK)) {
3088 return ret;
3089 }
3090
3091 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3092 if (nxt_slow_path(ret != NXT_OK)) {
3093 return ret;
3094 }
3095
3096 return ret;
3097 }
3098
3099
3100 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3101 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3102 nxt_router_engine_conf_t *recf)
3103 {
3104 nxt_int_t ret;
3105
3106 ret = nxt_router_engine_quit(tmcf, recf);
3107 if (nxt_slow_path(ret != NXT_OK)) {
3108 return ret;
3109 }
3110
3111 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3112 if (nxt_slow_path(ret != NXT_OK)) {
3113 return ret;
3114 }
3115
3116 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3117 }
3118
3119
3120 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3121 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3122 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3123 nxt_work_handler_t handler)
3124 {
3125 nxt_int_t ret;
3126 nxt_joint_job_t *job;
3127 nxt_queue_link_t *qlk;
3128 nxt_socket_conf_t *skcf;
3129 nxt_socket_conf_joint_t *joint;
3130
3131 for (qlk = nxt_queue_first(sockets);
3132 qlk != nxt_queue_tail(sockets);
3133 qlk = nxt_queue_next(qlk))
3134 {
3135 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3136 if (nxt_slow_path(job == NULL)) {
3137 return NXT_ERROR;
3138 }
3139
3140 job->work.next = recf->jobs;
3141 recf->jobs = &job->work;
3142
3143 job->task = tmcf->engine->task;
3144 job->work.handler = handler;
3145 job->work.task = &j