1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20
21 #define NXT_SHARED_PORT_ID 0xFFFFu
22
23 typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
30 nxt_conf_value_t *limits_value;
31 nxt_conf_value_t *processes_value;
32 nxt_conf_value_t *targets_value;
33 } nxt_router_app_conf_t;
34
35
36 typedef struct {
37 nxt_str_t pass;
38 nxt_str_t application;
39 } nxt_router_listener_conf_t;
40
41
42 #if (NXT_TLS)
43
44 typedef struct {
45 nxt_str_t name;
46 nxt_socket_conf_t *socket_conf;
47 nxt_router_temp_conf_t *temp_conf;
48 nxt_tls_init_t *tls_init;
49 nxt_bool_t last;
50
51 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53
54 #endif
55
56
57 typedef struct {
58 nxt_str_t *name;
59 nxt_socket_conf_t *socket_conf;
60 nxt_router_temp_conf_t *temp_conf;
61 nxt_bool_t last;
62 } nxt_socket_rpc_t;
63
64
65 typedef struct {
66 nxt_app_t *app;
67 nxt_router_temp_conf_t *temp_conf;
68 uint8_t proto; /* 1 bit */
69 } nxt_app_rpc_t;
70
71
72 typedef struct {
73 nxt_app_joint_t *app_joint;
74 uint32_t generation;
75 uint8_t proto; /* 1 bit */
76 } nxt_app_joint_rpc_t;
77
78
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80 nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83 nxt_port_t *controller_port);
84
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88 nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90 nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92 nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94 nxt_port_recv_msg_t *msg);
95
96 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
97 static void nxt_router_conf_ready(nxt_task_t *task,
98 nxt_router_temp_conf_t *tmcf);
99 static void nxt_router_conf_send(nxt_task_t *task,
100 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
101
102 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
103 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
104 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
105 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
106 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
107 nxt_mp_t *mp, nxt_conf_value_t *conf);
108 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
109 nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
110
111 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
112 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
113 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
114 nxt_app_t *app);
115 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
116 nxt_str_t *name);
117 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
118 int i);
119
120 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
121 nxt_port_t *port);
122 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
123 nxt_port_t *port);
124 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
125 nxt_port_t *port, nxt_fd_t fd);
126 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
127 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
128 static void nxt_router_listen_socket_ready(nxt_task_t *task,
129 nxt_port_recv_msg_t *msg, void *data);
130 static void nxt_router_listen_socket_error(nxt_task_t *task,
131 nxt_port_recv_msg_t *msg, void *data);
132 #if (NXT_TLS)
133 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
134 nxt_port_recv_msg_t *msg, void *data);
135 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
136 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
137 nxt_bool_t last);
138 #endif
139 static void nxt_router_app_rpc_create(nxt_task_t *task,
140 nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
141 static void nxt_router_app_prefork_ready(nxt_task_t *task,
142 nxt_port_recv_msg_t *msg, void *data);
143 static void nxt_router_app_prefork_error(nxt_task_t *task,
144 nxt_port_recv_msg_t *msg, void *data);
145 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
146 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
147 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
148 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
149
150 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
151 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
152 const nxt_event_interface_t *interface);
153 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
154 nxt_router_engine_conf_t *recf);
155 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
156 nxt_router_engine_conf_t *recf);
157 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
158 nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
160 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
161 nxt_work_handler_t handler);
162 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
163 nxt_router_engine_conf_t *recf);
164 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
165 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
166
167 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
168 nxt_router_temp_conf_t *tmcf);
169 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
170 nxt_event_engine_t *engine);
171 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
172 nxt_router_temp_conf_t *tmcf);
173
174 static void nxt_router_engines_post(nxt_router_t *router,
175 nxt_router_temp_conf_t *tmcf);
176 static void nxt_router_engine_post(nxt_event_engine_t *engine,
177 nxt_work_t *jobs);
178
179 static void nxt_router_thread_start(void *data);
180 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
181 void *data);
182 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
183 void *data);
184 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
185 void *data);
186 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
187 void *data);
188 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
189 void *data);
190 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
191 void *data);
192 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
193 void *data);
194 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
195 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
196 static void nxt_router_listen_socket_release(nxt_task_t *task,
197 nxt_socket_conf_t *skcf);
198
199 static void nxt_router_app_port_ready(nxt_task_t *task,
200 nxt_port_recv_msg_t *msg, void *data);
201 static void nxt_router_app_port_error(nxt_task_t *task,
202 nxt_port_recv_msg_t *msg, void *data);
203
204 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
205 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
206
207 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
208 nxt_port_t *port, nxt_apr_action_t action);
209 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
210 nxt_request_rpc_data_t *req_rpc_data);
211 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
212 void *data);
213 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
214 void *data);
215
216 static void nxt_router_app_prepare_request(nxt_task_t *task,
217 nxt_request_rpc_data_t *req_rpc_data);
218 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
219 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
220
221 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
222 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
223 void *data);
224 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
225 void *data);
226 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
227 void *data);
228 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
229
230 static const nxt_http_request_state_t nxt_http_request_send_state;
231 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
232
233 static void nxt_router_app_joint_use(nxt_task_t *task,
234 nxt_app_joint_t *app_joint, int i);
235
236 static void nxt_router_http_request_release_post(nxt_task_t *task,
237 nxt_http_request_t *r);
238 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
239 void *data);
240 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
241 static void nxt_router_get_port_handler(nxt_task_t *task,
242 nxt_port_recv_msg_t *msg);
243 static void nxt_router_get_mmap_handler(nxt_task_t *task,
244 nxt_port_recv_msg_t *msg);
245
246 extern const nxt_http_request_state_t nxt_http_websocket;
247
248 nxt_router_t *nxt_router;
249
250 static const nxt_str_t http_prefix = nxt_string("HTTP_");
251 static const nxt_str_t empty_prefix = nxt_string("");
252
253 static const nxt_str_t *nxt_app_msg_prefix[] = {
254 &empty_prefix,
255 &empty_prefix,
256 &http_prefix,
257 &http_prefix,
258 &http_prefix,
259 &empty_prefix,
260 };
261
262
263 static const nxt_port_handlers_t nxt_router_process_port_handlers = {
264 .quit = nxt_signal_quit_handler,
265 .new_port = nxt_router_new_port_handler,
266 .get_port = nxt_router_get_port_handler,
267 .change_file = nxt_port_change_log_file_handler,
268 .mmap = nxt_port_mmap_handler,
269 .get_mmap = nxt_router_get_mmap_handler,
270 .data = nxt_router_conf_data_handler,
271 .app_restart = nxt_router_app_restart_handler,
272 .remove_pid = nxt_router_remove_pid_handler,
273 .access_log = nxt_router_access_log_reopen_handler,
274 .rpc_ready = nxt_port_rpc_handler,
275 .rpc_error = nxt_port_rpc_handler,
276 .oosm = nxt_router_oosm_handler,
277 };
278
279
280 const nxt_process_init_t nxt_router_process = {
281 .name = "router",
282 .type = NXT_PROCESS_ROUTER,
283 .prefork = nxt_router_prefork,
284 .restart = 1,
285 .setup = nxt_process_core_setup,
286 .start = nxt_router_start,
287 .port_handlers = &nxt_router_process_port_handlers,
288 .signals = nxt_process_signals,
289 };
290
291
292 /* Queues of nxt_socket_conf_t */
293 nxt_queue_t creating_sockets;
294 nxt_queue_t pending_sockets;
295 nxt_queue_t updating_sockets;
296 nxt_queue_t keeping_sockets;
297 nxt_queue_t deleting_sockets;
298
299
300 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)301 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
302 {
303 nxt_runtime_stop_app_processes(task, task->thread->runtime);
304
305 return NXT_OK;
306 }
307
308
309 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)310 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
311 {
312 nxt_int_t ret;
313 nxt_port_t *controller_port;
314 nxt_router_t *router;
315 nxt_runtime_t *rt;
316
317 rt = task->thread->runtime;
318
319 nxt_log(task, NXT_LOG_INFO, "router started");
320
321 #if (NXT_TLS)
322 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
323 if (nxt_slow_path(rt->tls == NULL)) {
324 return NXT_ERROR;
325 }
326
327 ret = rt->tls->library_init(task);
328 if (nxt_slow_path(ret != NXT_OK)) {
329 return ret;
330 }
331 #endif
332
333 ret = nxt_http_init(task);
334 if (nxt_slow_path(ret != NXT_OK)) {
335 return ret;
336 }
337
338 router = nxt_zalloc(sizeof(nxt_router_t));
339 if (nxt_slow_path(router == NULL)) {
340 return NXT_ERROR;
341 }
342
343 nxt_queue_init(&router->engines);
344 nxt_queue_init(&router->sockets);
345 nxt_queue_init(&router->apps);
346
347 nxt_router = router;
348
349 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
350 if (controller_port != NULL) {
351 nxt_router_greet_controller(task, controller_port);
352 }
353
354 return NXT_OK;
355 }
356
357
358 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)359 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
360 {
361 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
362 -1, 0, 0, NULL);
363 }
364
365
366 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)367 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
368 void *data)
369 {
370 size_t size;
371 uint32_t stream;
372 nxt_fd_t port_fd, queue_fd;
373 nxt_int_t ret;
374 nxt_app_t *app;
375 nxt_buf_t *b;
376 nxt_port_t *dport;
377 nxt_runtime_t *rt;
378 nxt_app_joint_rpc_t *app_joint_rpc;
379
380 app = data;
381
382 nxt_thread_mutex_lock(&app->mutex);
383
384 dport = app->proto_port;
385
386 nxt_thread_mutex_unlock(&app->mutex);
387
388 if (dport != NULL) {
389 nxt_debug(task, "app '%V' %p start process", &app->name, app);
390
391 b = NULL;
392 port_fd = -1;
393 queue_fd = -1;
394
395 } else {
396 if (app->proto_port_requests > 0) {
397 nxt_debug(task, "app '%V' %p wait for prototype process",
398 &app->name, app);
399
400 app->proto_port_requests++;
401
402 goto skip;
403 }
404
405 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
406
407 rt = task->thread->runtime;
408 dport = rt->port_by_type[NXT_PROCESS_MAIN];
409
410 size = app->name.length + 1 + app->conf.length;
411
412 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
413 if (nxt_slow_path(b == NULL)) {
414 goto failed;
415 }
416
417 nxt_buf_cpystr(b, &app->name);
418 *b->mem.free++ = '\0';
419 nxt_buf_cpystr(b, &app->conf);
420
421 port_fd = app->shared_port->pair[0];
422 queue_fd = app->shared_port->queue_fd;
423 }
424
425 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
426 nxt_router_app_port_ready,
427 nxt_router_app_port_error,
428 sizeof(nxt_app_joint_rpc_t));
429 if (nxt_slow_path(app_joint_rpc == NULL)) {
430 goto failed;
431 }
432
433 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
434
435 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
436 port_fd, queue_fd, stream, port->id, b);
437 if (nxt_slow_path(ret != NXT_OK)) {
438 nxt_port_rpc_cancel(task, port, stream);
439
440 goto failed;
441 }
442
443 app_joint_rpc->app_joint = app->joint;
444 app_joint_rpc->generation = app->generation;
445 app_joint_rpc->proto = (b != NULL);
446
447 if (b != NULL) {
448 app->proto_port_requests++;
449
450 b = NULL;
451 }
452
453 nxt_router_app_joint_use(task, app->joint, 1);
454
455 failed:
456
457 if (b != NULL) {
458 nxt_mp_free(b->data, b);
459 }
460
461 skip:
462
463 nxt_router_app_use(task, app, -1);
464 }
465
466
467 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)468 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
469 {
470 app_joint->use_count += i;
471
472 if (app_joint->use_count == 0) {
473 nxt_assert(app_joint->app == NULL);
474
475 nxt_free(app_joint);
476 }
477 }
478
479
480 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)481 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
482 {
483 nxt_int_t res;
484 nxt_port_t *router_port;
485 nxt_runtime_t *rt;
486
487 nxt_debug(task, "app '%V' start process", &app->name);
488
489 rt = task->thread->runtime;
490 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
491
492 nxt_router_app_use(task, app, 1);
493
494 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
495 app);
496
497 if (res == NXT_OK) {
498 return res;
499 }
500
501 nxt_thread_mutex_lock(&app->mutex);
502
503 app->pending_processes--;
504
505 nxt_thread_mutex_unlock(&app->mutex);
506
507 nxt_router_app_use(task, app, -1);
508
509 return NXT_ERROR;
510 }
511
512
513 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)514 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
515 {
516 nxt_buf_t *b, *next;
517 nxt_bool_t cancelled;
518 nxt_port_t *app_port;
519 nxt_msg_info_t *msg_info;
520
521 msg_info = &req_rpc_data->msg_info;
522
523 if (msg_info->buf == NULL) {
524 return 0;
525 }
526
527 app_port = req_rpc_data->app_port;
528
529 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
530 cancelled = nxt_app_queue_cancel(app_port->queue,
531 msg_info->tracking_cookie,
532 req_rpc_data->stream);
533
534 if (cancelled) {
535 nxt_debug(task, "stream #%uD: cancelled by router",
536 req_rpc_data->stream);
537 }
538
539 } else {
540 cancelled = 0;
541 }
542
543 for (b = msg_info->buf; b != NULL; b = next) {
544 next = b->next;
545 b->next = NULL;
546
547 if (b->is_port_mmap_sent) {
548 b->is_port_mmap_sent = cancelled == 0;
549 }
550
551 b->completion_handler(task, b, b->parent);
552 }
553
554 msg_info->buf = NULL;
555
556 return cancelled;
557 }
558
559
560 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)561 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
562 {
563 if (lnk->next != NULL) {
564 nxt_queue_remove(lnk);
565
566 lnk->next = NULL;
567
568 return 1;
569 }
570
571 return 0;
572 }
573
574
575 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)576 nxt_request_rpc_data_unlink(nxt_task_t *task,
577 nxt_request_rpc_data_t *req_rpc_data)
578 {
579 nxt_app_t *app;
580 nxt_bool_t unlinked;
581 nxt_http_request_t *r;
582
583 nxt_router_msg_cancel(task, req_rpc_data);
584
585 app = req_rpc_data->app;
586
587 if (req_rpc_data->app_port != NULL) {
588 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
589 req_rpc_data->apr_action);
590
591 req_rpc_data->app_port = NULL;
592 }
593
594 r = req_rpc_data->request;
595
596 if (r != NULL) {
597 r->timer_data = NULL;
598
599 nxt_router_http_request_release_post(task, r);
600
601 r->req_rpc_data = NULL;
602 req_rpc_data->request = NULL;
603
604 if (app != NULL) {
605 unlinked = 0;
606
607 nxt_thread_mutex_lock(&app->mutex);
608
609 if (r->app_link.next != NULL) {
610 nxt_queue_remove(&r->app_link);
611 r->app_link.next = NULL;
612
613 unlinked = 1;
614 }
615
616 nxt_thread_mutex_unlock(&app->mutex);
617
618 if (unlinked) {
619 nxt_mp_release(r->mem_pool);
620 }
621 }
622 }
623
624 if (app != NULL) {
625 nxt_router_app_use(task, app, -1);
626
627 req_rpc_data->app = NULL;
628 }
629
630 if (req_rpc_data->msg_info.body_fd != -1) {
631 nxt_fd_close(req_rpc_data->msg_info.body_fd);
632
633 req_rpc_data->msg_info.body_fd = -1;
634 }
635
636 if (req_rpc_data->rpc_cancel) {
637 req_rpc_data->rpc_cancel = 0;
638
639 nxt_port_rpc_cancel(task, task->thread->engine->port,
640 req_rpc_data->stream);
641 }
642 }
643
644
645 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
647 {
648 nxt_int_t res;
649 nxt_app_t *app;
650 nxt_port_t *port, *main_app_port;
651 nxt_runtime_t *rt;
652
653 nxt_port_new_port_handler(task, msg);
654
655 port = msg->u.new_port;
656
657 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
658 nxt_router_greet_controller(task, msg->u.new_port);
659 }
660
661 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
662 nxt_port_rpc_handler(task, msg);
663
664 return;
665 }
666
667 if (port == NULL || port->type != NXT_PROCESS_APP) {
668
669 if (msg->port_msg.stream == 0) {
670 return;
671 }
672
673 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
674
675 } else {
676 if (msg->fd[1] != -1) {
677 res = nxt_router_port_queue_map(task, port, msg->fd[1]);
678 if (nxt_slow_path(res != NXT_OK)) {
679 return;
680 }
681
682 nxt_fd_close(msg->fd[1]);
683 msg->fd[1] = -1;
684 }
685 }
686
687 if (msg->port_msg.stream != 0) {
688 nxt_port_rpc_handler(task, msg);
689 return;
690 }
691
692 nxt_debug(task, "new port id %d (%d)", port->id, port->type);
693
694 /*
695 * Port with "id == 0" is application 'main' port and it always
696 * should come with non-zero stream.
697 */
698 nxt_assert(port->id != 0);
699
700 /* Find 'main' app port and get app reference. */
701 rt = task->thread->runtime;
702
703 /*
704 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
705 * sent to main port (with id == 0) and processed in main thread.
706 */
707 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
708 nxt_assert(main_app_port != NULL);
709
710 app = main_app_port->app;
711
712 if (nxt_fast_path(app != NULL)) {
713 nxt_thread_mutex_lock(&app->mutex);
714
715 /* TODO here should be find-and-add code because there can be
716 port waiters in port_hash */
717 nxt_port_hash_add(&app->port_hash, port);
718 app->port_hash_count++;
719
720 nxt_thread_mutex_unlock(&app->mutex);
721
722 port->app = app;
723 }
724
725 port->main_app_port = main_app_port;
726
727 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
728 }
729
730
731 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)732 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
733 {
734 void *p;
735 size_t size;
736 nxt_int_t ret;
737 nxt_port_t *port;
738 nxt_router_temp_conf_t *tmcf;
739
740 port = nxt_runtime_port_find(task->thread->runtime,
741 msg->port_msg.pid,
742 msg->port_msg.reply_port);
743 if (nxt_slow_path(port == NULL)) {
744 nxt_alert(task, "conf_data_handler: reply port not found");
745 return;
746 }
747
748 p = MAP_FAILED;
749
750 /*
751 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
752 * initialized in 'cleanup' section.
753 */
754 size = 0;
755
756 tmcf = nxt_router_temp_conf(task);
757 if (nxt_slow_path(tmcf == NULL)) {
758 goto fail;
759 }
760
761 if (nxt_slow_path(msg->fd[0] == -1)) {
762 nxt_alert(task, "conf_data_handler: invalid shm fd");
763 goto fail;
764 }
765
766 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
767 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
768 (int) nxt_buf_mem_used_size(&msg->buf->mem));
769 goto fail;
770 }
771
772 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
773
774 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
775
776 nxt_fd_close(msg->fd[0]);
777 msg->fd[0] = -1;
778
779 if (nxt_slow_path(p == MAP_FAILED)) {
780 goto fail;
781 }
782
783 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
784
785 tmcf->router_conf->router = nxt_router;
786 tmcf->stream = msg->port_msg.stream;
787 tmcf->port = port;
788
789 nxt_port_use(task, tmcf->port, 1);
790
791 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
792
793 if (nxt_fast_path(ret == NXT_OK)) {
794 nxt_router_conf_apply(task, tmcf, NULL);
795
796 } else {
797 nxt_router_conf_error(task, tmcf);
798 }
799
800 goto cleanup;
801
802 fail:
803
804 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
805 msg->port_msg.stream, 0, NULL);
806
807 if (tmcf != NULL) {
808 nxt_mp_release(tmcf->mem_pool);
809 }
810
811 cleanup:
812
813 if (p != MAP_FAILED) {
814 nxt_mem_munmap(p, size);
815 }
816
817 if (msg->fd[0] != -1) {
818 nxt_fd_close(msg->fd[0]);
819 msg->fd[0] = -1;
820 }
821 }
822
823
824 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)825 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
826 {
827 nxt_app_t *app;
828 nxt_int_t ret;
829 nxt_str_t app_name;
830 nxt_port_t *reply_port, *shared_port, *old_shared_port;
831 nxt_port_t *proto_port;
832 nxt_port_msg_type_t reply;
833
834 reply_port = nxt_runtime_port_find(task->thread->runtime,
835 msg->port_msg.pid,
836 msg->port_msg.reply_port);
837 if (nxt_slow_path(reply_port == NULL)) {
838 nxt_alert(task, "app_restart_handler: reply port not found");
839 return;
840 }
841
842 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
843 app_name.start = msg->buf->mem.pos;
844
845 nxt_debug(task, "app_restart_handler: %V", &app_name);
846
847 app = nxt_router_app_find(&nxt_router->apps, &app_name);
848
849 if (nxt_fast_path(app != NULL)) {
850 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
851 NXT_PROCESS_APP);
852 if (nxt_slow_path(shared_port == NULL)) {
853 goto fail;
854 }
855
856 ret = nxt_port_socket_init(task, shared_port, 0);
857 if (nxt_slow_path(ret != NXT_OK)) {
858 nxt_port_use(task, shared_port, -1);
859 goto fail;
860 }
861
862 ret = nxt_router_app_queue_init(task, shared_port);
863 if (nxt_slow_path(ret != NXT_OK)) {
864 nxt_port_write_close(shared_port);
865 nxt_port_read_close(shared_port);
866 nxt_port_use(task, shared_port, -1);
867 goto fail;
868 }
869
870 nxt_port_write_enable(task, shared_port);
871
872 nxt_thread_mutex_lock(&app->mutex);
873
874 proto_port = app->proto_port;
875
876 if (proto_port != NULL) {
877 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
878 proto_port->pid);
879
880 app->proto_port = NULL;
881 proto_port->app = NULL;
882 }
883
884 app->generation++;
885
886 shared_port->app = app;
887
888 old_shared_port = app->shared_port;
889 old_shared_port->app = NULL;
890
891 app->shared_port = shared_port;
892
893 nxt_thread_mutex_unlock(&app->mutex);
894
895 nxt_port_close(task, old_shared_port);
896 nxt_port_use(task, old_shared_port, -1);
897
898 if (proto_port != NULL) {
899 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
900 -1, 0, 0, NULL);
901
902 nxt_port_close(task, proto_port);
903
904 nxt_port_use(task, proto_port, -1);
905 }
906
907 reply = NXT_PORT_MSG_RPC_READY_LAST;
908
909 } else {
910
911 fail:
912
913 reply = NXT_PORT_MSG_RPC_ERROR;
914 }
915
916 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
917 0, NULL);
918 }
919
920
921 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)922 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
923 void *data)
924 {
925 union {
926 nxt_pid_t removed_pid;
927 void *data;
928 } u;
929
930 u.data = data;
931
932 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
933 }
934
935
936 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)937 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
938 {
939 nxt_event_engine_t *engine;
940
941 nxt_port_remove_pid_handler(task, msg);
942
943 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
944 {
945 if (nxt_fast_path(engine->port != NULL)) {
946 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
947 msg->u.data);
948 }
949 }
950 nxt_queue_loop;
951
952 if (msg->port_msg.stream == 0) {
953 return;
954 }
955
956 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
957
958 nxt_port_rpc_handler(task, msg);
959 }
960
961
962 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)963 nxt_router_temp_conf(nxt_task_t *task)
964 {
965 nxt_mp_t *mp, *tmp;
966 nxt_router_conf_t *rtcf;
967 nxt_router_temp_conf_t *tmcf;
968
969 mp = nxt_mp_create(1024, 128, 256, 32);
970 if (nxt_slow_path(mp == NULL)) {
971 return NULL;
972 }
973
974 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
975 if (nxt_slow_path(rtcf == NULL)) {
976 goto fail;
977 }
978
979 rtcf->mem_pool = mp;
980
981 rtcf->var_fields = nxt_array_create(mp, 4, sizeof(nxt_var_field_t));
982 if (nxt_slow_path(rtcf->var_fields == NULL)) {
983 goto fail;
984 }
985
986 tmp = nxt_mp_create(1024, 128, 256, 32);
987 if (nxt_slow_path(tmp == NULL)) {
988 goto fail;
989 }
990
991 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
992 if (nxt_slow_path(tmcf == NULL)) {
993 goto temp_fail;
994 }
995
996 tmcf->mem_pool = tmp;
997 tmcf->router_conf = rtcf;
998 tmcf->count = 1;
999 tmcf->engine = task->thread->engine;
1000
1001 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1002 sizeof(nxt_router_engine_conf_t));
1003 if (nxt_slow_path(tmcf->engines == NULL)) {
1004 goto temp_fail;
1005 }
1006
1007 nxt_queue_init(&creating_sockets);
1008 nxt_queue_init(&pending_sockets);
1009 nxt_queue_init(&updating_sockets);
1010 nxt_queue_init(&keeping_sockets);
1011 nxt_queue_init(&deleting_sockets);
1012
1013 #if (NXT_TLS)
1014 nxt_queue_init(&tmcf->tls);
1015 #endif
1016
1017 nxt_queue_init(&tmcf->apps);
1018 nxt_queue_init(&tmcf->previous);
1019
1020 return tmcf;
1021
1022 temp_fail:
1023
1024 nxt_mp_destroy(tmp);
1025
1026 fail:
1027
1028 nxt_mp_destroy(mp);
1029
1030 return NULL;
1031 }
1032
1033
1034 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1035 nxt_router_app_can_start(nxt_app_t *app)
1036 {
1037 return app->processes + app->pending_processes < app->max_processes
1038 && app->pending_processes < app->max_pending_processes;
1039 }
1040
1041
1042 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1043 nxt_router_app_need_start(nxt_app_t *app)
1044 {
1045 return (app->active_requests
1046 > app->port_hash_count + app->pending_processes)
1047 || (app->spare_processes
1048 > app->idle_processes + app->pending_processes);
1049 }
1050
1051
1052 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1053 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1054 {
1055 nxt_int_t ret;
1056 nxt_app_t *app;
1057 nxt_router_t *router;
1058 nxt_runtime_t *rt;
1059 nxt_queue_link_t *qlk;
1060 nxt_socket_conf_t *skcf;
1061 nxt_router_conf_t *rtcf;
1062 nxt_router_temp_conf_t *tmcf;
1063 const nxt_event_interface_t *interface;
1064 #if (NXT_TLS)
1065 nxt_router_tlssock_t *tls;
1066 #endif
1067
1068 tmcf = obj;
1069
1070 qlk = nxt_queue_first(&pending_sockets);
1071
1072 if (qlk != nxt_queue_tail(&pending_sockets)) {
1073 nxt_queue_remove(qlk);
1074 nxt_queue_insert_tail(&creating_sockets, qlk);
1075
1076 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1077
1078 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1079
1080 return;
1081 }
1082
1083 #if (NXT_TLS)
1084 qlk = nxt_queue_last(&tmcf->tls);
1085
1086 if (qlk != nxt_queue_head(&tmcf->tls)) {
1087 nxt_queue_remove(qlk);
1088
1089 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1090
1091 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1092 nxt_router_tls_rpc_handler, tls);
1093 return;
1094 }
1095 #endif
1096
1097 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1098
1099 if (nxt_router_app_need_start(app)) {
1100 nxt_router_app_rpc_create(task, tmcf, app);
1101 return;
1102 }
1103
1104 } nxt_queue_loop;
1105
1106 rtcf = tmcf->router_conf;
1107
1108 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1109 nxt_router_access_log_open(task, tmcf);
1110 return;
1111 }
1112
1113 rt = task->thread->runtime;
1114
1115 interface = nxt_service_get(rt->services, "engine", NULL);
1116
1117 router = rtcf->router;
1118
1119 ret = nxt_router_engines_create(task, router, tmcf, interface);
1120 if (nxt_slow_path(ret != NXT_OK)) {
1121 goto fail;
1122 }
1123
1124 ret = nxt_router_threads_create(task, rt, tmcf);
1125 if (nxt_slow_path(ret != NXT_OK)) {
1126 goto fail;
1127 }
1128
1129 nxt_router_apps_sort(task, router, tmcf);
1130
1131 nxt_router_apps_hash_use(task, rtcf, 1);
1132
1133 nxt_router_engines_post(router, tmcf);
1134
1135 nxt_queue_add(&router->sockets, &updating_sockets);
1136 nxt_queue_add(&router->sockets, &creating_sockets);
1137
1138 if (router->access_log != rtcf->access_log) {
1139 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1140
1141 nxt_router_access_log_release(task, &router->lock, router->access_log);
1142
1143 router->access_log = rtcf->access_log;
1144 }
1145
1146 nxt_router_conf_ready(task, tmcf);
1147
1148 return;
1149
1150 fail:
1151
1152 nxt_router_conf_error(task, tmcf);
1153
1154 return;
1155 }
1156
1157
1158 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1159 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1160 {
1161 nxt_joint_job_t *job;
1162
1163 job = obj;
1164
1165 nxt_router_conf_ready(task, job->tmcf);
1166 }
1167
1168
1169 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1170 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1171 {
1172 uint32_t count;
1173 nxt_router_conf_t *rtcf;
1174 nxt_thread_spinlock_t *lock;
1175
1176 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1177
1178 if (--tmcf->count > 0) {
1179 return;
1180 }
1181
1182 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1183
1184 rtcf = tmcf->router_conf;
1185
1186 lock = &rtcf->router->lock;
1187
1188 nxt_thread_spin_lock(lock);
1189
1190 count = rtcf->count;
1191
1192 nxt_thread_spin_unlock(lock);
1193
1194 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1195
1196 if (count == 0) {
1197 nxt_router_apps_hash_use(task, rtcf, -1);
1198
1199 nxt_router_access_log_release(task, lock, rtcf->access_log);
1200
1201 nxt_mp_destroy(rtcf->mem_pool);
1202 }
1203
1204 nxt_mp_release(tmcf->mem_pool);
1205 }
1206
1207
1208 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1209 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1210 {
1211 nxt_app_t *app;
1212 nxt_socket_t s;
1213 nxt_router_t *router;
1214 nxt_queue_link_t *qlk;
1215 nxt_socket_conf_t *skcf;
1216 nxt_router_conf_t *rtcf;
1217
1218 nxt_alert(task, "failed to apply new conf");
1219
1220 for (qlk = nxt_queue_first(&creating_sockets);
1221 qlk != nxt_queue_tail(&creating_sockets);
1222 qlk = nxt_queue_next(qlk))
1223 {
1224 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1225 s = skcf->listen->socket;
1226
1227 if (s != -1) {
1228 nxt_socket_close(task, s);
1229 }
1230
1231 nxt_free(skcf->listen);
1232 }
1233
1234 rtcf = tmcf->router_conf;
1235
1236 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1237
1238 nxt_router_app_unlink(task, app);
1239
1240 } nxt_queue_loop;
1241
1242 router = rtcf->router;
1243
1244 nxt_queue_add(&router->sockets, &keeping_sockets);
1245 nxt_queue_add(&router->sockets, &deleting_sockets);
1246
1247 nxt_queue_add(&router->apps, &tmcf->previous);
1248
1249 // TODO: new engines and threads
1250
1251 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1252
1253 nxt_mp_destroy(rtcf->mem_pool);
1254
1255 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1256
1257 nxt_mp_release(tmcf->mem_pool);
1258 }
1259
1260
1261 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1262 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1263 nxt_port_msg_type_t type)
1264 {
1265 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1266
1267 nxt_port_use(task, tmcf->port, -1);
1268
1269 tmcf->port = NULL;
1270 }
1271
1272
1273 static nxt_conf_map_t nxt_router_conf[] = {
1274 {
1275 nxt_string("listeners_threads"),
1276 NXT_CONF_MAP_INT32,
1277 offsetof(nxt_router_conf_t, threads),
1278 },
1279 };
1280
1281
1282 static nxt_conf_map_t nxt_router_app_conf[] = {
1283 {
1284 nxt_string("type"),
1285 NXT_CONF_MAP_STR,
1286 offsetof(nxt_router_app_conf_t, type),
1287 },
1288
1289 {
1290 nxt_string("limits"),
1291 NXT_CONF_MAP_PTR,
1292 offsetof(nxt_router_app_conf_t, limits_value),
1293 },
1294
1295 {
1296 nxt_string("processes"),
1297 NXT_CONF_MAP_INT32,
1298 offsetof(nxt_router_app_conf_t, processes),
1299 },
1300
1301 {
1302 nxt_string("processes"),
1303 NXT_CONF_MAP_PTR,
1304 offsetof(nxt_router_app_conf_t, processes_value),
1305 },
1306
1307 {
1308 nxt_string("targets"),
1309 NXT_CONF_MAP_PTR,
1310 offsetof(nxt_router_app_conf_t, targets_value),
1311 },
1312 };
1313
1314
1315 static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1316 {
1317 nxt_string("timeout"),
1318 NXT_CONF_MAP_MSEC,
1319 offsetof(nxt_router_app_conf_t, timeout),
1320 },
1321 };
1322
1323
1324 static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1325 {
1326 nxt_string("spare"),
1327 NXT_CONF_MAP_INT32,
1328 offsetof(nxt_router_app_conf_t, spare_processes),
1329 },
1330
1331 {
1332 nxt_string("max"),
1333 NXT_CONF_MAP_INT32,
1334 offsetof(nxt_router_app_conf_t, max_processes),
1335 },
1336
1337 {
1338 nxt_string("idle_timeout"),
1339 NXT_CONF_MAP_MSEC,
1340 offsetof(nxt_router_app_conf_t, idle_timeout),
1341 },
1342 };
1343
1344
1345 static nxt_conf_map_t nxt_router_listener_conf[] = {
1346 {
1347 nxt_string("pass"),
1348 NXT_CONF_MAP_STR_COPY,
1349 offsetof(nxt_router_listener_conf_t, pass),
1350 },
1351
1352 {
1353 nxt_string("application"),
1354 NXT_CONF_MAP_STR_COPY,
1355 offsetof(nxt_router_listener_conf_t, application),
1356 },
1357 };
1358
1359
1360 static nxt_conf_map_t nxt_router_http_conf[] = {
1361 {
1362 nxt_string("header_buffer_size"),
1363 NXT_CONF_MAP_SIZE,
1364 offsetof(nxt_socket_conf_t, header_buffer_size),
1365 },
1366
1367 {
1368 nxt_string("large_header_buffer_size"),
1369 NXT_CONF_MAP_SIZE,
1370 offsetof(nxt_socket_conf_t, large_header_buffer_size),
1371 },
1372
1373 {
1374 nxt_string("large_header_buffers"),
1375 NXT_CONF_MAP_SIZE,
1376 offsetof(nxt_socket_conf_t, large_header_buffers),
1377 },
1378
1379 {
1380 nxt_string("body_buffer_size"),
1381 NXT_CONF_MAP_SIZE,
1382 offsetof(nxt_socket_conf_t, body_buffer_size),
1383 },
1384
1385 {
1386 nxt_string("max_body_size"),
1387 NXT_CONF_MAP_SIZE,
1388 offsetof(nxt_socket_conf_t, max_body_size),
1389 },
1390
1391 {
1392 nxt_string("idle_timeout"),
1393 NXT_CONF_MAP_MSEC,
1394 offsetof(nxt_socket_conf_t, idle_timeout),
1395 },
1396
1397 {
1398 nxt_string("header_read_timeout"),
1399 NXT_CONF_MAP_MSEC,
1400 offsetof(nxt_socket_conf_t, header_read_timeout),
1401 },
1402
1403 {
1404 nxt_string("body_read_timeout"),
1405 NXT_CONF_MAP_MSEC,
1406 offsetof(nxt_socket_conf_t, body_read_timeout),
1407 },
1408
1409 {
1410 nxt_string("send_timeout"),
1411 NXT_CONF_MAP_MSEC,
1412 offsetof(nxt_socket_conf_t, send_timeout),
1413 },
1414
1415 {
1416 nxt_string("body_temp_path"),
1417 NXT_CONF_MAP_STR,
1418 offsetof(nxt_socket_conf_t, body_temp_path),
1419 },
1420
1421 {
1422 nxt_string("discard_unsafe_fields"),
1423 NXT_CONF_MAP_INT8,
1424 offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1425 },
1426 };
1427
1428
1429 static nxt_conf_map_t nxt_router_websocket_conf[] = {
1430 {
1431 nxt_string("max_frame_size"),
1432 NXT_CONF_MAP_SIZE,
1433 offsetof(nxt_websocket_conf_t, max_frame_size),
1434 },
1435
1436 {
1437 nxt_string("read_timeout"),
1438 NXT_CONF_MAP_MSEC,
1439 offsetof(nxt_websocket_conf_t, read_timeout),
1440 },
1441
1442 {
1443 nxt_string("keepalive_interval"),
1444 NXT_CONF_MAP_MSEC,
1445 offsetof(nxt_websocket_conf_t, keepalive_interval),
1446 },
1447
1448 };
1449
1450
1451 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1452 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1453 u_char *start, u_char *end)
1454 {
1455 u_char *p;
1456 size_t size;
1457 nxt_mp_t *mp, *app_mp;
1458 uint32_t next, next_target;
1459 nxt_int_t ret;
1460 nxt_str_t name, target;
1461 nxt_app_t *app, *prev;
1462 nxt_str_t *t, *s, *targets;
1463 nxt_uint_t n, i;
1464 nxt_port_t *port;
1465 nxt_router_t *router;
1466 nxt_app_joint_t *app_joint;
1467 #if (NXT_TLS)
1468 nxt_tls_init_t *tls_init;
1469 nxt_conf_value_t *certificate;
1470 #endif
1471 nxt_conf_value_t *root, *conf, *http, *value, *websocket;
1472 nxt_conf_value_t *applications, *application;
1473 nxt_conf_value_t *listeners, *listener;
1474 nxt_socket_conf_t *skcf;
1475 nxt_router_conf_t *rtcf;
1476 nxt_http_routes_t *routes;
1477 nxt_event_engine_t *engine;
1478 nxt_app_lang_module_t *lang;
1479 nxt_router_app_conf_t apcf;
1480 nxt_router_listener_conf_t lscf;
1481
1482 static nxt_str_t http_path = nxt_string("/settings/http");
1483 static nxt_str_t applications_path = nxt_string("/applications");
1484 static nxt_str_t listeners_path = nxt_string("/listeners");
1485 static nxt_str_t routes_path = nxt_string("/routes");
1486 static nxt_str_t access_log_path = nxt_string("/access_log");
1487 #if (NXT_TLS)
1488 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1489 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1490 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1491 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1492 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1493 #endif
1494 static nxt_str_t static_path = nxt_string("/settings/http/static");
1495 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1496 static nxt_str_t forwarded_path = nxt_string("/forwarded");
1497 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1498
1499 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1500 if (root == NULL) {
1501 nxt_alert(task, "configuration parsing error");
1502 return NXT_ERROR;
1503 }
1504
1505 rtcf = tmcf->router_conf;
1506 mp = rtcf->mem_pool;
1507
1508 ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1509 nxt_nitems(nxt_router_conf), rtcf);
1510 if (ret != NXT_OK) {
1511 nxt_alert(task, "root map error");
1512 return NXT_ERROR;
1513 }
1514
1515 if (rtcf->threads == 0) {
1516 rtcf->threads = nxt_ncpu;
1517 }
1518
1519 conf = nxt_conf_get_path(root, &static_path);
1520
1521 ret = nxt_router_conf_process_static(task, rtcf, conf);
1522 if (nxt_slow_path(ret != NXT_OK)) {
1523 return NXT_ERROR;
1524 }
1525
1526 router = rtcf->router;
1527
1528 applications = nxt_conf_get_path(root, &applications_path);
1529
1530 if (applications != NULL) {
1531 next = 0;
1532
1533 for ( ;; ) {
1534 application = nxt_conf_next_object_member(applications,
1535 &name, &next);
1536 if (application == NULL) {
1537 break;
1538 }
1539
1540 nxt_debug(task, "application \"%V\"", &name);
1541
1542 size = nxt_conf_json_length(application, NULL);
1543
1544 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1545 if (nxt_slow_path(app_mp == NULL)) {
1546 goto fail;
1547 }
1548
1549 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1550 if (app == NULL) {
1551 goto app_fail;
1552 }
1553
1554 nxt_memzero(app, sizeof(nxt_app_t));
1555
1556 app->mem_pool = app_mp;
1557
1558 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1559 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1560 + name.length);
1561
1562 p = nxt_conf_json_print(app->conf.start, application, NULL);
1563 app->conf.length = p - app->conf.start;
1564
1565 nxt_assert(app->conf.length <= size);
1566
1567 nxt_debug(task, "application conf \"%V\"", &app->conf);
1568
1569 prev = nxt_router_app_find(&router->apps, &name);
1570
1571 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1572 nxt_mp_destroy(app_mp);
1573
1574 nxt_queue_remove(&prev->link);
1575 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1576
1577 ret = nxt_router_apps_hash_add(rtcf, prev);
1578 if (nxt_slow_path(ret != NXT_OK)) {
1579 goto fail;
1580 }
1581
1582 continue;
1583 }
1584
1585 apcf.processes = 1;
1586 apcf.max_processes = 1;
1587 apcf.spare_processes = 0;
1588 apcf.timeout = 0;
1589 apcf.idle_timeout = 15000;
1590 apcf.limits_value = NULL;
1591 apcf.processes_value = NULL;
1592 apcf.targets_value = NULL;
1593
1594 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1595 if (nxt_slow_path(app_joint == NULL)) {
1596 goto app_fail;
1597 }
1598
1599 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1600
1601 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1602 nxt_nitems(nxt_router_app_conf), &apcf);
1603 if (ret != NXT_OK) {
1604 nxt_alert(task, "application map error");
1605 goto app_fail;
1606 }
1607
1608 if (apcf.limits_value != NULL) {
1609
1610 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1611 nxt_alert(task, "application limits is not object");
1612 goto app_fail;
1613 }
1614
1615 ret = nxt_conf_map_object(mp, apcf.limits_value,
1616 nxt_router_app_limits_conf,
1617 nxt_nitems(nxt_router_app_limits_conf),
1618 &apcf);
1619 if (ret != NXT_OK) {
1620 nxt_alert(task, "application limits map error");
1621 goto app_fail;
1622 }
1623 }
1624
1625 if (apcf.processes_value != NULL
1626 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1627 {
1628 ret = nxt_conf_map_object(mp, apcf.processes_value,
1629 nxt_router_app_processes_conf,
1630 nxt_nitems(nxt_router_app_processes_conf),
1631 &apcf);
1632 if (ret != NXT_OK) {
1633 nxt_alert(task, "application processes map error");
1634 goto app_fail;
1635 }
1636
1637 } else {
1638 apcf.max_processes = apcf.processes;
1639 apcf.spare_processes = apcf.processes;
1640 }
1641
1642 if (apcf.targets_value != NULL) {
1643 n = nxt_conf_object_members_count(apcf.targets_value);
1644
1645 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1646 if (nxt_slow_path(targets == NULL)) {
1647 goto app_fail;
1648 }
1649
1650 next_target = 0;
1651
1652 for (i = 0; i < n; i++) {
1653 (void) nxt_conf_next_object_member(apcf.targets_value,
1654 &target, &next_target);
1655
1656 s = nxt_str_dup(app_mp, &targets[i], &target);
1657 if (nxt_slow_path(s == NULL)) {
1658 goto app_fail;
1659 }
1660 }
1661
1662 } else {
1663 targets = NULL;
1664 }
1665
1666 nxt_debug(task, "application type: %V", &apcf.type);
1667 nxt_debug(task, "application processes: %D", apcf.processes);
1668 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1669
1670 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1671
1672 if (lang == NULL) {
1673 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1674 goto app_fail;
1675 }
1676
1677 nxt_debug(task, "application language module: \"%s\"", lang->file);
1678
1679 ret = nxt_thread_mutex_create(&app->mutex);
1680 if (ret != NXT_OK) {
1681 goto app_fail;
1682 }
1683
1684 nxt_queue_init(&app->ports);
1685 nxt_queue_init(&app->spare_ports);
1686 nxt_queue_init(&app->idle_ports);
1687 nxt_queue_init(&app->ack_waiting_req);
1688
1689 app->name.length = name.length;
1690 nxt_memcpy(app->name.start, name.start, name.length);
1691
1692 app->type = lang->type;
1693 app->max_processes = apcf.max_processes;
1694 app->spare_processes = apcf.spare_processes;
1695 app->max_pending_processes = apcf.spare_processes
1696 ? apcf.spare_processes : 1;
1697 app->timeout = apcf.timeout;
1698 app->idle_timeout = apcf.idle_timeout;
1699
1700 app->targets = targets;
1701
1702 engine = task->thread->engine;
1703
1704 app->engine = engine;
1705
1706 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1707 app->adjust_idle_work.task = &engine->task;
1708 app->adjust_idle_work.obj = app;
1709
1710 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1711
1712 ret = nxt_router_apps_hash_add(rtcf, app);
1713 if (nxt_slow_path(ret != NXT_OK)) {
1714 goto app_fail;
1715 }
1716
1717 nxt_router_app_use(task, app, 1);
1718
1719 app->joint = app_joint;
1720
1721 app_joint->use_count = 1;
1722 app_joint->app = app;
1723
1724 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1725 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1726 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1727 app_joint->idle_timer.task = &engine->task;
1728 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1729
1730 app_joint->free_app_work.handler = nxt_router_free_app;
1731 app_joint->free_app_work.task = &engine->task;
1732 app_joint->free_app_work.obj = app_joint;
1733
1734 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1735 NXT_PROCESS_APP);
1736 if (nxt_slow_path(port == NULL)) {
1737 return NXT_ERROR;
1738 }
1739
1740 ret = nxt_port_socket_init(task, port, 0);
1741 if (nxt_slow_path(ret != NXT_OK)) {
1742 nxt_port_use(task, port, -1);
1743 return NXT_ERROR;
1744 }
1745
1746 ret = nxt_router_app_queue_init(task, port);
1747 if (nxt_slow_path(ret != NXT_OK)) {
1748 nxt_port_write_close(port);
1749 nxt_port_read_close(port);
1750 nxt_port_use(task, port, -1);
1751 return NXT_ERROR;
1752 }
1753
1754 nxt_port_write_enable(task, port);
1755 port->app = app;
1756
1757 app->shared_port = port;
1758
1759 nxt_thread_mutex_create(&app->outgoing.mutex);
1760 }
1761 }
1762
1763 conf = nxt_conf_get_path(root, &routes_path);
1764 if (nxt_fast_path(conf != NULL)) {
1765 routes = nxt_http_routes_create(task, tmcf, conf);
1766 if (nxt_slow_path(routes == NULL)) {
1767 return NXT_ERROR;
1768 }
1769
1770 rtcf->routes = routes;
1771 }
1772
1773 ret = nxt_upstreams_create(task, tmcf, root);
1774 if (nxt_slow_path(ret != NXT_OK)) {
1775 return ret;
1776 }
1777
1778 http = nxt_conf_get_path(root, &http_path);
1779 #if 0
1780 if (http == NULL) {
1781 nxt_alert(task, "no \"http\" block");
1782 return NXT_ERROR;
1783 }
1784 #endif
1785
1786 websocket = nxt_conf_get_path(root, &websocket_path);
1787
1788 listeners = nxt_conf_get_path(root, &listeners_path);
1789
1790 if (listeners != NULL) {
1791 next = 0;
1792
1793 for ( ;; ) {
1794 listener = nxt_conf_next_object_member(listeners, &name, &next);
1795 if (listener == NULL) {
1796 break;
1797 }
1798
1799 skcf = nxt_router_socket_conf(task, tmcf, &name);
1800 if (skcf == NULL) {
1801 goto fail;
1802 }
1803
1804 nxt_memzero(&lscf, sizeof(lscf));
1805
1806 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1807 nxt_nitems(nxt_router_listener_conf),
1808 &lscf);
1809 if (ret != NXT_OK) {
1810 nxt_alert(task, "listener map error");
1811 goto fail;
1812 }
1813
1814 nxt_debug(task, "application: %V", &lscf.application);
1815
1816 // STUB, default values if http block is not defined.
1817 skcf->header_buffer_size = 2048;
1818 skcf->large_header_buffer_size = 8192;
1819 skcf->large_header_buffers = 4;
1820 skcf->discard_unsafe_fields = 1;
1821 skcf->body_buffer_size = 16 * 1024;
1822 skcf->max_body_size = 8 * 1024 * 1024;
1823 skcf->proxy_header_buffer_size = 64 * 1024;
1824 skcf->proxy_buffer_size = 4096;
1825 skcf->proxy_buffers = 256;
1826 skcf->idle_timeout = 180 * 1000;
1827 skcf->header_read_timeout = 30 * 1000;
1828 skcf->body_read_timeout = 30 * 1000;
1829 skcf->send_timeout = 30 * 1000;
1830 skcf->proxy_timeout = 60 * 1000;
1831 skcf->proxy_send_timeout = 30 * 1000;
1832 skcf->proxy_read_timeout = 30 * 1000;
1833
1834 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1835 skcf->websocket_conf.read_timeout = 60 * 1000;
1836 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1837
1838 nxt_str_null(&skcf->body_temp_path);
1839
1840 if (http != NULL) {
1841 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1842 nxt_nitems(nxt_router_http_conf),
1843 skcf);
1844 if (ret != NXT_OK) {
1845 nxt_alert(task, "http map error");
1846 goto fail;
1847 }
1848 }
1849
1850 if (websocket != NULL) {
1851 ret = nxt_conf_map_object(mp, websocket,
1852 nxt_router_websocket_conf,
1853 nxt_nitems(nxt_router_websocket_conf),
1854 &skcf->websocket_conf);
1855 if (ret != NXT_OK) {
1856 nxt_alert(task, "websocket map error");
1857 goto fail;
1858 }
1859 }
1860
1861 t = &skcf->body_temp_path;
1862
1863 if (t->length == 0) {
1864 t->start = (u_char *) task->thread->runtime->tmp;
1865 t->length = nxt_strlen(t->start);
1866 }
1867
1868 conf = nxt_conf_get_path(listener, &forwarded_path);
1869
1870 if (conf != NULL) {
1871 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1872 if (nxt_slow_path(skcf->forwarded == NULL)) {
1873 return NXT_ERROR;
1874 }
1875 }
1876
1877 conf = nxt_conf_get_path(listener, &client_ip_path);
1878
1879 if (conf != NULL) {
1880 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1881 if (nxt_slow_path(skcf->client_ip == NULL)) {
1882 return NXT_ERROR;
1883 }
1884 }
1885
1886 #if (NXT_TLS)
1887 certificate = nxt_conf_get_path(listener, &certificate_path);
1888
1889 if (certificate != NULL) {
1890 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1891 if (nxt_slow_path(tls_init == NULL)) {
1892 return NXT_ERROR;
1893 }
1894
1895 tls_init->cache_size = 0;
1896 tls_init->timeout = 300;
1897
1898 value = nxt_conf_get_path(listener, &conf_cache_path);
1899 if (value != NULL) {
1900 tls_init->cache_size = nxt_conf_get_number(value);
1901 }
1902
1903 value = nxt_conf_get_path(listener, &conf_timeout_path);
1904 if (value != NULL) {
1905 tls_init->timeout = nxt_conf_get_number(value);
1906 }
1907
1908 tls_init->conf_cmds = nxt_conf_get_path(listener,
1909 &conf_commands_path);
1910
1911 tls_init->tickets_conf = nxt_conf_get_path(listener,
1912 &conf_tickets);
1913
1914 n = nxt_conf_array_elements_count_or_1(certificate);
1915
1916 for (i = 0; i < n; i++) {
1917 value = nxt_conf_get_array_element_or_itself(certificate,
1918 i);
1919 nxt_assert(value != NULL);
1920
1921 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1922 tls_init, i == 0);
1923 if (nxt_slow_path(ret != NXT_OK)) {
1924 goto fail;
1925 }
1926 }
1927 }
1928 #endif
1929
1930 skcf->listen->handler = nxt_http_conn_init;
1931 skcf->router_conf = rtcf;
1932 skcf->router_conf->count++;
1933
1934 if (lscf.pass.length != 0) {
1935 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1936
1937 /* COMPATIBILITY: listener application. */
1938 } else if (lscf.application.length > 0) {
1939 skcf->action = nxt_http_pass_application(task, rtcf,
1940 &lscf.application);
1941 }
1942
1943 if (nxt_slow_path(skcf->action == NULL)) {
1944 goto fail;
1945 }
1946 }
1947 }
1948
1949 ret = nxt_http_routes_resolve(task, tmcf);
1950 if (nxt_slow_path(ret != NXT_OK)) {
1951 goto fail;
1952 }
1953
1954 value = nxt_conf_get_path(root, &access_log_path);
1955
1956 if (value != NULL) {
1957 ret = nxt_router_access_log_create(task, rtcf, value);
1958 if (nxt_slow_path(ret != NXT_OK)) {
1959 goto fail;
1960 }
1961 }
1962
1963 nxt_queue_add(&deleting_sockets, &router->sockets);
1964 nxt_queue_init(&router->sockets);
1965
1966 return NXT_OK;
1967
1968 app_fail:
1969
1970 nxt_mp_destroy(app_mp);
1971
1972 fail:
1973
1974 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1975
1976 nxt_queue_remove(&app->link);
1977 nxt_thread_mutex_destroy(&app->mutex);
1978 nxt_mp_destroy(app->mem_pool);
1979
1980 } nxt_queue_loop;
1981
1982 return NXT_ERROR;
1983 }
1984
1985
1986 #if (NXT_TLS)
1987
1988 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)1989 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1990 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1991 nxt_tls_init_t *tls_init, nxt_bool_t last)
1992 {
1993 nxt_router_tlssock_t *tls;
1994
1995 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1996 if (nxt_slow_path(tls == NULL)) {
1997 return NXT_ERROR;
1998 }
1999
2000 tls->tls_init = tls_init;
2001 tls->socket_conf = skcf;
2002 tls->temp_conf = tmcf;
2003 tls->last = last;
2004 nxt_conf_get_string(value, &tls->name);
2005
2006 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2007
2008 return NXT_OK;
2009 }
2010
2011 #endif
2012
2013
2014 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2015 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2016 nxt_conf_value_t *conf)
2017 {
2018 uint32_t next, i;
2019 nxt_mp_t *mp;
2020 nxt_str_t *type, exten, str;
2021 nxt_int_t ret;
2022 nxt_uint_t exts;
2023 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2024
2025 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2026
2027 mp = rtcf->mem_pool;
2028
2029 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2030 if (nxt_slow_path(ret != NXT_OK)) {
2031 return NXT_ERROR;
2032 }
2033
2034 if (conf == NULL) {
2035 return NXT_OK;
2036 }
2037
2038 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2039
2040 if (mtypes_conf != NULL) {
2041 next = 0;
2042
2043 for ( ;; ) {
2044 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2045
2046 if (ext_conf == NULL) {
2047 break;
2048 }
2049
2050 type = nxt_str_dup(mp, NULL, &str);
2051 if (nxt_slow_path(type == NULL)) {
2052 return NXT_ERROR;
2053 }
2054
2055 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2056 nxt_conf_get_string(ext_conf, &str);
2057
2058 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2059 return NXT_ERROR;
2060 }
2061
2062 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2063 &exten, type);
2064 if (nxt_slow_path(ret != NXT_OK)) {
2065 return NXT_ERROR;
2066 }
2067
2068 continue;
2069 }
2070
2071 exts = nxt_conf_array_elements_count(ext_conf);
2072
2073 for (i = 0; i < exts; i++) {
2074 value = nxt_conf_get_array_element(ext_conf, i);
2075
2076 nxt_conf_get_string(value, &str);
2077
2078 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2079 return NXT_ERROR;
2080 }
2081
2082 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2083 &exten, type);
2084 if (nxt_slow_path(ret != NXT_OK)) {
2085 return NXT_ERROR;
2086 }
2087 }
2088 }
2089 }
2090
2091 return NXT_OK;
2092 }
2093
2094
2095 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2096 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2097 {
2098 nxt_int_t ret;
2099 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf;
2100 nxt_conf_value_t *source_conf, *recursive_conf;
2101 nxt_http_forward_t *forward;
2102 nxt_http_route_addr_rule_t *source;
2103
2104 static nxt_str_t header_path = nxt_string("/header");
2105 static nxt_str_t client_ip_path = nxt_string("/client_ip");
2106 static nxt_str_t protocol_path = nxt_string("/protocol");
2107 static nxt_str_t source_path = nxt_string("/source");
2108 static nxt_str_t recursive_path = nxt_string("/recursive");
2109
2110 header_conf = nxt_conf_get_path(conf, &header_path);
2111
2112 if (header_conf != NULL) {
2113 client_ip_conf = nxt_conf_get_path(conf, &header_path);
2114 protocol_conf = NULL;
2115
2116 } else {
2117 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2118 protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2119 }
2120
2121 source_conf = nxt_conf_get_path(conf, &source_path);
2122 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2123
2124 if (source_conf == NULL
2125 || (protocol_conf == NULL && client_ip_conf == NULL))
2126 {
2127 return NULL;
2128 }
2129
2130 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2131 if (nxt_slow_path(forward == NULL)) {
2132 return NULL;
2133 }
2134
2135 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2136 if (nxt_slow_path(source == NULL)) {
2137 return NULL;
2138 }
2139
2140 forward->source = source;
2141
2142 if (recursive_conf != NULL) {
2143 forward->recursive = nxt_conf_get_boolean(recursive_conf);
2144 }
2145
2146 if (client_ip_conf != NULL) {
2147 ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2148 &forward->client_ip);
2149 if (nxt_slow_path(ret != NXT_OK)) {
2150 return NULL;
2151 }
2152 }
2153
2154 if (protocol_conf != NULL) {
2155 ret = nxt_router_conf_forward_header(mp, protocol_conf,
2156 &forward->protocol);
2157 if (nxt_slow_path(ret != NXT_OK)) {
2158 return NULL;
2159 }
2160 }
2161
2162 return forward;
2163 }
2164
2165
2166 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2167 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2168 nxt_http_forward_header_t *fh)
2169 {
2170 char c;
2171 size_t i;
2172 uint32_t hash;
2173 nxt_str_t header;
2174
2175 nxt_conf_get_string(conf, &header);
2176
2177 fh->header = nxt_str_dup(mp, NULL, &header);
2178 if (nxt_slow_path(fh->header == NULL)) {
2179 return NXT_ERROR;
2180 }
2181
2182 hash = NXT_HTTP_FIELD_HASH_INIT;
2183
2184 for (i = 0; i < fh->header->length; i++) {
2185 c = fh->header->start[i];
2186 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2187 }
2188
2189 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2190
2191 fh->header_hash = hash;
2192
2193 return NXT_OK;
2194 }
2195
2196
2197 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2198 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2199 {
2200 nxt_app_t *app;
2201
2202 nxt_queue_each(app, queue, nxt_app_t, link) {
2203
2204 if (nxt_strstr_eq(name, &app->name)) {
2205 return app;
2206 }
2207
2208 } nxt_queue_loop;
2209
2210 return NULL;
2211 }
2212
2213
2214 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2215 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2216 {
2217 void *mem;
2218 nxt_int_t fd;
2219
2220 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2221 if (nxt_slow_path(fd == -1)) {
2222 return NXT_ERROR;
2223 }
2224
2225 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2226 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2227 if (nxt_slow_path(mem == MAP_FAILED)) {
2228 nxt_fd_close(fd);
2229
2230 return NXT_ERROR;
2231 }
2232
2233 nxt_app_queue_init(mem);
2234
2235 port->queue_fd = fd;
2236 port->queue = mem;
2237
2238 return NXT_OK;
2239 }
2240
2241
2242 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2243 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2244 {
2245 void *mem;
2246 nxt_int_t fd;
2247
2248 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2249 if (nxt_slow_path(fd == -1)) {
2250 return NXT_ERROR;
2251 }
2252
2253 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2254 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2255 if (nxt_slow_path(mem == MAP_FAILED)) {
2256 nxt_fd_close(fd);
2257
2258 return NXT_ERROR;
2259 }
2260
2261 nxt_port_queue_init(mem);
2262
2263 port->queue_fd = fd;
2264 port->queue = mem;
2265
2266 return NXT_OK;
2267 }
2268
2269
2270 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2271 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2272 {
2273 void *mem;
2274
2275 nxt_assert(fd != -1);
2276
2277 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2278 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2279 if (nxt_slow_path(mem == MAP_FAILED)) {
2280
2281 return NXT_ERROR;
2282 }
2283
2284 port->queue = mem;
2285
2286 return NXT_OK;
2287 }
2288
2289
2290 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
2291 NXT_LVLHSH_DEFAULT,
2292 nxt_router_apps_hash_test,
2293 nxt_mp_lvlhsh_alloc,
2294 nxt_mp_lvlhsh_free,
2295 };
2296
2297
2298 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2299 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2300 {
2301 nxt_app_t *app;
2302
2303 app = data;
2304
2305 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2306 }
2307
2308
2309 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2310 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2311 {
2312 nxt_lvlhsh_query_t lhq;
2313
2314 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2315 lhq.replace = 0;
2316 lhq.key = app->name;
2317 lhq.value = app;
2318 lhq.proto = &nxt_router_apps_hash_proto;
2319 lhq.pool = rtcf->mem_pool;
2320
2321 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2322
2323 case NXT_OK:
2324 return NXT_OK;
2325
2326 case NXT_DECLINED:
2327 nxt_thread_log_alert("router app hash adding failed: "
2328 "\"%V\" is already in hash", &lhq.key);
2329 /* Fall through. */
2330 default:
2331 return NXT_ERROR;
2332 }
2333 }
2334
2335
2336 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2337 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2338 {
2339 nxt_lvlhsh_query_t lhq;
2340
2341 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2342 lhq.key = *name;
2343 lhq.proto = &nxt_router_apps_hash_proto;
2344
2345 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2346 return NULL;
2347 }
2348
2349 return lhq.value;
2350 }
2351
2352
2353 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2354 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2355 {
2356 nxt_app_t *app;
2357 nxt_lvlhsh_each_t lhe;
2358
2359 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2360
2361 for ( ;; ) {
2362 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2363
2364 if (app == NULL) {
2365 break;
2366 }
2367
2368 nxt_router_app_use(task, app, i);
2369 }
2370 }
2371
2372
2373 typedef struct {
2374 nxt_app_t *app;
2375 nxt_int_t target;
2376 } nxt_http_app_conf_t;
2377
2378
2379 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2380 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2381 nxt_str_t *target, nxt_http_action_t *action)
2382 {
2383 nxt_app_t *app;
2384 nxt_str_t *targets;
2385 nxt_uint_t i;
2386 nxt_http_app_conf_t *conf;
2387
2388 app = nxt_router_apps_hash_get(rtcf, name);
2389 if (app == NULL) {
2390 return NXT_DECLINED;
2391 }
2392
2393 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2394 if (nxt_slow_path(conf == NULL)) {
2395 return NXT_ERROR;
2396 }
2397
2398 action->handler = nxt_http_application_handler;
2399 action->u.conf = conf;
2400
2401 conf->app = app;
2402
2403 if (target != NULL && target->length != 0) {
2404 targets = app->targets;
2405
2406 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2407
2408 conf->target = i;
2409
2410 } else {
2411 conf->target = 0;
2412 }
2413
2414 return NXT_OK;
2415 }
2416
2417
2418 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2419 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2420 nxt_str_t *name)
2421 {
2422 size_t size;
2423 nxt_int_t ret;
2424 nxt_bool_t wildcard;
2425 nxt_sockaddr_t *sa;
2426 nxt_socket_conf_t *skcf;
2427 nxt_listen_socket_t *ls;
2428
2429 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2430 if (nxt_slow_path(sa == NULL)) {
2431 nxt_alert(task, "invalid listener \"%V\"", name);
2432 return NULL;
2433 }
2434
2435 sa->type = SOCK_STREAM;
2436
2437 nxt_debug(task, "router listener: \"%*s\"",
2438 (size_t) sa->length, nxt_sockaddr_start(sa));
2439
2440 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2441 if (nxt_slow_path(skcf == NULL)) {
2442 return NULL;
2443 }
2444
2445 size = nxt_sockaddr_size(sa);
2446
2447 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2448
2449 if (ret != NXT_OK) {
2450
2451 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2452 if (nxt_slow_path(ls == NULL)) {
2453 return NULL;
2454 }
2455
2456 skcf->listen = ls;
2457
2458 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2459 nxt_memcpy(ls->sockaddr, sa, size);
2460
2461 nxt_listen_socket_remote_size(ls);
2462
2463 ls->socket = -1;
2464 ls->backlog = NXT_LISTEN_BACKLOG;
2465 ls->flags = NXT_NONBLOCK;
2466 ls->read_after_accept = 1;
2467 }
2468
2469 switch (sa->u.sockaddr.sa_family) {
2470 #if (NXT_HAVE_UNIX_DOMAIN)
2471 case AF_UNIX:
2472 wildcard = 0;
2473 break;
2474 #endif
2475 #if (NXT_INET6)
2476 case AF_INET6:
2477 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2478 break;
2479 #endif
2480 case AF_INET:
2481 default:
2482 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2483 break;
2484 }
2485
2486 if (!wildcard) {
2487 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2488 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2489 return NULL;
2490 }
2491
2492 nxt_memcpy(skcf->sockaddr, sa, size);
2493 }
2494
2495 return skcf;
2496 }
2497
2498
2499 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2500 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2501 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2502 {
2503 nxt_router_t *router;
2504 nxt_queue_link_t *qlk;
2505 nxt_socket_conf_t *skcf;
2506
2507 router = tmcf->router_conf->router;
2508
2509 for (qlk = nxt_queue_first(&router->sockets);
2510 qlk != nxt_queue_tail(&router->sockets);
2511 qlk = nxt_queue_next(qlk))
2512 {
2513 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2514
2515 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2516 nskcf->listen = skcf->listen;
2517
2518 nxt_queue_remove(qlk);
2519 nxt_queue_insert_tail(&keeping_sockets, qlk);
2520
2521 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2522
2523 return NXT_OK;
2524 }
2525 }
2526
2527 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2528
2529 return NXT_DECLINED;
2530 }
2531
2532
2533 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2534 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2535 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2536 {
2537 size_t size;
2538 uint32_t stream;
2539 nxt_int_t ret;
2540 nxt_buf_t *b;
2541 nxt_port_t *main_port, *router_port;
2542 nxt_runtime_t *rt;
2543 nxt_socket_rpc_t *rpc;
2544
2545 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2546 if (rpc == NULL) {
2547 goto fail;
2548 }
2549
2550 rpc->socket_conf = skcf;
2551 rpc->temp_conf = tmcf;
2552
2553 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2554
2555 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2556 if (b == NULL) {
2557 goto fail;
2558 }
2559
2560 b->completion_handler = nxt_buf_dummy_completion;
2561
2562 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2563
2564 rt = task->thread->runtime;
2565 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2566 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2567
2568 stream = nxt_port_rpc_register_handler(task, router_port,
2569 nxt_router_listen_socket_ready,
2570 nxt_router_listen_socket_error,
2571 main_port->pid, rpc);
2572 if (nxt_slow_path(stream == 0)) {
2573 goto fail;
2574 }
2575
2576 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2577 stream, router_port->id, b);
2578
2579 if (nxt_slow_path(ret != NXT_OK)) {
2580 nxt_port_rpc_cancel(task, router_port, stream);
2581 goto fail;
2582 }
2583
2584 return;
2585
2586 fail:
2587
2588 nxt_router_conf_error(task, tmcf);
2589 }
2590
2591
2592 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2593 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2594 void *data)
2595 {
2596 nxt_int_t ret;
2597 nxt_socket_t s;
2598 nxt_socket_rpc_t *rpc;
2599
2600 rpc = data;
2601
2602 s = msg->fd[0];
2603
2604 ret = nxt_socket_nonblocking(task, s);
2605 if (nxt_slow_path(ret != NXT_OK)) {
2606 goto fail;
2607 }
2608
2609 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2610
2611 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2612 if (nxt_slow_path(ret != NXT_OK)) {
2613 goto fail;
2614 }
2615
2616 rpc->socket_conf->listen->socket = s;
2617
2618 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2619 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2620
2621 return;
2622
2623 fail:
2624
2625 nxt_socket_close(task, s);
2626
2627 nxt_router_conf_error(task, rpc->temp_conf);
2628 }
2629
2630
2631 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2632 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2633 void *data)
2634 {
2635 nxt_socket_rpc_t *rpc;
2636 nxt_router_temp_conf_t *tmcf;
2637
2638 rpc = data;
2639 tmcf = rpc->temp_conf;
2640
2641 #if 0
2642 u_char *p;
2643 size_t size;
2644 uint8_t error;
2645 nxt_buf_t *in, *out;
2646 nxt_sockaddr_t *sa;
2647
2648 static nxt_str_t socket_errors[] = {
2649 nxt_string("ListenerSystem"),
2650 nxt_string("ListenerNoIPv6"),
2651 nxt_string("ListenerPort"),
2652 nxt_string("ListenerInUse"),
2653 nxt_string("ListenerNoAddress"),
2654 nxt_string("ListenerNoAccess"),
2655 nxt_string("ListenerPath"),
2656 };
2657
2658 sa = rpc->socket_conf->listen->sockaddr;
2659
2660 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2661
2662 if (nxt_slow_path(in == NULL)) {
2663 return;
2664 }
2665
2666 p = in->mem.pos;
2667
2668 error = *p++;
2669
2670 size = nxt_length("listen socket error: ")
2671 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2672 + sa->length + socket_errors[error].length + (in->mem.free - p);
2673
2674 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2675 if (nxt_slow_path(out == NULL)) {
2676 return;
2677 }
2678
2679 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2680 "listen socket error: "
2681 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2682 (size_t) sa->length, nxt_sockaddr_start(sa),
2683 &socket_errors[error], in->mem.free - p, p);
2684
2685 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2686 #endif
2687
2688 nxt_router_conf_error(task, tmcf);
2689 }
2690
2691
2692 #if (NXT_TLS)
2693
2694 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2695 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2696 void *data)
2697 {
2698 nxt_mp_t *mp;
2699 nxt_int_t ret;
2700 nxt_tls_conf_t *tlscf;
2701 nxt_router_tlssock_t *tls;
2702 nxt_tls_bundle_conf_t *bundle;
2703 nxt_router_temp_conf_t *tmcf;
2704
2705 nxt_debug(task, "tls rpc handler");
2706
2707 tls = data;
2708 tmcf = tls->temp_conf;
2709
2710 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2711 goto fail;
2712 }
2713
2714 mp = tmcf->router_conf->mem_pool;
2715
2716 if (tls->socket_conf->tls == NULL){
2717 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2718 if (nxt_slow_path(tlscf == NULL)) {
2719 goto fail;
2720 }
2721
2722 tlscf->no_wait_shutdown = 1;
2723 tls->socket_conf->tls = tlscf;
2724
2725 } else {
2726 tlscf = tls->socket_conf->tls;
2727 }
2728
2729 tls->tls_init->conf = tlscf;
2730
2731 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2732 if (nxt_slow_path(bundle == NULL)) {
2733 goto fail;
2734 }
2735
2736 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2737 goto fail;
2738 }
2739
2740 bundle->chain_file = msg->fd[0];
2741 bundle->next = tlscf->bundle;
2742 tlscf->bundle = bundle;
2743
2744 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2745 tls->last);
2746 if (nxt_slow_path(ret != NXT_OK)) {
2747 goto fail;
2748 }
2749
2750 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2751 nxt_router_conf_apply, task, tmcf, NULL);
2752 return;
2753
2754 fail:
2755
2756 nxt_router_conf_error(task, tmcf);
2757 }
2758
2759 #endif
2760
2761
2762 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2763 nxt_router_app_rpc_create(nxt_task_t *task,
2764 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2765 {
2766 size_t size;
2767 uint32_t stream;
2768 nxt_fd_t port_fd, queue_fd;
2769 nxt_int_t ret;
2770 nxt_buf_t *b;
2771 nxt_port_t *router_port, *dport;
2772 nxt_runtime_t *rt;
2773 nxt_app_rpc_t *rpc;
2774
2775 rt = task->thread->runtime;
2776
2777 dport = app->proto_port;
2778
2779 if (dport == NULL) {
2780 nxt_debug(task, "app '%V' prototype prefork", &app->name);
2781
2782 size = app->name.length + 1 + app->conf.length;
2783
2784 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2785 if (nxt_slow_path(b == NULL)) {
2786 goto fail;
2787 }
2788
2789 b->completion_handler = nxt_buf_dummy_completion;
2790
2791 nxt_buf_cpystr(b, &app->name);
2792 *b->mem.free++ = '\0';
2793 nxt_buf_cpystr(b, &app->conf);
2794
2795 dport = rt->port_by_type[NXT_PROCESS_MAIN];
2796
2797 port_fd = app->shared_port->pair[0];
2798 queue_fd = app->shared_port->queue_fd;
2799
2800 } else {
2801 nxt_debug(task, "app '%V' prefork", &app->name);
2802
2803 b = NULL;
2804 port_fd = -1;
2805 queue_fd = -1;
2806 }
2807
2808 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2809
2810 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2811 nxt_router_app_prefork_ready,
2812 nxt_router_app_prefork_error,
2813 sizeof(nxt_app_rpc_t));
2814 if (nxt_slow_path(rpc == NULL)) {
2815 goto fail;
2816 }
2817
2818 rpc->app = app;
2819 rpc->temp_conf = tmcf;
2820 rpc->proto = (b != NULL);
2821
2822 stream = nxt_port_rpc_ex_stream(rpc);
2823
2824 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2825 port_fd, queue_fd, stream, router_port->id, b);
2826 if (nxt_slow_path(ret != NXT_OK)) {
2827 nxt_port_rpc_cancel(task, router_port, stream);
2828 goto fail;
2829 }
2830
2831 if (b == NULL) {
2832 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2833
2834 app->pending_processes++;
2835 }
2836
2837 return;
2838
2839 fail:
2840
2841 nxt_router_conf_error(task, tmcf);
2842 }
2843
2844
2845 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2846 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2847 void *data)
2848 {
2849 nxt_app_t *app;
2850 nxt_port_t *port;
2851 nxt_app_rpc_t *rpc;
2852 nxt_event_engine_t *engine;
2853
2854 rpc = data;
2855 app = rpc->app;
2856
2857 port = msg->u.new_port;
2858
2859 nxt_assert(port != NULL);
2860 nxt_assert(port->id == 0);
2861
2862 if (rpc->proto) {
2863 nxt_assert(app->proto_port == NULL);
2864 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2865
2866 nxt_port_inc_use(port);
2867
2868 app->proto_port = port;
2869 port->app = app;
2870
2871 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2872
2873 return;
2874 }
2875
2876 nxt_assert(port->type == NXT_PROCESS_APP);
2877
2878 port->app = app;
2879 port->main_app_port = port;
2880
2881 app->pending_processes--;
2882 app->processes++;
2883 app->idle_processes++;
2884
2885 engine = task->thread->engine;
2886
2887 nxt_queue_insert_tail(&app->ports, &port->app_link);
2888 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2889
2890 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2891 &app->name, port->pid, port->id);
2892
2893 nxt_port_hash_add(&app->port_hash, port);
2894 app->port_hash_count++;
2895
2896 port->idle_start = 0;
2897
2898 nxt_port_inc_use(port);
2899
2900 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2901
2902 nxt_work_queue_add(&engine->fast_work_queue,
2903 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2904 }
2905
2906
2907 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2908 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2909 void *data)
2910 {
2911 nxt_app_t *app;
2912 nxt_app_rpc_t *rpc;
2913 nxt_router_temp_conf_t *tmcf;
2914
2915 rpc = data;
2916 app = rpc->app;
2917 tmcf = rpc->temp_conf;
2918
2919 if (rpc->proto) {
2920 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2921 &app->name);
2922
2923 } else {
2924 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2925 &app->name);
2926
2927 app->pending_processes--;
2928 }
2929
2930 nxt_router_conf_error(task, tmcf);
2931 }
2932
2933
2934 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)2935 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2936 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2937 {
2938 nxt_int_t ret;
2939 nxt_uint_t n, threads;
2940 nxt_queue_link_t *qlk;
2941 nxt_router_engine_conf_t *recf;
2942
2943 threads = tmcf->router_conf->threads;
2944
2945 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2946 sizeof(nxt_router_engine_conf_t));
2947 if (nxt_slow_path(tmcf->engines == NULL)) {
2948 return NXT_ERROR;
2949 }
2950
2951 n = 0;
2952
2953 for (qlk = nxt_queue_first(&router->engines);
2954 qlk != nxt_queue_tail(&router->engines);
2955 qlk = nxt_queue_next(qlk))
2956 {
2957 recf = nxt_array_zero_add(tmcf->engines);
2958 if (nxt_slow_path(recf == NULL)) {
2959 return NXT_ERROR;
2960 }
2961
2962 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2963
2964 if (n < threads) {
2965 recf->action = NXT_ROUTER_ENGINE_KEEP;
2966 ret = nxt_router_engine_conf_update(tmcf, recf);
2967
2968 } else {
2969 recf->action = NXT_ROUTER_ENGINE_DELETE;
2970 ret = nxt_router_engine_conf_delete(tmcf, recf);
2971 }
2972
2973 if (nxt_slow_path(ret != NXT_OK)) {
2974 return ret;
2975 }
2976
2977 n++;
2978 }
2979
2980 tmcf->new_threads = n;
2981
2982 while (n < threads) {
2983 recf = nxt_array_zero_add(tmcf->engines);
2984 if (nxt_slow_path(recf == NULL)) {
2985 return NXT_ERROR;
2986 }
2987
2988 recf->action = NXT_ROUTER_ENGINE_ADD;
2989
2990 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2991 if (nxt_slow_path(recf->engine == NULL)) {
2992 return NXT_ERROR;
2993 }
2994
2995 ret = nxt_router_engine_conf_create(tmcf, recf);
2996 if (nxt_slow_path(ret != NXT_OK)) {
2997 return ret;
2998 }
2999
3000 n++;
3001 }
3002
3003 return NXT_OK;
3004 }
3005
3006
3007 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3008 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3009 nxt_router_engine_conf_t *recf)
3010 {
3011 nxt_int_t ret;
3012
3013 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3014 nxt_router_listen_socket_create);
3015 if (nxt_slow_path(ret != NXT_OK)) {
3016 return ret;
3017 }
3018
3019 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3020 nxt_router_listen_socket_create);
3021 if (nxt_slow_path(ret != NXT_OK)) {
3022 return ret;
3023 }
3024
3025 return ret;
3026 }
3027
3028
3029 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3030 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3031 nxt_router_engine_conf_t *recf)
3032 {
3033 nxt_int_t ret;
3034
3035 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3036 nxt_router_listen_socket_create);
3037 if (nxt_slow_path(ret != NXT_OK)) {
3038 return ret;
3039 }
3040
3041 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3042 nxt_router_listen_socket_update);
3043 if (nxt_slow_path(ret != NXT_OK)) {
3044 return ret;
3045 }
3046
3047 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3048 if (nxt_slow_path(ret != NXT_OK)) {
3049 return ret;
3050 }
3051
3052 return ret;
3053 }
3054
3055
3056 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3057 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3058 nxt_router_engine_conf_t *recf)
3059 {
3060 nxt_int_t ret;
3061
3062 ret = nxt_router_engine_quit(tmcf, recf);
3063 if (nxt_slow_path(ret != NXT_OK)) {
3064 return ret;
3065 }
3066
3067 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3068 if (nxt_slow_path(ret != NXT_OK)) {
3069 return ret;
3070 }
3071
3072 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3073 }
3074
3075
3076 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3077 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3078 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3079 nxt_work_handler_t handler)
3080 {
3081 nxt_int_t ret;
3082 nxt_joint_job_t *job;
3083 nxt_queue_link_t *qlk;
3084 nxt_socket_conf_t *skcf;
3085 nxt_socket_conf_joint_t *joint;
3086
3087 for (qlk = nxt_queue_first(sockets);
3088 qlk != nxt_queue_tail(sockets);
3089 qlk = nxt_queue_next(qlk))
3090 {
3091 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3092 if (nxt_slow_path(job == NULL)) {
3093 return NXT_ERROR;
3094 }
3095
3096 job->work.next = recf->jobs;
3097 recf->jobs = &job->work;
3098
3099 job->task = tmcf->engine->task;
3100 job->work.handler = handler;
3101 job->work.task = &job->task;
3102 job->work.obj = job;
3103 job->tmcf = tmcf;
3104
3105 tmcf->count++;
3106
3107 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3108 sizeof(nxt_socket_conf_joint_t));
3109 if (nxt_slow_path(joint == NULL)) {
3110 return NXT_ERROR;
3111 }
3112
3113 job->work.data = joint;
3114
3115 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3116 if (nxt_slow_path(ret != NXT_OK)) {
3117 return ret;
3118 }
3119
3120 joint->count = 1;
3121
3122 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3123 skcf->count++;
3124 joint->socket_conf = skcf;
3125
3126 joint->engine = recf->engine;
3127 }
3128
3129 return NXT_OK;
3130 }
3131
3132
3133 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3134 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3135 nxt_router_engine_conf_t *recf)
3136 {
3137 nxt_joint_job_t *job;
3138
3139 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3140 if (nxt_slow_path(job == NULL)) {
3141 return NXT_ERROR;
3142 }
3143
3144 job->work.next = recf->jobs;
3145 recf->jobs = &job->work;
3146
3147 job->task = tmcf->engine->task;
3148 job->work.handler = nxt_router_worker_thread_quit;
3149 job->work.task = &job->task;
3150 job->work.obj = NULL;
3151 job->work.data = NULL;
3152 job->tmcf = NULL;
3153
3154 return NXT_OK;
3155 }
3156
3157
3158 static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets)3159