1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20
/*
 * Port id used for an application's shared port:
 * nxt_router_app_restart_handler() creates the shared port with this id and
 * nxt_router_msg_cancel() compares a request's app port id against it.
 */
#define NXT_SHARED_PORT_ID 0xFFFFu
22
/*
 * Application settings record.
 *
 * NOTE(review): the code that fills in and reads this structure is not in
 * this part of the file; the per-field notes are read off names and types
 * only and should be confirmed against that code.
 */
typedef struct {
    nxt_str_t type;
    uint32_t processes;
    uint32_t max_processes;
    uint32_t spare_processes;
    nxt_msec_t timeout;
    nxt_msec_t idle_timeout;
    /* presumably the raw configuration sub-values -- TODO confirm */
    nxt_conf_value_t *limits_value;
    nxt_conf_value_t *processes_value;
    nxt_conf_value_t *targets_value;
} nxt_router_app_conf_t;
34
35
/*
 * Listener settings record holding two strings.
 *
 * NOTE(review): its users are not visible in this part of the file;
 * presumably "pass" and "application" mirror the listener options of the
 * same names -- confirm against the configuration mapping.
 */
typedef struct {
    nxt_str_t pass;
    nxt_str_t application;
} nxt_router_listener_conf_t;
40
41
42 #if (NXT_TLS)
43
/*
 * One pending TLS certificate request of a configuration being applied.
 *
 * nxt_router_conf_apply() takes entries off tmcf->tls one at a time and
 * passes "name" to nxt_cert_store_get() with the entry itself as the
 * callback data for nxt_router_tls_rpc_handler().
 */
typedef struct {
    nxt_str_t name;
    nxt_socket_conf_t *socket_conf;
    nxt_router_temp_conf_t *temp_conf;
    nxt_tls_init_t *tls_init;
    nxt_bool_t last;

    nxt_queue_link_t link; /* for nxt_router_temp_conf_t.tls */
} nxt_router_tlssock_t;
53
54 #endif
55
56
/*
 * Context record for a listening-socket RPC.
 *
 * NOTE(review): the code using it is not visible in this part of the file;
 * presumably it is the per-request data of the listen socket ready/error
 * callbacks -- confirm against nxt_router_listen_socket_rpc_create().
 */
typedef struct {
    nxt_str_t *name;
    nxt_socket_conf_t *socket_conf;
    nxt_router_temp_conf_t *temp_conf;
    nxt_bool_t last;
} nxt_socket_rpc_t;
63
64
/*
 * Context record pairing an application with the temporary configuration
 * it is being started for.
 *
 * NOTE(review): the code using it is not visible in this part of the file;
 * presumably it is the RPC data of the app prefork callbacks -- confirm.
 */
typedef struct {
    nxt_app_t *app;
    nxt_router_temp_conf_t *temp_conf;
    uint8_t proto; /* 1 bit */
} nxt_app_rpc_t;
70
71
/*
 * Per-request data of the START_PROCESS RPC registered in
 * nxt_router_start_app_process_handler().  After the message has been
 * written it records the app's joint, the app generation at that moment,
 * and whether the request carried a prototype start payload.
 */
typedef struct {
    nxt_app_joint_t *app_joint;
    uint32_t generation;
    uint8_t proto; /* 1 bit */
} nxt_app_joint_rpc_t;
77
78
/*
 * Forward declarations.
 *
 * nxt_router_prefork() and nxt_router_start() are the .prefork and .start
 * callbacks of nxt_router_process; the handlers taking only an
 * nxt_port_recv_msg_t are installed in nxt_router_process_port_handlers.
 */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_app_restart_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);

/* Temporary configuration life cycle (nxt_router_conf_*). */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
    nxt_conf_value_t *conf);
114
/* Forward declarations: application lookup helpers (nxt_router_app[s]_*). */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);

/*
 * Forward declarations: port queue setup and the RPC callbacks that
 * nxt_router_conf_apply() starts for listening sockets, TLS certificates
 * and application prefork.
 */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153
/*
 * Forward declarations: engine and thread set-up.
 * nxt_router_engines_create(), nxt_router_threads_create(),
 * nxt_router_apps_sort() and nxt_router_engines_post() are called, in that
 * order, from the final stage of nxt_router_conf_apply().
 */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);

/* Forward declarations: work handlers with the (task, obj, data) signature. */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);

/* Forward declarations: access log (nxt_router_access_log_*). */
static void nxt_router_access_log_writer(nxt_task_t *task,
    nxt_http_request_t *r, nxt_router_access_log_t *access_log);
static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
    struct tm *tm, size_t size, const char *format);
static void nxt_router_access_log_open(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_access_log_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
    nxt_router_access_log_t *access_log);
static void nxt_router_access_log_release(nxt_task_t *task,
    nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_reopen_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
223
/*
 * Forward declarations: application ports and request handling.
 * nxt_router_app_port_ready()/nxt_router_app_port_error() are the RPC
 * callbacks registered by nxt_router_start_app_process_handler().
 */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
    nxt_port_t *port, nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

/* Tentative definition; the initialized definition is not in this part. */
static const nxt_http_request_state_t nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
/* The next three are installed in nxt_router_process_port_handlers. */
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
270
extern const nxt_http_request_state_t nxt_http_websocket;

/*
 * The router instance of this process: allocated and assigned once in
 * nxt_router_start(), then read by the port message handlers.
 */
static nxt_router_t *nxt_router;

static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Table of six prefix strings, each either "HTTP_" or the empty string.
 *
 * NOTE(review): the lookup is not visible in this part of the file;
 * presumably the index is the application type and the result is the
 * "prefix" argument of nxt_router_prepare_msg() -- confirm that the entry
 * order matches the application type enumeration.
 */
static const nxt_str_t *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
286
287
/*
 * Port message handler table of the router process, referenced from
 * nxt_router_process.port_handlers.  Both rpc_ready and rpc_error go to the
 * generic nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t nxt_router_process_port_handlers = {
    .quit = nxt_signal_quit_handler,
    .new_port = nxt_router_new_port_handler,
    .get_port = nxt_router_get_port_handler,
    .change_file = nxt_port_change_log_file_handler,
    .mmap = nxt_port_mmap_handler,
    .get_mmap = nxt_router_get_mmap_handler,
    .data = nxt_router_conf_data_handler,
    .app_restart = nxt_router_app_restart_handler,
    .remove_pid = nxt_router_remove_pid_handler,
    .access_log = nxt_router_access_log_reopen_handler,
    .rpc_ready = nxt_port_rpc_handler,
    .rpc_error = nxt_port_rpc_handler,
    .oosm = nxt_router_oosm_handler,
};
303
304
/*
 * Process descriptor of the router (non-static, so visible to other
 * translation units).  It names the process "router", wires in the
 * prefork/start callbacks defined below and the port handler table above.
 */
const nxt_process_init_t nxt_router_process = {
    .name = "router",
    .type = NXT_PROCESS_ROUTER,
    .prefork = nxt_router_prefork,
    .restart = 1,
    .setup = nxt_process_core_setup,
    .start = nxt_router_start,
    .port_handlers = &nxt_router_process_port_handlers,
    .signals = nxt_process_signals,
};
315
316
/*
 * Queues of nxt_socket_conf_t.
 *
 * File-scope state shared by the configuration code: all five queues are
 * re-initialized by every nxt_router_temp_conf() call, and
 * nxt_router_conf_apply() moves entries from pending_sockets to
 * creating_sockets and finally appends updating_sockets and
 * creating_sockets to router->sockets.
 */
nxt_queue_t creating_sockets;
nxt_queue_t pending_sockets;
nxt_queue_t updating_sockets;
nxt_queue_t keeping_sockets;
nxt_queue_t deleting_sockets;
323
324
325 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328 nxt_runtime_stop_app_processes(task, task->thread->runtime);
329
330 return NXT_OK;
331 }
332
333
334 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337 nxt_int_t ret;
338 nxt_port_t *controller_port;
339 nxt_router_t *router;
340 nxt_runtime_t *rt;
341
342 rt = task->thread->runtime;
343
344 nxt_log(task, NXT_LOG_INFO, "router started");
345
346 #if (NXT_TLS)
347 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348 if (nxt_slow_path(rt->tls == NULL)) {
349 return NXT_ERROR;
350 }
351
352 ret = rt->tls->library_init(task);
353 if (nxt_slow_path(ret != NXT_OK)) {
354 return ret;
355 }
356 #endif
357
358 ret = nxt_http_init(task);
359 if (nxt_slow_path(ret != NXT_OK)) {
360 return ret;
361 }
362
363 router = nxt_zalloc(sizeof(nxt_router_t));
364 if (nxt_slow_path(router == NULL)) {
365 return NXT_ERROR;
366 }
367
368 nxt_queue_init(&router->engines);
369 nxt_queue_init(&router->sockets);
370 nxt_queue_init(&router->apps);
371
372 nxt_router = router;
373
374 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375 if (controller_port != NULL) {
376 nxt_router_greet_controller(task, controller_port);
377 }
378
379 return NXT_OK;
380 }
381
382
383 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387 -1, 0, 0, NULL);
388 }
389
390
/*
 * Posted handler (see nxt_router_start_app_process()) that requests a new
 * process for the application passed in "data".
 *
 * If the app already has a prototype port, START_PROCESS is sent to it
 * without payload or descriptors.  Otherwise, unless a prototype request is
 * already outstanding, START_PROCESS is sent to the main process port with
 * "<name>\0<conf>" as payload together with the shared port's pair[0] and
 * queue descriptors.  The reply arrives through an RPC handler registered
 * on "port" (nxt_router_app_port_ready / nxt_router_app_port_error).
 *
 * Every path ends by dropping one app reference; the matching "+1" is taken
 * in nxt_router_start_app_process() before the handler is posted.
 */
static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
    void *data)
{
    size_t size;
    uint32_t stream;
    nxt_fd_t port_fd, queue_fd;
    nxt_int_t ret;
    nxt_app_t *app;
    nxt_buf_t *b;
    nxt_port_t *dport;
    nxt_runtime_t *rt;
    nxt_app_joint_rpc_t *app_joint_rpc;

    app = data;

    /* Only the proto_port read is done under the app mutex. */
    nxt_thread_mutex_lock(&app->mutex);

    dport = app->proto_port;

    nxt_thread_mutex_unlock(&app->mutex);

    if (dport != NULL) {
        nxt_debug(task, "app '%V' %p start process", &app->name, app);

        b = NULL;
        port_fd = -1;
        queue_fd = -1;

    } else {
        if (app->proto_port_requests > 0) {
            nxt_debug(task, "app '%V' %p wait for prototype process",
                      &app->name, app);

            /* A prototype start is in flight: just count this request. */
            app->proto_port_requests++;

            goto skip;
        }

        nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);

        rt = task->thread->runtime;
        dport = rt->port_by_type[NXT_PROCESS_MAIN];

        /* Payload layout: app name, a NUL separator, then the app conf. */
        size = app->name.length + 1 + app->conf.length;

        b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
        if (nxt_slow_path(b == NULL)) {
            goto failed;
        }

        nxt_buf_cpystr(b, &app->name);
        *b->mem.free++ = '\0';
        nxt_buf_cpystr(b, &app->conf);

        port_fd = app->shared_port->pair[0];
        queue_fd = app->shared_port->queue_fd;
    }

    app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
                                                     nxt_router_app_port_ready,
                                                     nxt_router_app_port_error,
                                                   sizeof(nxt_app_joint_rpc_t));
    if (nxt_slow_path(app_joint_rpc == NULL)) {
        goto failed;
    }

    stream = nxt_port_rpc_ex_stream(app_joint_rpc);

    ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
                                 port_fd, queue_fd, stream, port->id, b);
    if (nxt_slow_path(ret != NXT_OK)) {
        /* Undo the RPC registration; "b" is still ours and freed below. */
        nxt_port_rpc_cancel(task, port, stream);

        goto failed;
    }

    app_joint_rpc->app_joint = app->joint;
    app_joint_rpc->generation = app->generation;
    app_joint_rpc->proto = (b != NULL);

    if (b != NULL) {
        app->proto_port_requests++;

        /* The buffer was handed to the write; keep "failed:" from freeing it. */
        b = NULL;
    }

    /* The registered RPC data now refers to the joint. */
    nxt_router_app_joint_use(task, app->joint, 1);

failed:

    if (b != NULL) {
        nxt_mp_free(b->data, b);
    }

skip:

    nxt_router_app_use(task, app, -1);
}
490
491
492 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
494 {
495 app_joint->use_count += i;
496
497 if (app_joint->use_count == 0) {
498 nxt_assert(app_joint->app == NULL);
499
500 nxt_free(app_joint);
501 }
502 }
503
504
505 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
507 {
508 nxt_int_t res;
509 nxt_port_t *router_port;
510 nxt_runtime_t *rt;
511
512 nxt_debug(task, "app '%V' start process", &app->name);
513
514 rt = task->thread->runtime;
515 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
516
517 nxt_router_app_use(task, app, 1);
518
519 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
520 app);
521
522 if (res == NXT_OK) {
523 return res;
524 }
525
526 nxt_thread_mutex_lock(&app->mutex);
527
528 app->pending_processes--;
529
530 nxt_thread_mutex_unlock(&app->mutex);
531
532 nxt_router_app_use(task, app, -1);
533
534 return NXT_ERROR;
535 }
536
537
538 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
540 {
541 nxt_buf_t *b, *next;
542 nxt_bool_t cancelled;
543 nxt_port_t *app_port;
544 nxt_msg_info_t *msg_info;
545
546 msg_info = &req_rpc_data->msg_info;
547
548 if (msg_info->buf == NULL) {
549 return 0;
550 }
551
552 app_port = req_rpc_data->app_port;
553
554 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
555 cancelled = nxt_app_queue_cancel(app_port->queue,
556 msg_info->tracking_cookie,
557 req_rpc_data->stream);
558
559 if (cancelled) {
560 nxt_debug(task, "stream #%uD: cancelled by router",
561 req_rpc_data->stream);
562 }
563
564 } else {
565 cancelled = 0;
566 }
567
568 for (b = msg_info->buf; b != NULL; b = next) {
569 next = b->next;
570 b->next = NULL;
571
572 if (b->is_port_mmap_sent) {
573 b->is_port_mmap_sent = cancelled == 0;
574 }
575
576 b->completion_handler(task, b, b->parent);
577 }
578
579 msg_info->buf = NULL;
580
581 return cancelled;
582 }
583
584
585 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)586 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
587 {
588 if (lnk->next != NULL) {
589 nxt_queue_remove(lnk);
590
591 lnk->next = NULL;
592
593 return 1;
594 }
595
596 return 0;
597 }
598
599
600 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)601 nxt_request_rpc_data_unlink(nxt_task_t *task,
602 nxt_request_rpc_data_t *req_rpc_data)
603 {
604 nxt_app_t *app;
605 nxt_bool_t unlinked;
606 nxt_http_request_t *r;
607
608 nxt_router_msg_cancel(task, req_rpc_data);
609
610 app = req_rpc_data->app;
611
612 if (req_rpc_data->app_port != NULL) {
613 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
614 req_rpc_data->apr_action);
615
616 req_rpc_data->app_port = NULL;
617 }
618
619 r = req_rpc_data->request;
620
621 if (r != NULL) {
622 r->timer_data = NULL;
623
624 nxt_router_http_request_release_post(task, r);
625
626 r->req_rpc_data = NULL;
627 req_rpc_data->request = NULL;
628
629 if (app != NULL) {
630 unlinked = 0;
631
632 nxt_thread_mutex_lock(&app->mutex);
633
634 if (r->app_link.next != NULL) {
635 nxt_queue_remove(&r->app_link);
636 r->app_link.next = NULL;
637
638 unlinked = 1;
639 }
640
641 nxt_thread_mutex_unlock(&app->mutex);
642
643 if (unlinked) {
644 nxt_mp_release(r->mem_pool);
645 }
646 }
647 }
648
649 if (app != NULL) {
650 nxt_router_app_use(task, app, -1);
651
652 req_rpc_data->app = NULL;
653 }
654
655 if (req_rpc_data->msg_info.body_fd != -1) {
656 nxt_fd_close(req_rpc_data->msg_info.body_fd);
657
658 req_rpc_data->msg_info.body_fd = -1;
659 }
660
661 if (req_rpc_data->rpc_cancel) {
662 req_rpc_data->rpc_cancel = 0;
663
664 nxt_port_rpc_cancel(task, task->thread->engine->port,
665 req_rpc_data->stream);
666 }
667 }
668
669
/*
 * NEW_PORT message handler of the router process.
 *
 * It first runs the generic nxt_port_new_port_handler(), then dispatches on
 * the type of the resulting port (msg->u.new_port):
 *   - controller port: greet the controller (and continue below);
 *   - prototype port: pass the message to the RPC handler and stop;
 *   - no port or a non-application port: with a non-zero stream the message
 *     is turned into an RPC error, with a zero stream it is dropped;
 *   - application port: map the port queue from msg->fd[1] when present.
 * A message with a non-zero stream is then completed through the RPC
 * handler.  What remains is an application port with stream 0: it is added
 * to the port hash of the app owning the process's main port (id 0) and
 * acknowledged with PORT_ACK.
 */
static void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_int_t res;
    nxt_app_t *app;
    nxt_port_t *port, *main_app_port;
    nxt_runtime_t *rt;

    nxt_port_new_port_handler(task, msg);

    port = msg->u.new_port;

    if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
        nxt_router_greet_controller(task, msg->u.new_port);
    }

    if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
        nxt_port_rpc_handler(task, msg);

        return;
    }

    if (port == NULL || port->type != NXT_PROCESS_APP) {

        if (msg->port_msg.stream == 0) {
            return;
        }

        /* Rewritten in place so the RPC handler below sees an error. */
        msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;

    } else {
        if (msg->fd[1] != -1) {
            res = nxt_router_port_queue_map(task, port, msg->fd[1]);
            if (nxt_slow_path(res != NXT_OK)) {
                return;
            }

            /* The descriptor is closed once the queue has been mapped. */
            nxt_fd_close(msg->fd[1]);
            msg->fd[1] = -1;
        }
    }

    if (msg->port_msg.stream != 0) {
        nxt_port_rpc_handler(task, msg);
        return;
    }

    /* From here on "port" is a non-NULL application port with stream 0. */
    nxt_debug(task, "new port id %d (%d)", port->id, port->type);

    /*
     * Port with "id == 0" is application 'main' port and it always
     * should come with non-zero stream.
     */
    nxt_assert(port->id != 0);

    /* Find 'main' app port and get app reference. */
    rt = task->thread->runtime;

    /*
     * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
     * sent to main port (with id == 0) and processed in main thread.
     */
    main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
    nxt_assert(main_app_port != NULL);

    app = main_app_port->app;

    if (nxt_fast_path(app != NULL)) {
        nxt_thread_mutex_lock(&app->mutex);

        /* TODO here should be find-and-add code because there can be
           port waiters in port_hash */
        nxt_port_hash_add(&app->port_hash, port);
        app->port_hash_count++;

        nxt_thread_mutex_unlock(&app->mutex);

        port->app = app;
    }

    port->main_app_port = main_app_port;

    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
}
754
755
756 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
758 {
759 void *p;
760 size_t size;
761 nxt_int_t ret;
762 nxt_port_t *port;
763 nxt_router_temp_conf_t *tmcf;
764
765 port = nxt_runtime_port_find(task->thread->runtime,
766 msg->port_msg.pid,
767 msg->port_msg.reply_port);
768 if (nxt_slow_path(port == NULL)) {
769 nxt_alert(task, "conf_data_handler: reply port not found");
770 return;
771 }
772
773 p = MAP_FAILED;
774
775 /*
776 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
777 * initialized in 'cleanup' section.
778 */
779 size = 0;
780
781 tmcf = nxt_router_temp_conf(task);
782 if (nxt_slow_path(tmcf == NULL)) {
783 goto fail;
784 }
785
786 if (nxt_slow_path(msg->fd[0] == -1)) {
787 nxt_alert(task, "conf_data_handler: invalid shm fd");
788 goto fail;
789 }
790
791 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
792 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
793 (int) nxt_buf_mem_used_size(&msg->buf->mem));
794 goto fail;
795 }
796
797 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
798
799 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
800
801 nxt_fd_close(msg->fd[0]);
802 msg->fd[0] = -1;
803
804 if (nxt_slow_path(p == MAP_FAILED)) {
805 goto fail;
806 }
807
808 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
809
810 tmcf->router_conf->router = nxt_router;
811 tmcf->stream = msg->port_msg.stream;
812 tmcf->port = port;
813
814 nxt_port_use(task, tmcf->port, 1);
815
816 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
817
818 if (nxt_fast_path(ret == NXT_OK)) {
819 nxt_router_conf_apply(task, tmcf, NULL);
820
821 } else {
822 nxt_router_conf_error(task, tmcf);
823 }
824
825 goto cleanup;
826
827 fail:
828
829 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
830 msg->port_msg.stream, 0, NULL);
831
832 if (tmcf != NULL) {
833 nxt_mp_release(tmcf->mem_pool);
834 }
835
836 cleanup:
837
838 if (p != MAP_FAILED) {
839 nxt_mem_munmap(p, size);
840 }
841
842 if (msg->fd[0] != -1) {
843 nxt_fd_close(msg->fd[0]);
844 msg->fd[0] = -1;
845 }
846 }
847
848
/*
 * APP_RESTART message handler.  The message buffer holds the application
 * name.
 *
 * For a known application a new shared port (id NXT_SHARED_PORT_ID) is
 * created with its socket and app queue.  Then, under the app mutex, the
 * prototype port (if any) is detached, the app generation is incremented
 * and the new shared port replaces the old one.  Outside the lock the old
 * shared port is closed and released, and the detached prototype is sent
 * QUIT, closed and released.
 *
 * The sender is answered with RPC_READY_LAST on success, or RPC_ERROR when
 * the application is unknown or the new shared port could not be set up.
 * If the reply port cannot be found, the message is only logged.
 */
static void
nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_app_t *app;
    nxt_int_t ret;
    nxt_str_t app_name;
    nxt_port_t *reply_port, *shared_port, *old_shared_port;
    nxt_port_t *proto_port;
    nxt_port_msg_type_t reply;

    reply_port = nxt_runtime_port_find(task->thread->runtime,
                                       msg->port_msg.pid,
                                       msg->port_msg.reply_port);
    if (nxt_slow_path(reply_port == NULL)) {
        nxt_alert(task, "app_restart_handler: reply port not found");
        return;
    }

    /* The name is used in place; it is not NUL-terminated or copied. */
    app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
    app_name.start = msg->buf->mem.pos;

    nxt_debug(task, "app_restart_handler: %V", &app_name);

    app = nxt_router_app_find(&nxt_router->apps, &app_name);

    if (nxt_fast_path(app != NULL)) {
        shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
                                   NXT_PROCESS_APP);
        if (nxt_slow_path(shared_port == NULL)) {
            goto fail;
        }

        ret = nxt_port_socket_init(task, shared_port, 0);
        if (nxt_slow_path(ret != NXT_OK)) {
            nxt_port_use(task, shared_port, -1);
            goto fail;
        }

        ret = nxt_router_app_queue_init(task, shared_port);
        if (nxt_slow_path(ret != NXT_OK)) {
            /* The socket pair is already open here: close both ends. */
            nxt_port_write_close(shared_port);
            nxt_port_read_close(shared_port);
            nxt_port_use(task, shared_port, -1);
            goto fail;
        }

        nxt_port_write_enable(task, shared_port);

        nxt_thread_mutex_lock(&app->mutex);

        proto_port = app->proto_port;

        if (proto_port != NULL) {
            nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
                      proto_port->pid);

            /* Only detached under the lock; QUIT is sent after unlocking. */
            app->proto_port = NULL;
            proto_port->app = NULL;
        }

        app->generation++;

        shared_port->app = app;

        old_shared_port = app->shared_port;
        old_shared_port->app = NULL;

        app->shared_port = shared_port;

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_port_close(task, old_shared_port);
        nxt_port_use(task, old_shared_port, -1);

        if (proto_port != NULL) {
            (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
                                         -1, 0, 0, NULL);

            nxt_port_close(task, proto_port);

            nxt_port_use(task, proto_port, -1);
        }

        reply = NXT_PORT_MSG_RPC_READY_LAST;

    } else {

        /* Also the landing point for set-up failures in the branch above. */
fail:

        reply = NXT_PORT_MSG_RPC_ERROR;
    }

    nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
                          0, NULL);
}
944
945
946 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
948 void *data)
949 {
950 union {
951 nxt_pid_t removed_pid;
952 void *data;
953 } u;
954
955 u.data = data;
956
957 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
958 }
959
960
961 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
963 {
964 nxt_event_engine_t *engine;
965
966 nxt_port_remove_pid_handler(task, msg);
967
968 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
969 {
970 if (nxt_fast_path(engine->port != NULL)) {
971 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
972 msg->u.data);
973 }
974 }
975 nxt_queue_loop;
976
977 if (msg->port_msg.stream == 0) {
978 return;
979 }
980
981 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
982
983 nxt_port_rpc_handler(task, msg);
984 }
985
986
987 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)988 nxt_router_temp_conf(nxt_task_t *task)
989 {
990 nxt_mp_t *mp, *tmp;
991 nxt_router_conf_t *rtcf;
992 nxt_router_temp_conf_t *tmcf;
993
994 mp = nxt_mp_create(1024, 128, 256, 32);
995 if (nxt_slow_path(mp == NULL)) {
996 return NULL;
997 }
998
999 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1000 if (nxt_slow_path(rtcf == NULL)) {
1001 goto fail;
1002 }
1003
1004 rtcf->mem_pool = mp;
1005
1006 tmp = nxt_mp_create(1024, 128, 256, 32);
1007 if (nxt_slow_path(tmp == NULL)) {
1008 goto fail;
1009 }
1010
1011 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1012 if (nxt_slow_path(tmcf == NULL)) {
1013 goto temp_fail;
1014 }
1015
1016 tmcf->mem_pool = tmp;
1017 tmcf->router_conf = rtcf;
1018 tmcf->count = 1;
1019 tmcf->engine = task->thread->engine;
1020
1021 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1022 sizeof(nxt_router_engine_conf_t));
1023 if (nxt_slow_path(tmcf->engines == NULL)) {
1024 goto temp_fail;
1025 }
1026
1027 nxt_queue_init(&creating_sockets);
1028 nxt_queue_init(&pending_sockets);
1029 nxt_queue_init(&updating_sockets);
1030 nxt_queue_init(&keeping_sockets);
1031 nxt_queue_init(&deleting_sockets);
1032
1033 #if (NXT_TLS)
1034 nxt_queue_init(&tmcf->tls);
1035 #endif
1036
1037 nxt_queue_init(&tmcf->apps);
1038 nxt_queue_init(&tmcf->previous);
1039
1040 return tmcf;
1041
1042 temp_fail:
1043
1044 nxt_mp_destroy(tmp);
1045
1046 fail:
1047
1048 nxt_mp_destroy(mp);
1049
1050 return NULL;
1051 }
1052
1053
1054 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1055 nxt_router_app_can_start(nxt_app_t *app)
1056 {
1057 return app->processes + app->pending_processes < app->max_processes
1058 && app->pending_processes < app->max_pending_processes;
1059 }
1060
1061
1062 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1063 nxt_router_app_need_start(nxt_app_t *app)
1064 {
1065 return (app->active_requests
1066 > app->port_hash_count + app->pending_processes)
1067 || (app->spare_processes
1068 > app->idle_processes + app->pending_processes);
1069 }
1070
1071
/*
 * Advances the application of a temporary configuration ("obj") by one
 * step.  "data" is unused.
 *
 * Each call performs the first step that still has work and returns:
 *   1. move one socket from pending_sockets to creating_sockets and start
 *      its listen socket RPC;
 *   2. (NXT_TLS) take one entry off tmcf->tls and request its certificate;
 *   3. start the prefork RPC for the first app that needs a process;
 *   4. open the access log if one is configured but has no descriptor yet.
 * NOTE(review): the completion callbacks of steps 1-4 are not visible in
 * this part of the file; presumably they call this function again --
 * confirm.
 *
 * When none of these applies, the engines and threads are created, the
 * apps are sorted, jobs are posted to the engines, the updated and created
 * sockets are appended to router->sockets, the access log is switched if
 * it changed, and nxt_router_conf_ready() is called.  A failure while
 * creating engines or threads ends in nxt_router_conf_error().
 */
static void
nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
{
    nxt_int_t ret;
    nxt_app_t *app;
    nxt_router_t *router;
    nxt_runtime_t *rt;
    nxt_queue_link_t *qlk;
    nxt_socket_conf_t *skcf;
    nxt_router_conf_t *rtcf;
    nxt_router_temp_conf_t *tmcf;
    const nxt_event_interface_t *interface;
#if (NXT_TLS)
    nxt_router_tlssock_t *tls;
#endif

    tmcf = obj;

    qlk = nxt_queue_first(&pending_sockets);

    if (qlk != nxt_queue_tail(&pending_sockets)) {
        nxt_queue_remove(qlk);
        nxt_queue_insert_tail(&creating_sockets, qlk);

        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);

        nxt_router_listen_socket_rpc_create(task, tmcf, skcf);

        return;
    }

#if (NXT_TLS)
    /* TLS entries are consumed from the back of the queue. */
    qlk = nxt_queue_last(&tmcf->tls);

    if (qlk != nxt_queue_head(&tmcf->tls)) {
        nxt_queue_remove(qlk);

        tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);

        nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
                           nxt_router_tls_rpc_handler, tls);
        return;
    }
#endif

    nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {

        if (nxt_router_app_need_start(app)) {
            nxt_router_app_rpc_create(task, tmcf, app);
            return;
        }

    } nxt_queue_loop;

    rtcf = tmcf->router_conf;

    if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
        nxt_router_access_log_open(task, tmcf);
        return;
    }

    rt = task->thread->runtime;

    interface = nxt_service_get(rt->services, "engine", NULL);

    router = rtcf->router;

    ret = nxt_router_engines_create(task, router, tmcf, interface);
    if (nxt_slow_path(ret != NXT_OK)) {
        goto fail;
    }

    ret = nxt_router_threads_create(task, rt, tmcf);
    if (nxt_slow_path(ret != NXT_OK)) {
        goto fail;
    }

    nxt_router_apps_sort(task, router, tmcf);

    nxt_router_apps_hash_use(task, rtcf, 1);

    nxt_router_engines_post(router, tmcf);

    nxt_queue_add(&router->sockets, &updating_sockets);
    nxt_queue_add(&router->sockets, &creating_sockets);

    if (router->access_log != rtcf->access_log) {
        /* Take the new log before releasing the one being replaced. */
        nxt_router_access_log_use(&router->lock, rtcf->access_log);

        nxt_router_access_log_release(task, &router->lock, router->access_log);

        router->access_log = rtcf->access_log;
    }

    nxt_router_conf_ready(task, tmcf);

    return;

fail:

    nxt_router_conf_error(task, tmcf);

    return;
}
1176
1177
1178 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1180 {
1181 nxt_joint_job_t *job;
1182
1183 job = obj;
1184
1185 nxt_router_conf_ready(task, job->tmcf);
1186 }
1187
1188
1189 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191 {
1192 uint32_t count;
1193 nxt_router_conf_t *rtcf;
1194 nxt_thread_spinlock_t *lock;
1195
1196 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1197
1198 if (--tmcf->count > 0) {
1199 return;
1200 }
1201
1202 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1203
1204 rtcf = tmcf->router_conf;
1205
1206 lock = &rtcf->router->lock;
1207
1208 nxt_thread_spin_lock(lock);
1209
1210 count = rtcf->count;
1211
1212 nxt_thread_spin_unlock(lock);
1213
1214 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1215
1216 if (count == 0) {
1217 nxt_router_apps_hash_use(task, rtcf, -1);
1218
1219 nxt_router_access_log_release(task, lock, rtcf->access_log);
1220
1221 nxt_mp_destroy(rtcf->mem_pool);
1222 }
1223
1224 nxt_mp_release(tmcf->mem_pool);
1225 }
1226
1227
1228 static void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1230 {
1231 nxt_app_t *app;
1232 nxt_queue_t new_socket_confs;
1233 nxt_socket_t s;
1234 nxt_router_t *router;
1235 nxt_queue_link_t *qlk;
1236 nxt_socket_conf_t *skcf;
1237 nxt_router_conf_t *rtcf;
1238
1239 nxt_alert(task, "failed to apply new conf");
1240
1241 for (qlk = nxt_queue_first(&creating_sockets);
1242 qlk != nxt_queue_tail(&creating_sockets);
1243 qlk = nxt_queue_next(qlk))
1244 {
1245 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246 s = skcf->listen->socket;
1247
1248 if (s != -1) {
1249 nxt_socket_close(task, s);
1250 }
1251
1252 nxt_free(skcf->listen);
1253 }
1254
1255 nxt_queue_init(&new_socket_confs);
1256 nxt_queue_add(&new_socket_confs, &updating_sockets);
1257 nxt_queue_add(&new_socket_confs, &pending_sockets);
1258 nxt_queue_add(&new_socket_confs, &creating_sockets);
1259
1260 rtcf = tmcf->router_conf;
1261
1262 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1263
1264 nxt_router_app_unlink(task, app);
1265
1266 } nxt_queue_loop;
1267
1268 router = rtcf->router;
1269
1270 nxt_queue_add(&router->sockets, &keeping_sockets);
1271 nxt_queue_add(&router->sockets, &deleting_sockets);
1272
1273 nxt_queue_add(&router->apps, &tmcf->previous);
1274
1275 // TODO: new engines and threads
1276
1277 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1278
1279 nxt_mp_destroy(rtcf->mem_pool);
1280
1281 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1282
1283 nxt_mp_release(tmcf->mem_pool);
1284 }
1285
1286
1287 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1289 nxt_port_msg_type_t type)
1290 {
1291 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1292
1293 nxt_port_use(task, tmcf->port, -1);
1294
1295 tmcf->port = NULL;
1296 }
1297
1298
/*
 * Root-level configuration members mapped into nxt_router_conf_t:
 * "listeners_threads" is stored as a 32-bit integer in the "threads" field.
 */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};
1306
1307
/*
 * Members of a single application object mapped into nxt_router_app_conf_t.
 *
 * "processes" appears twice on purpose: once as a 32-bit integer and once as
 * a raw value pointer, so the consumer can check whether the member is an
 * object and map it further.  "limits" and "targets" are also kept as raw
 * value pointers.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};
1339
1340
/*
 * Members of an application's "limits" object mapped into
 * nxt_router_app_conf_t: "timeout" is stored as a millisecond value.
 */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },
};
1348
1349
/*
 * Members of an application's "processes" object (used when "processes" is
 * an object rather than a number) mapped into nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};
1369
1370
/*
 * Members of a listener object mapped into nxt_router_listener_conf_t.
 * Both strings use the STR_COPY mapping type.
 */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};
1384
1385
/*
 * Members of the "/settings/http" object mapped into nxt_socket_conf_t:
 * buffer sizes and limits (SIZE), timeouts (MSEC), the body temp path (STR)
 * and the "discard_unsafe_fields" flag (INT8).
 */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },
};
1453
1454
/*
 * Members of the "/settings/http/websocket" object mapped into
 * nxt_websocket_conf_t.
 */
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
    {
        nxt_string("max_frame_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_websocket_conf_t, max_frame_size),
    },

    {
        nxt_string("read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, read_timeout),
    },

    {
        nxt_string("keepalive_interval"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, keepalive_interval),
    },

};
1475
1476
1477 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1479 u_char *start, u_char *end)
1480 {
1481 u_char *p;
1482 size_t size;
1483 nxt_mp_t *mp, *app_mp;
1484 uint32_t next, next_target;
1485 nxt_int_t ret;
1486 nxt_str_t name, path, target;
1487 nxt_app_t *app, *prev;
1488 nxt_str_t *t, *s, *targets;
1489 nxt_uint_t n, i;
1490 nxt_port_t *port;
1491 nxt_router_t *router;
1492 nxt_app_joint_t *app_joint;
1493 #if (NXT_TLS)
1494 nxt_tls_init_t *tls_init;
1495 nxt_conf_value_t *certificate;
1496 #endif
1497 nxt_conf_value_t *conf, *http, *value, *websocket;
1498 nxt_conf_value_t *applications, *application;
1499 nxt_conf_value_t *listeners, *listener;
1500 nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf;
1501 nxt_socket_conf_t *skcf;
1502 nxt_http_routes_t *routes;
1503 nxt_event_engine_t *engine;
1504 nxt_app_lang_module_t *lang;
1505 nxt_router_app_conf_t apcf;
1506 nxt_router_access_log_t *access_log;
1507 nxt_router_listener_conf_t lscf;
1508
1509 static nxt_str_t http_path = nxt_string("/settings/http");
1510 static nxt_str_t applications_path = nxt_string("/applications");
1511 static nxt_str_t listeners_path = nxt_string("/listeners");
1512 static nxt_str_t routes_path = nxt_string("/routes");
1513 static nxt_str_t access_log_path = nxt_string("/access_log");
1514 #if (NXT_TLS)
1515 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1516 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1517 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1518 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1519 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1520 #endif
1521 static nxt_str_t static_path = nxt_string("/settings/http/static");
1522 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1523 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1524
1525 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1526 if (conf == NULL) {
1527 nxt_alert(task, "configuration parsing error");
1528 return NXT_ERROR;
1529 }
1530
1531 mp = tmcf->router_conf->mem_pool;
1532
1533 ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1534 nxt_nitems(nxt_router_conf), tmcf->router_conf);
1535 if (ret != NXT_OK) {
1536 nxt_alert(task, "root map error");
1537 return NXT_ERROR;
1538 }
1539
1540 if (tmcf->router_conf->threads == 0) {
1541 tmcf->router_conf->threads = nxt_ncpu;
1542 }
1543
1544 static_conf = nxt_conf_get_path(conf, &static_path);
1545
1546 ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1547 if (nxt_slow_path(ret != NXT_OK)) {
1548 return NXT_ERROR;
1549 }
1550
1551 router = tmcf->router_conf->router;
1552
1553 applications = nxt_conf_get_path(conf, &applications_path);
1554
1555 if (applications != NULL) {
1556 next = 0;
1557
1558 for ( ;; ) {
1559 application = nxt_conf_next_object_member(applications,
1560 &name, &next);
1561 if (application == NULL) {
1562 break;
1563 }
1564
1565 nxt_debug(task, "application \"%V\"", &name);
1566
1567 size = nxt_conf_json_length(application, NULL);
1568
1569 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1570 if (nxt_slow_path(app_mp == NULL)) {
1571 goto fail;
1572 }
1573
1574 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1575 if (app == NULL) {
1576 goto app_fail;
1577 }
1578
1579 nxt_memzero(app, sizeof(nxt_app_t));
1580
1581 app->mem_pool = app_mp;
1582
1583 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1584 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1585 + name.length);
1586
1587 p = nxt_conf_json_print(app->conf.start, application, NULL);
1588 app->conf.length = p - app->conf.start;
1589
1590 nxt_assert(app->conf.length <= size);
1591
1592 nxt_debug(task, "application conf \"%V\"", &app->conf);
1593
1594 prev = nxt_router_app_find(&router->apps, &name);
1595
1596 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1597 nxt_mp_destroy(app_mp);
1598
1599 nxt_queue_remove(&prev->link);
1600 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1601
1602 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1603 if (nxt_slow_path(ret != NXT_OK)) {
1604 goto fail;
1605 }
1606
1607 continue;
1608 }
1609
1610 apcf.processes = 1;
1611 apcf.max_processes = 1;
1612 apcf.spare_processes = 0;
1613 apcf.timeout = 0;
1614 apcf.idle_timeout = 15000;
1615 apcf.limits_value = NULL;
1616 apcf.processes_value = NULL;
1617 apcf.targets_value = NULL;
1618
1619 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1620 if (nxt_slow_path(app_joint == NULL)) {
1621 goto app_fail;
1622 }
1623
1624 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1625
1626 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1627 nxt_nitems(nxt_router_app_conf), &apcf);
1628 if (ret != NXT_OK) {
1629 nxt_alert(task, "application map error");
1630 goto app_fail;
1631 }
1632
1633 if (apcf.limits_value != NULL) {
1634
1635 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1636 nxt_alert(task, "application limits is not object");
1637 goto app_fail;
1638 }
1639
1640 ret = nxt_conf_map_object(mp, apcf.limits_value,
1641 nxt_router_app_limits_conf,
1642 nxt_nitems(nxt_router_app_limits_conf),
1643 &apcf);
1644 if (ret != NXT_OK) {
1645 nxt_alert(task, "application limits map error");
1646 goto app_fail;
1647 }
1648 }
1649
1650 if (apcf.processes_value != NULL
1651 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1652 {
1653 ret = nxt_conf_map_object(mp, apcf.processes_value,
1654 nxt_router_app_processes_conf,
1655 nxt_nitems(nxt_router_app_processes_conf),
1656 &apcf);
1657 if (ret != NXT_OK) {
1658 nxt_alert(task, "application processes map error");
1659 goto app_fail;
1660 }
1661
1662 } else {
1663 apcf.max_processes = apcf.processes;
1664 apcf.spare_processes = apcf.processes;
1665 }
1666
1667 if (apcf.targets_value != NULL) {
1668 n = nxt_conf_object_members_count(apcf.targets_value);
1669
1670 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1671 if (nxt_slow_path(targets == NULL)) {
1672 goto app_fail;
1673 }
1674
1675 next_target = 0;
1676
1677 for (i = 0; i < n; i++) {
1678 (void) nxt_conf_next_object_member(apcf.targets_value,
1679 &target, &next_target);
1680
1681 s = nxt_str_dup(app_mp, &targets[i], &target);
1682 if (nxt_slow_path(s == NULL)) {
1683 goto app_fail;
1684 }
1685 }
1686
1687 } else {
1688 targets = NULL;
1689 }
1690
1691 nxt_debug(task, "application type: %V", &apcf.type);
1692 nxt_debug(task, "application processes: %D", apcf.processes);
1693 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1694
1695 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1696
1697 if (lang == NULL) {
1698 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1699 goto app_fail;
1700 }
1701
1702 nxt_debug(task, "application language module: \"%s\"", lang->file);
1703
1704 ret = nxt_thread_mutex_create(&app->mutex);
1705 if (ret != NXT_OK) {
1706 goto app_fail;
1707 }
1708
1709 nxt_queue_init(&app->ports);
1710 nxt_queue_init(&app->spare_ports);
1711 nxt_queue_init(&app->idle_ports);
1712 nxt_queue_init(&app->ack_waiting_req);
1713
1714 app->name.length = name.length;
1715 nxt_memcpy(app->name.start, name.start, name.length);
1716
1717 app->type = lang->type;
1718 app->max_processes = apcf.max_processes;
1719 app->spare_processes = apcf.spare_processes;
1720 app->max_pending_processes = apcf.spare_processes
1721 ? apcf.spare_processes : 1;
1722 app->timeout = apcf.timeout;
1723 app->idle_timeout = apcf.idle_timeout;
1724
1725 app->targets = targets;
1726
1727 engine = task->thread->engine;
1728
1729 app->engine = engine;
1730
1731 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1732 app->adjust_idle_work.task = &engine->task;
1733 app->adjust_idle_work.obj = app;
1734
1735 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1736
1737 ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1738 if (nxt_slow_path(ret != NXT_OK)) {
1739 goto app_fail;
1740 }
1741
1742 nxt_router_app_use(task, app, 1);
1743
1744 app->joint = app_joint;
1745
1746 app_joint->use_count = 1;
1747 app_joint->app = app;
1748
1749 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1750 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1751 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1752 app_joint->idle_timer.task = &engine->task;
1753 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1754
1755 app_joint->free_app_work.handler = nxt_router_free_app;
1756 app_joint->free_app_work.task = &engine->task;
1757 app_joint->free_app_work.obj = app_joint;
1758
1759 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1760 NXT_PROCESS_APP);
1761 if (nxt_slow_path(port == NULL)) {
1762 return NXT_ERROR;
1763 }
1764
1765 ret = nxt_port_socket_init(task, port, 0);
1766 if (nxt_slow_path(ret != NXT_OK)) {
1767 nxt_port_use(task, port, -1);
1768 return NXT_ERROR;
1769 }
1770
1771 ret = nxt_router_app_queue_init(task, port);
1772 if (nxt_slow_path(ret != NXT_OK)) {
1773 nxt_port_write_close(port);
1774 nxt_port_read_close(port);
1775 nxt_port_use(task, port, -1);
1776 return NXT_ERROR;
1777 }
1778
1779 nxt_port_write_enable(task, port);
1780 port->app = app;
1781
1782 app->shared_port = port;
1783
1784 nxt_thread_mutex_create(&app->outgoing.mutex);
1785 }
1786 }
1787
1788 routes_conf = nxt_conf_get_path(conf, &routes_path);
1789 if (nxt_fast_path(routes_conf != NULL)) {
1790 routes = nxt_http_routes_create(task, tmcf, routes_conf);
1791 if (nxt_slow_path(routes == NULL)) {
1792 return NXT_ERROR;
1793 }
1794 tmcf->router_conf->routes = routes;
1795 }
1796
1797 ret = nxt_upstreams_create(task, tmcf, conf);
1798 if (nxt_slow_path(ret != NXT_OK)) {
1799 return ret;
1800 }
1801
1802 http = nxt_conf_get_path(conf, &http_path);
1803 #if 0
1804 if (http == NULL) {
1805 nxt_alert(task, "no \"http\" block");
1806 return NXT_ERROR;
1807 }
1808 #endif
1809
1810 websocket = nxt_conf_get_path(conf, &websocket_path);
1811
1812 listeners = nxt_conf_get_path(conf, &listeners_path);
1813
1814 if (listeners != NULL) {
1815 next = 0;
1816
1817 for ( ;; ) {
1818 listener = nxt_conf_next_object_member(listeners, &name, &next);
1819 if (listener == NULL) {
1820 break;
1821 }
1822
1823 skcf = nxt_router_socket_conf(task, tmcf, &name);
1824 if (skcf == NULL) {
1825 goto fail;
1826 }
1827
1828 nxt_memzero(&lscf, sizeof(lscf));
1829
1830 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1831 nxt_nitems(nxt_router_listener_conf),
1832 &lscf);
1833 if (ret != NXT_OK) {
1834 nxt_alert(task, "listener map error");
1835 goto fail;
1836 }
1837
1838 nxt_debug(task, "application: %V", &lscf.application);
1839
1840 // STUB, default values if http block is not defined.
1841 skcf->header_buffer_size = 2048;
1842 skcf->large_header_buffer_size = 8192;
1843 skcf->large_header_buffers = 4;
1844 skcf->discard_unsafe_fields = 1;
1845 skcf->body_buffer_size = 16 * 1024;
1846 skcf->max_body_size = 8 * 1024 * 1024;
1847 skcf->proxy_header_buffer_size = 64 * 1024;
1848 skcf->proxy_buffer_size = 4096;
1849 skcf->proxy_buffers = 256;
1850 skcf->idle_timeout = 180 * 1000;
1851 skcf->header_read_timeout = 30 * 1000;
1852 skcf->body_read_timeout = 30 * 1000;
1853 skcf->send_timeout = 30 * 1000;
1854 skcf->proxy_timeout = 60 * 1000;
1855 skcf->proxy_send_timeout = 30 * 1000;
1856 skcf->proxy_read_timeout = 30 * 1000;
1857
1858 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1859 skcf->websocket_conf.read_timeout = 60 * 1000;
1860 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1861
1862 nxt_str_null(&skcf->body_temp_path);
1863
1864 if (http != NULL) {
1865 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1866 nxt_nitems(nxt_router_http_conf),
1867 skcf);
1868 if (ret != NXT_OK) {
1869 nxt_alert(task, "http map error");
1870 goto fail;
1871 }
1872 }
1873
1874 if (websocket != NULL) {
1875 ret = nxt_conf_map_object(mp, websocket,
1876 nxt_router_websocket_conf,
1877 nxt_nitems(nxt_router_websocket_conf),
1878 &skcf->websocket_conf);
1879 if (ret != NXT_OK) {
1880 nxt_alert(task, "websocket map error");
1881 goto fail;
1882 }
1883 }
1884
1885 t = &skcf->body_temp_path;
1886
1887 if (t->length == 0) {
1888 t->start = (u_char *) task->thread->runtime->tmp;
1889 t->length = nxt_strlen(t->start);
1890 }
1891
1892 client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1893 ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1894 client_ip_conf);
1895 if (nxt_slow_path(ret != NXT_OK)) {
1896 return NXT_ERROR;
1897 }
1898
1899 #if (NXT_TLS)
1900 certificate = nxt_conf_get_path(listener, &certificate_path);
1901
1902 if (certificate != NULL) {
1903 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1904 if (nxt_slow_path(tls_init == NULL)) {
1905 return NXT_ERROR;
1906 }
1907
1908 tls_init->cache_size = 0;
1909 tls_init->timeout = 300;
1910
1911 value = nxt_conf_get_path(listener, &conf_cache_path);
1912 if (value != NULL) {
1913 tls_init->cache_size = nxt_conf_get_number(value);
1914 }
1915
1916 value = nxt_conf_get_path(listener, &conf_timeout_path);
1917 if (value != NULL) {
1918 tls_init->timeout = nxt_conf_get_number(value);
1919 }
1920
1921 tls_init->conf_cmds = nxt_conf_get_path(listener,
1922 &conf_commands_path);
1923
1924 tls_init->tickets_conf = nxt_conf_get_path(listener,
1925 &conf_tickets);
1926
1927 n = nxt_conf_array_elements_count_or_1(certificate);
1928
1929 for (i = 0; i < n; i++) {
1930 value = nxt_conf_get_array_element_or_itself(certificate,
1931 i);
1932 nxt_assert(value != NULL);
1933
1934 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1935 tls_init, i == 0);
1936 if (nxt_slow_path(ret != NXT_OK)) {
1937 goto fail;
1938 }
1939 }
1940 }
1941 #endif
1942
1943 skcf->listen->handler = nxt_http_conn_init;
1944 skcf->router_conf = tmcf->router_conf;
1945 skcf->router_conf->count++;
1946
1947 if (lscf.pass.length != 0) {
1948 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1949
1950 /* COMPATIBILITY: listener application. */
1951 } else if (lscf.application.length > 0) {
1952 skcf->action = nxt_http_pass_application(task,
1953 tmcf->router_conf,
1954 &lscf.application);
1955 }
1956
1957 if (nxt_slow_path(skcf->action == NULL)) {
1958 goto fail;
1959 }
1960 }
1961 }
1962
1963 ret = nxt_http_routes_resolve(task, tmcf);
1964 if (nxt_slow_path(ret != NXT_OK)) {
1965 goto fail;
1966 }
1967
1968 value = nxt_conf_get_path(conf, &access_log_path);
1969
1970 if (value != NULL) {
1971 nxt_conf_get_string(value, &path);
1972
1973 access_log = router->access_log;
1974
1975 if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1976 nxt_router_access_log_use(&router->lock, access_log);
1977
1978 } else {
1979 access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1980 + path.length);
1981 if (access_log == NULL) {
1982 nxt_alert(task, "failed to allocate access log structure");
1983 goto fail;
1984 }
1985
1986 access_log->fd = -1;
1987 access_log->handler = &nxt_router_access_log_writer;
1988 access_log->count = 1;
1989
1990 access_log->path.length = path.length;
1991 access_log->path.start = (u_char *) access_log
1992 + sizeof(nxt_router_access_log_t);
1993
1994 nxt_memcpy(access_log->path.start, path.start, path.length);
1995 }
1996
1997 tmcf->router_conf->access_log = access_log;
1998 }
1999
2000 nxt_queue_add(&deleting_sockets, &router->sockets);
2001 nxt_queue_init(&router->sockets);
2002
2003 return NXT_OK;
2004
2005 app_fail:
2006
2007 nxt_mp_destroy(app_mp);
2008
2009 fail:
2010
2011 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2012
2013 nxt_queue_remove(&app->link);
2014 nxt_thread_mutex_destroy(&app->mutex);
2015 nxt_mp_destroy(app->mem_pool);
2016
2017 } nxt_queue_loop;
2018
2019 return NXT_ERROR;
2020 }
2021
2022
2023 #if (NXT_TLS)
2024
2025 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2026 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2027 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2028 nxt_tls_init_t *tls_init, nxt_bool_t last)
2029 {
2030 nxt_router_tlssock_t *tls;
2031
2032 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2033 if (nxt_slow_path(tls == NULL)) {
2034 return NXT_ERROR;
2035 }
2036
2037 tls->tls_init = tls_init;
2038 tls->socket_conf = skcf;
2039 tls->temp_conf = tmcf;
2040 tls->last = last;
2041 nxt_conf_get_string(value, &tls->name);
2042
2043 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2044
2045 return NXT_OK;
2046 }
2047
2048 #endif
2049
2050
2051 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2052 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2053 nxt_conf_value_t *conf)
2054 {
2055 uint32_t next, i;
2056 nxt_mp_t *mp;
2057 nxt_str_t *type, exten, str;
2058 nxt_int_t ret;
2059 nxt_uint_t exts;
2060 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2061
2062 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2063
2064 mp = rtcf->mem_pool;
2065
2066 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2067 if (nxt_slow_path(ret != NXT_OK)) {
2068 return NXT_ERROR;
2069 }
2070
2071 if (conf == NULL) {
2072 return NXT_OK;
2073 }
2074
2075 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2076
2077 if (mtypes_conf != NULL) {
2078 next = 0;
2079
2080 for ( ;; ) {
2081 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2082
2083 if (ext_conf == NULL) {
2084 break;
2085 }
2086
2087 type = nxt_str_dup(mp, NULL, &str);
2088 if (nxt_slow_path(type == NULL)) {
2089 return NXT_ERROR;
2090 }
2091
2092 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2093 nxt_conf_get_string(ext_conf, &str);
2094
2095 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2096 return NXT_ERROR;
2097 }
2098
2099 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2100 &exten, type);
2101 if (nxt_slow_path(ret != NXT_OK)) {
2102 return NXT_ERROR;
2103 }
2104
2105 continue;
2106 }
2107
2108 exts = nxt_conf_array_elements_count(ext_conf);
2109
2110 for (i = 0; i < exts; i++) {
2111 value = nxt_conf_get_array_element(ext_conf, i);
2112
2113 nxt_conf_get_string(value, &str);
2114
2115 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2116 return NXT_ERROR;
2117 }
2118
2119 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2120 &exten, type);
2121 if (nxt_slow_path(ret != NXT_OK)) {
2122 return NXT_ERROR;
2123 }
2124 }
2125 }
2126 }
2127
2128 return NXT_OK;
2129 }
2130
2131
2132 static nxt_int_t
nxt_router_conf_process_client_ip(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf,nxt_conf_value_t * conf)2133 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2134 nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2135 {
2136 char c;
2137 size_t i;
2138 nxt_mp_t *mp;
2139 uint32_t hash;
2140 nxt_str_t header;
2141 nxt_conf_value_t *source_conf, *header_conf, *recursive_conf;
2142 nxt_http_client_ip_t *client_ip;
2143 nxt_http_route_addr_rule_t *source;
2144
2145 static nxt_str_t header_path = nxt_string("/header");
2146 static nxt_str_t source_path = nxt_string("/source");
2147 static nxt_str_t recursive_path = nxt_string("/recursive");
2148
2149 if (conf == NULL) {
2150 skcf->client_ip = NULL;
2151
2152 return NXT_OK;
2153 }
2154
2155 mp = tmcf->router_conf->mem_pool;
2156
2157 source_conf = nxt_conf_get_path(conf, &source_path);
2158 header_conf = nxt_conf_get_path(conf, &header_path);
2159 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2160
2161 if (source_conf == NULL || header_conf == NULL) {
2162 return NXT_ERROR;
2163 }
2164
2165 client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2166 if (nxt_slow_path(client_ip == NULL)) {
2167 return NXT_ERROR;
2168 }
2169
2170 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2171 if (nxt_slow_path(source == NULL)) {
2172 return NXT_ERROR;
2173 }
2174
2175 client_ip->source = source;
2176
2177 nxt_conf_get_string(header_conf, &header);
2178
2179 if (recursive_conf != NULL) {
2180 client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2181 }
2182
2183 client_ip->header = nxt_str_dup(mp, NULL, &header);
2184 if (nxt_slow_path(client_ip->header == NULL)) {
2185 return NXT_ERROR;
2186 }
2187
2188 hash = NXT_HTTP_FIELD_HASH_INIT;
2189
2190 for (i = 0; i < client_ip->header->length; i++) {
2191 c = client_ip->header->start[i];
2192 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2193 }
2194
2195 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2196
2197 client_ip->header_hash = hash;
2198
2199 skcf->client_ip = client_ip;
2200
2201 return NXT_OK;
2202 }
2203
2204
2205 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2206 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2207 {
2208 nxt_app_t *app;
2209
2210 nxt_queue_each(app, queue, nxt_app_t, link) {
2211
2212 if (nxt_strstr_eq(name, &app->name)) {
2213 return app;
2214 }
2215
2216 } nxt_queue_loop;
2217
2218 return NULL;
2219 }
2220
2221
2222 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2223 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2224 {
2225 void *mem;
2226 nxt_int_t fd;
2227
2228 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2229 if (nxt_slow_path(fd == -1)) {
2230 return NXT_ERROR;
2231 }
2232
2233 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2234 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2235 if (nxt_slow_path(mem == MAP_FAILED)) {
2236 nxt_fd_close(fd);
2237
2238 return NXT_ERROR;
2239 }
2240
2241 nxt_app_queue_init(mem);
2242
2243 port->queue_fd = fd;
2244 port->queue = mem;
2245
2246 return NXT_OK;
2247 }
2248
2249
2250 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2251 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2252 {
2253 void *mem;
2254 nxt_int_t fd;
2255
2256 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2257 if (nxt_slow_path(fd == -1)) {
2258 return NXT_ERROR;
2259 }
2260
2261 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2262 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2263 if (nxt_slow_path(mem == MAP_FAILED)) {
2264 nxt_fd_close(fd);
2265
2266 return NXT_ERROR;
2267 }
2268
2269 nxt_port_queue_init(mem);
2270
2271 port->queue_fd = fd;
2272 port->queue = mem;
2273
2274 return NXT_OK;
2275 }
2276
2277
2278 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2279 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2280 {
2281 void *mem;
2282
2283 nxt_assert(fd != -1);
2284
2285 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2286 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2287 if (nxt_slow_path(mem == MAP_FAILED)) {
2288
2289 return NXT_ERROR;
2290 }
2291
2292 port->queue = mem;
2293
2294 return NXT_OK;
2295 }
2296
2297
/*
 * Level hash prototype for the per-configuration applications hash:
 * default hash shape, name comparison via nxt_router_apps_hash_test(),
 * and the memory-pool based level hash allocator and deallocator.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2304
2305
2306 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2307 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2308 {
2309 nxt_app_t *app;
2310
2311 app = data;
2312
2313 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2314 }
2315
2316
2317 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2318 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2319 {
2320 nxt_lvlhsh_query_t lhq;
2321
2322 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2323 lhq.replace = 0;
2324 lhq.key = app->name;
2325 lhq.value = app;
2326 lhq.proto = &nxt_router_apps_hash_proto;
2327 lhq.pool = rtcf->mem_pool;
2328
2329 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2330
2331 case NXT_OK:
2332 return NXT_OK;
2333
2334 case NXT_DECLINED:
2335 nxt_thread_log_alert("router app hash adding failed: "
2336 "\"%V\" is already in hash", &lhq.key);
2337 /* Fall through. */
2338 default:
2339 return NXT_ERROR;
2340 }
2341 }
2342
2343
2344 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2345 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2346 {
2347 nxt_lvlhsh_query_t lhq;
2348
2349 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2350 lhq.key = *name;
2351 lhq.proto = &nxt_router_apps_hash_proto;
2352
2353 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2354 return NULL;
2355 }
2356
2357 return lhq.value;
2358 }
2359
2360
2361 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2362 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2363 {
2364 nxt_app_t *app;
2365 nxt_lvlhsh_each_t lhe;
2366
2367 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2368
2369 for ( ;; ) {
2370 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2371
2372 if (app == NULL) {
2373 break;
2374 }
2375
2376 nxt_router_app_use(task, app, i);
2377 }
2378 }
2379
2380
/*
 * Per-action application binding; nxt_router_application_init() allocates
 * one and stores it in action->u.conf.
 */
typedef struct {
    nxt_app_t  *app;     /* application found by name in the apps hash */
    nxt_int_t  target;   /* index into app->targets; 0 when no target given */
} nxt_http_app_conf_t;
2385
2386
2387 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2388 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2389 nxt_str_t *target, nxt_http_action_t *action)
2390 {
2391 nxt_app_t *app;
2392 nxt_str_t *targets;
2393 nxt_uint_t i;
2394 nxt_http_app_conf_t *conf;
2395
2396 app = nxt_router_apps_hash_get(rtcf, name);
2397 if (app == NULL) {
2398 return NXT_DECLINED;
2399 }
2400
2401 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2402 if (nxt_slow_path(conf == NULL)) {
2403 return NXT_ERROR;
2404 }
2405
2406 action->handler = nxt_http_application_handler;
2407 action->u.conf = conf;
2408
2409 conf->app = app;
2410
2411 if (target != NULL && target->length != 0) {
2412 targets = app->targets;
2413
2414 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2415
2416 conf->target = i;
2417
2418 } else {
2419 conf->target = 0;
2420 }
2421
2422 return NXT_OK;
2423 }
2424
2425
2426 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2427 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2428 nxt_str_t *name)
2429 {
2430 size_t size;
2431 nxt_int_t ret;
2432 nxt_bool_t wildcard;
2433 nxt_sockaddr_t *sa;
2434 nxt_socket_conf_t *skcf;
2435 nxt_listen_socket_t *ls;
2436
2437 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2438 if (nxt_slow_path(sa == NULL)) {
2439 nxt_alert(task, "invalid listener \"%V\"", name);
2440 return NULL;
2441 }
2442
2443 sa->type = SOCK_STREAM;
2444
2445 nxt_debug(task, "router listener: \"%*s\"",
2446 (size_t) sa->length, nxt_sockaddr_start(sa));
2447
2448 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2449 if (nxt_slow_path(skcf == NULL)) {
2450 return NULL;
2451 }
2452
2453 size = nxt_sockaddr_size(sa);
2454
2455 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2456
2457 if (ret != NXT_OK) {
2458
2459 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2460 if (nxt_slow_path(ls == NULL)) {
2461 return NULL;
2462 }
2463
2464 skcf->listen = ls;
2465
2466 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2467 nxt_memcpy(ls->sockaddr, sa, size);
2468
2469 nxt_listen_socket_remote_size(ls);
2470
2471 ls->socket = -1;
2472 ls->backlog = NXT_LISTEN_BACKLOG;
2473 ls->flags = NXT_NONBLOCK;
2474 ls->read_after_accept = 1;
2475 }
2476
2477 switch (sa->u.sockaddr.sa_family) {
2478 #if (NXT_HAVE_UNIX_DOMAIN)
2479 case AF_UNIX:
2480 wildcard = 0;
2481 break;
2482 #endif
2483 #if (NXT_INET6)
2484 case AF_INET6:
2485 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2486 break;
2487 #endif
2488 case AF_INET:
2489 default:
2490 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2491 break;
2492 }
2493
2494 if (!wildcard) {
2495 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2496 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2497 return NULL;
2498 }
2499
2500 nxt_memcpy(skcf->sockaddr, sa, size);
2501 }
2502
2503 return skcf;
2504 }
2505
2506
2507 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2508 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2509 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2510 {
2511 nxt_router_t *router;
2512 nxt_queue_link_t *qlk;
2513 nxt_socket_conf_t *skcf;
2514
2515 router = tmcf->router_conf->router;
2516
2517 for (qlk = nxt_queue_first(&router->sockets);
2518 qlk != nxt_queue_tail(&router->sockets);
2519 qlk = nxt_queue_next(qlk))
2520 {
2521 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2522
2523 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2524 nskcf->listen = skcf->listen;
2525
2526 nxt_queue_remove(qlk);
2527 nxt_queue_insert_tail(&keeping_sockets, qlk);
2528
2529 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2530
2531 return NXT_OK;
2532 }
2533 }
2534
2535 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2536
2537 return NXT_DECLINED;
2538 }
2539
2540
2541 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2542 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2543 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2544 {
2545 size_t size;
2546 uint32_t stream;
2547 nxt_int_t ret;
2548 nxt_buf_t *b;
2549 nxt_port_t *main_port, *router_port;
2550 nxt_runtime_t *rt;
2551 nxt_socket_rpc_t *rpc;
2552
2553 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2554 if (rpc == NULL) {
2555 goto fail;
2556 }
2557
2558 rpc->socket_conf = skcf;
2559 rpc->temp_conf = tmcf;
2560
2561 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2562
2563 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2564 if (b == NULL) {
2565 goto fail;
2566 }
2567
2568 b->completion_handler = nxt_buf_dummy_completion;
2569
2570 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2571
2572 rt = task->thread->runtime;
2573 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2574 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2575
2576 stream = nxt_port_rpc_register_handler(task, router_port,
2577 nxt_router_listen_socket_ready,
2578 nxt_router_listen_socket_error,
2579 main_port->pid, rpc);
2580 if (nxt_slow_path(stream == 0)) {
2581 goto fail;
2582 }
2583
2584 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2585 stream, router_port->id, b);
2586
2587 if (nxt_slow_path(ret != NXT_OK)) {
2588 nxt_port_rpc_cancel(task, router_port, stream);
2589 goto fail;
2590 }
2591
2592 return;
2593
2594 fail:
2595
2596 nxt_router_conf_error(task, tmcf);
2597 }
2598
2599
2600 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2601 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2602 void *data)
2603 {
2604 nxt_int_t ret;
2605 nxt_socket_t s;
2606 nxt_socket_rpc_t *rpc;
2607
2608 rpc = data;
2609
2610 s = msg->fd[0];
2611
2612 ret = nxt_socket_nonblocking(task, s);
2613 if (nxt_slow_path(ret != NXT_OK)) {
2614 goto fail;
2615 }
2616
2617 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2618
2619 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2620 if (nxt_slow_path(ret != NXT_OK)) {
2621 goto fail;
2622 }
2623
2624 rpc->socket_conf->listen->socket = s;
2625
2626 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2627 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2628
2629 return;
2630
2631 fail:
2632
2633 nxt_socket_close(task, s);
2634
2635 nxt_router_conf_error(task, rpc->temp_conf);
2636 }
2637
2638
/*
 * RPC error handler for a listener socket request.
 *
 * "data" is the nxt_socket_rpc_t registered with the request; the pending
 * configuration it refers to is failed through nxt_router_conf_error().
 * "msg" is referenced only by the disabled block below.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

    /*
     * Disabled code: formats a textual error description (listener address,
     * error code name, message text) from the reply body and emits it as a
     * debug message.  It is not compiled.
     */
#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    /* The first byte of the reply indexes socket_errors[]. */
    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    nxt_router_conf_error(task, tmcf);
}
2698
2699
#if (NXT_TLS)

/*
 * RPC handler receiving a certificate bundle for a TLS listener.
 *
 * "data" is the nxt_router_tlssock_t registered with the request.  The
 * listener's nxt_tls_conf_t is created on first use, a bundle entry named
 * after tls->name and holding the descriptor from msg->fd[0] is pushed onto
 * its bundle list, and the runtime's TLS server_init() hook is invoked.
 * On success nxt_router_conf_apply() is scheduled on the fast work queue.
 *
 * A missing or error reply, an allocation failure or a server_init()
 * failure ends in nxt_router_conf_error().
 */
static void
nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_mp_t                *pool;
    nxt_int_t               rc;
    nxt_tls_conf_t          *tlscf;
    nxt_router_tlssock_t    *tls;
    nxt_tls_bundle_conf_t   *entry;
    nxt_router_temp_conf_t  *tmcf;

    nxt_debug(task, "tls rpc handler");

    tls = data;
    tmcf = tls->temp_conf;

    if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
        goto fail;
    }

    pool = tmcf->router_conf->mem_pool;

    tlscf = tls->socket_conf->tls;

    if (tlscf == NULL) {
        /* First bundle for this listener: create its TLS configuration. */
        tlscf = nxt_mp_zget(pool, sizeof(nxt_tls_conf_t));

        if (nxt_slow_path(tlscf == NULL)) {
            goto fail;
        }

        tlscf->no_wait_shutdown = 1;
        tls->socket_conf->tls = tlscf;
    }

    tls->tls_init->conf = tlscf;

    entry = nxt_mp_get(pool, sizeof(nxt_tls_bundle_conf_t));

    if (nxt_slow_path(entry == NULL)) {
        goto fail;
    }

    if (nxt_slow_path(nxt_str_dup(pool, &entry->name, &tls->name) == NULL)) {
        goto fail;
    }

    entry->chain_file = msg->fd[0];

    /* Push the new bundle onto the head of the configuration's list. */
    entry->next = tlscf->bundle;
    tlscf->bundle = entry;

    rc = task->thread->runtime->tls->server_init(task, pool, tls->tls_init,
                                                 tls->last);

    if (nxt_slow_path(rc != NXT_OK)) {
        goto fail;
    }

    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                       nxt_router_conf_apply, task, tmcf, NULL);

    return;

fail:

    nxt_router_conf_error(task, tmcf);
}

#endif
2768
2769
2770 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2771 nxt_router_app_rpc_create(nxt_task_t *task,
2772 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2773 {
2774 size_t size;
2775 uint32_t stream;
2776 nxt_fd_t port_fd, queue_fd;
2777 nxt_int_t ret;
2778 nxt_buf_t *b;
2779 nxt_port_t *router_port, *dport;
2780 nxt_runtime_t *rt;
2781 nxt_app_rpc_t *rpc;
2782
2783 rt = task->thread->runtime;
2784
2785 dport = app->proto_port;
2786
2787 if (dport == NULL) {
2788 nxt_debug(task, "app '%V' prototype prefork", &app->name);
2789
2790 size = app->name.length + 1 + app->conf.length;
2791
2792 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2793 if (nxt_slow_path(b == NULL)) {
2794 goto fail;
2795 }
2796
2797 b->completion_handler = nxt_buf_dummy_completion;
2798
2799 nxt_buf_cpystr(b, &app->name);
2800 *b->mem.free++ = '\0';
2801 nxt_buf_cpystr(b, &app->conf);
2802
2803 dport = rt->port_by_type[NXT_PROCESS_MAIN];
2804
2805 port_fd = app->shared_port->pair[0];
2806 queue_fd = app->shared_port->queue_fd;
2807
2808 } else {
2809 nxt_debug(task, "app '%V' prefork", &app->name);
2810
2811 b = NULL;
2812 port_fd = -1;
2813 queue_fd = -1;
2814 }
2815
2816 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2817
2818 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2819 nxt_router_app_prefork_ready,
2820 nxt_router_app_prefork_error,
2821 sizeof(nxt_app_rpc_t));
2822 if (nxt_slow_path(rpc == NULL)) {
2823 goto fail;
2824 }
2825
2826 rpc->app = app;
2827 rpc->temp_conf = tmcf;
2828 rpc->proto = (b != NULL);
2829
2830 stream = nxt_port_rpc_ex_stream(rpc);
2831
2832 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2833 port_fd, queue_fd, stream, router_port->id, b);
2834 if (nxt_slow_path(ret != NXT_OK)) {
2835 nxt_port_rpc_cancel(task, router_port, stream);
2836 goto fail;
2837 }
2838
2839 if (b == NULL) {
2840 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2841
2842 app->pending_processes++;
2843 }
2844
2845 return;
2846
2847 fail:
2848
2849 nxt_router_conf_error(task, tmcf);
2850 }
2851
2852
2853 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2854 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2855 void *data)
2856 {
2857 nxt_app_t *app;
2858 nxt_port_t *port;
2859 nxt_app_rpc_t *rpc;
2860 nxt_event_engine_t *engine;
2861
2862 rpc = data;
2863 app = rpc->app;
2864
2865 port = msg->u.new_port;
2866
2867 nxt_assert(port != NULL);
2868 nxt_assert(port->id == 0);
2869
2870 if (rpc->proto) {
2871 nxt_assert(app->proto_port == NULL);
2872 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2873
2874 nxt_port_inc_use(port);
2875
2876 app->proto_port = port;
2877 port->app = app;
2878
2879 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2880
2881 return;
2882 }
2883
2884 nxt_assert(port->type == NXT_PROCESS_APP);
2885
2886 port->app = app;
2887 port->main_app_port = port;
2888
2889 app->pending_processes--;
2890 app->processes++;
2891 app->idle_processes++;
2892
2893 engine = task->thread->engine;
2894
2895 nxt_queue_insert_tail(&app->ports, &port->app_link);
2896 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2897
2898 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2899 &app->name, port->pid, port->id);
2900
2901 nxt_port_hash_add(&app->port_hash, port);
2902 app->port_hash_count++;
2903
2904 port->idle_start = 0;
2905
2906 nxt_port_inc_use(port);
2907
2908 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2909
2910 nxt_work_queue_add(&engine->fast_work_queue,
2911 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2912 }
2913
2914
2915 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2916 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2917 void *data)
2918 {
2919 nxt_app_t *app;
2920 nxt_app_rpc_t *rpc;
2921 nxt_router_temp_conf_t *tmcf;
2922
2923 rpc = data;
2924 app = rpc->app;
2925 tmcf = rpc->temp_conf;
2926
2927 if (rpc->proto) {
2928 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2929 &app->name);
2930
2931 } else {
2932 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2933 &app->name);
2934
2935 app->pending_processes--;
2936 }
2937
2938 nxt_router_conf_error(task, tmcf);
2939 }
2940
2941
2942 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2943 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2944 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2945 {
2946 nxt_int_t ret;
2947 nxt_uint_t n, threads;
2948 nxt_queue_link_t *qlk;
2949 nxt_router_engine_conf_t *recf;
2950
2951 threads = tmcf->router_conf->threads;
2952
2953 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2954 sizeof(nxt_router_engine_conf_t));
2955 if (nxt_slow_path(tmcf->engines == NULL)) {
2956 return NXT_ERROR;
2957 }
2958
2959 n = 0;
2960
2961 for (qlk = nxt_queue_first(&router->engines);
2962 qlk != nxt_queue_tail(&router->engines);
2963 qlk = nxt_queue_next(qlk))
2964 {
2965 recf = nxt_array_zero_add(tmcf->engines);
2966 if (nxt_slow_path(recf == NULL)) {
2967 return NXT_ERROR;
2968 }
2969
2970 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2971
2972 if (n < threads) {
2973 recf->action = NXT_ROUTER_ENGINE_KEEP;
2974 ret = nxt_router_engine_conf_update(tmcf, recf);
2975
2976 } else {
2977 recf->action = NXT_ROUTER_ENGINE_DELETE;
2978 ret = nxt_router_engine_conf_delete(tmcf, recf);
2979 }
2980
2981 if (nxt_slow_path(ret != NXT_OK)) {
2982 return ret;
2983 }
2984
2985 n++;
2986 }
2987
2988 tmcf->new_threads = n;
2989
2990 while (n < threads) {
2991 recf = nxt_array_zero_add(tmcf->engines);
2992 if (nxt_slow_path(recf == NULL)) {
2993 return NXT_ERROR;
2994 }
2995
2996 recf->action = NXT_ROUTER_ENGINE_ADD;
2997
2998 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2999 if (nxt_slow_path(recf->engine == NULL)) {
3000 return NXT_ERROR;
3001 }
3002
3003 ret = nxt_router_engine_conf_create(tmcf, recf);
3004 if (nxt_slow_path(ret != NXT_OK)) {
3005 return ret;
3006 }
3007
3008 n++;
3009 }
3010
3011 return NXT_OK;
3012 }
3013
3014
3015 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3016 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3017 nxt_router_engine_conf_t *recf)
3018 {
3019 nxt_int_t ret;
3020
3021 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3022 nxt_router_listen_socket_create);
3023 if (nxt_slow_path(ret != NXT_OK)) {
3024 return ret;
3025 }
3026
3027 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3028 nxt_router_listen_socket_create);
3029 if (nxt_slow_path(ret != NXT_OK)) {
3030 return ret;
3031 }
3032
3033 return ret;
3034 }
3035
3036
3037 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3038 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3039 nxt_router_engine_conf_t *recf)
3040 {
3041 nxt_int_t ret;
3042
3043 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3044 nxt_router_listen_socket_create);
3045 if (nxt_slow_path(ret != NXT_OK)) {
3046 return ret;
3047 }
3048
3049 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3050 nxt_router_listen_socket_update);
3051 if (nxt_slow_path(ret != NXT_OK)) {
3052 return ret;
3053 }
3054
3055 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3056 if (nxt_slow_path(ret != NXT_OK)) {
3057 return ret;
3058 }
3059
3060 return ret;
3061 }
3062
3063
3064 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3065 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3066 nxt_router_engine_conf_t *recf)
3067 {
3068 nxt_int_t ret;
3069
3070 ret = nxt_router_engine_quit(tmcf, recf);
3071 if (nxt_slow_path(ret != NXT_OK)) {
3072 return ret;
3073 }
3074
3075 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3076 if (nxt_slow_path(ret != NXT_OK)) {
3077 return ret;
3078 }
3079
3080 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3081 }
3082
3083
3084 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3085 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3086 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3087 nxt_work_handler_t handler)
3088 {
3089 nxt_int_t ret;
3090 nxt_joint_job_t *job;
3091 nxt_queue_link_t *qlk;
3092 nxt_socket_conf_t *skcf;
3093 nxt_socket_conf_joint_t *joint;
3094
3095 for (qlk = nxt_queue_first(sockets);
3096 qlk != nxt_queue_tail(sockets);
3097 qlk = nxt_queue_next(qlk))
3098 {
3099 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3100 if (nxt_slow_path(job == NULL)) {
3101 return NXT_ERROR;
3102 }
3103
3104 job->work.next = recf->jobs;
3105 recf->jobs = &job->work;
3106
3107 job->task = tmcf->engine->task;
3108 job->work.handler = handler;
3109 job->work.task = &job->task;
3110 job->work.obj = job;
3111 job->tmcf = tmcf;
3112
3113 tmcf->count++;
3114
3115 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3116 sizeof(nxt_socket_conf_joint_t));
3117 if (nxt_slow_path(joint == NULL)) {
3118 return NXT_ERROR;
3119 }
3120
3121 job->work.data = joint;
3122
3123 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3124 if (nxt_slow_path(ret != NXT_OK)) {
3125 return ret;
3126 }
3127
3128 joint->count = 1;
3129
3130 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3131 skcf->count++;
3132 joint->socket_conf = skcf;
3133
3134 joint->engine = recf->engine;
3135 }
3136
3137 return NXT_OK;
3138 }
3139
3140
3141 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3142