1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #include <nxt_http.h>
15 #include <nxt_port_memory_int.h>
16 #include <nxt_unit_request.h>
17 #include <nxt_unit_response.h>
18 #include <nxt_router_request.h>
19 #include <nxt_app_queue.h>
20 #include <nxt_port_queue.h>
21
22 #define NXT_SHARED_PORT_ID 0xFFFFu
23
/*
 * Application-level configuration values: the application type, process
 * counters, timeouts and the "limits", "processes" and "targets"
 * configuration nodes.
 *
 * NOTE(review): the code that fills and reads this structure is outside
 * the visible part of the file; confirm the exact meaning and units of
 * the counters and timeouts there.
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
35
36
/*
 * String values read from a listener configuration object.
 *
 * NOTE(review): presumably "pass" is the routing destination and
 * "application" a direct application name; the parsing code is outside
 * the visible part of the file -- confirm.
 */
typedef struct {
    nxt_str_t  pass;
    nxt_str_t  application;
} nxt_router_listener_conf_t;
41
42
43 #if (NXT_TLS)
44
/*
 * One pending TLS certificate request.  Entries are linked into the "tls"
 * queue of a temporary configuration; nxt_router_conf_apply() takes them
 * off that queue one at a time and passes "name" together with the entry
 * itself to nxt_cert_store_get(), with nxt_router_tls_rpc_handler() as the
 * completion handler.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    nxt_queue_link_t        link;  /* for nxt_router_temp_conf_t.tls */
} nxt_router_tlssock_t;
54
55 #endif
56
57
/*
 * Context that ties a socket configuration to the temporary configuration
 * it belongs to.
 *
 * NOTE(review): judging by the name this is the RPC data of the listen
 * socket request (nxt_router_listen_socket_rpc_create() and its
 * ready/error handlers); those definitions are outside the visible part
 * of the file -- confirm.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
64
65
/*
 * Context that ties an application to the temporary configuration being
 * applied.
 *
 * NOTE(review): presumably the RPC data used by
 * nxt_router_app_rpc_create() and the prefork ready/error handlers, which
 * are outside the visible part of the file -- confirm.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
    uint8_t                 proto;  /* 1 bit */
} nxt_app_rpc_t;
71
72
/*
 * RPC data of a START_PROCESS request.  It is allocated by
 * nxt_port_rpc_register_handler_ex() in
 * nxt_router_start_app_process_handler() and filled there once the
 * request has been written successfully:
 *
 *   app_joint   - the application's joint at the time of the request;
 *   generation  - the application's generation at the time of the request;
 *   proto       - non-zero when the request carried a "name\0conf" payload,
 *                 i.e. when a prototype process start was requested.
 */
typedef struct {
    nxt_app_joint_t  *app_joint;
    uint32_t         generation;
    uint8_t          proto;  /* 1 bit */
} nxt_app_joint_rpc_t;
78
79
/*
 * Forward declarations of file-local functions.
 *
 * The grouping comments below are derived from the declaration names and
 * from the handler tables in this file; most of the definitions are
 * further down in the file.
 */

/* Router process hooks, see nxt_router_process. */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

/* Port message handlers, see nxt_router_process_port_handlers. */
static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_app_restart_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_status_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);

/* Temporary configuration life cycle. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

/* Configuration parsing. */
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
    nxt_mp_t *mp, nxt_conf_value_t *conf);
static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
    nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);

/* Application lookup. */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);

/* Port queues, listen sockets, TLS and application prefork requests. */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);

/* Per-engine configuration. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Worker threads. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);

/* Work handlers; all share the (task, obj, data) signature. */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);

/* RPC callbacks registered for START_PROCESS requests. */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application reference counting. */
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

/* Application ports and request completion. */
static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
    nxt_port_t *port, nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

/* Request message preparation. */
static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

/* Timers and deferred release. */
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

/* Tentative definition; the initialized definition follows later. */
static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
248
/* Defined in another translation unit. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/* The router instance; allocated and assigned in nxt_router_start(). */
nxt_router_t  *nxt_router;

static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Table of name prefixes: each entry is either "HTTP_" or the empty
 * string.
 *
 * NOTE(review): presumably indexed by application type and passed as the
 * "prefix" argument of nxt_router_prepare_msg(); the lookup is outside the
 * visible part of the file -- confirm the index order against the
 * application type enumeration before changing it.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
264
265
/*
 * Port message handlers of the router process, referenced from
 * nxt_router_process.port_handlers.  Both RPC reply types (ready and
 * error) are routed to nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .status       = nxt_router_status_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
282
283
/*
 * Process description of the router: its name and type, the prefork and
 * start hooks defined in this file, and its port handler table.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
294
295
/*
 * Queues of nxt_socket_conf_t.
 *
 * All five queues are re-initialized by nxt_router_temp_conf() each time a
 * temporary configuration is created.  nxt_router_conf_apply() moves
 * entries from "pending_sockets" to "creating_sockets" one at a time
 * before requesting the listen socket.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
302
303
304 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)305 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
306 {
307 nxt_runtime_stop_app_processes(task, task->thread->runtime);
308
309 return NXT_OK;
310 }
311
312
313 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)314 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
315 {
316 nxt_int_t ret;
317 nxt_port_t *controller_port;
318 nxt_router_t *router;
319 nxt_runtime_t *rt;
320
321 rt = task->thread->runtime;
322
323 nxt_log(task, NXT_LOG_INFO, "router started");
324
325 #if (NXT_TLS)
326 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
327 if (nxt_slow_path(rt->tls == NULL)) {
328 return NXT_ERROR;
329 }
330
331 ret = rt->tls->library_init(task);
332 if (nxt_slow_path(ret != NXT_OK)) {
333 return ret;
334 }
335 #endif
336
337 ret = nxt_http_init(task);
338 if (nxt_slow_path(ret != NXT_OK)) {
339 return ret;
340 }
341
342 router = nxt_zalloc(sizeof(nxt_router_t));
343 if (nxt_slow_path(router == NULL)) {
344 return NXT_ERROR;
345 }
346
347 nxt_queue_init(&router->engines);
348 nxt_queue_init(&router->sockets);
349 nxt_queue_init(&router->apps);
350
351 nxt_router = router;
352
353 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
354 if (controller_port != NULL) {
355 nxt_router_greet_controller(task, controller_port);
356 }
357
358 return NXT_OK;
359 }
360
361
362 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)363 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
364 {
365 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
366 -1, 0, 0, NULL);
367 }
368
369
/*
 * Port work handler posted by nxt_router_start_app_process(); "data" is
 * the application, "port" is the port the handler was posted to.
 *
 * If the application already has a prototype port, a START_PROCESS
 * request without payload or descriptors is sent to it.  Otherwise, if a
 * prototype request is already outstanding, the handler only counts one
 * more waiter and sends nothing; if none is outstanding, it asks the main
 * process to start the prototype, passing "name\0conf" as payload together
 * with the descriptors of the application's shared port.
 *
 * The handler always drops the application reference that was taken by
 * nxt_router_start_app_process() before posting.
 */
static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
    void *data)
{
    size_t               size;
    uint32_t             stream;
    nxt_fd_t             port_fd, queue_fd;
    nxt_int_t            ret;
    nxt_app_t            *app;
    nxt_buf_t            *b;
    nxt_port_t           *dport;
    nxt_runtime_t        *rt;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    app = data;

    /* Take a snapshot of the prototype port under the application mutex. */
    nxt_thread_mutex_lock(&app->mutex);

    dport = app->proto_port;

    nxt_thread_mutex_unlock(&app->mutex);

    if (dport != NULL) {
        nxt_debug(task, "app '%V' %p start process", &app->name, app);

        /* The prototype exists: no payload and no descriptors to pass. */
        b = NULL;
        port_fd = -1;
        queue_fd = -1;

    } else {
        if (app->proto_port_requests > 0) {
            nxt_debug(task, "app '%V' %p wait for prototype process",
                      &app->name, app);

            /* A prototype start is in flight: just count this waiter. */
            app->proto_port_requests++;

            goto skip;
        }

        nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);

        rt = task->thread->runtime;
        dport = rt->port_by_type[NXT_PROCESS_MAIN];

        /* Payload layout: application name, '\0', configuration. */
        size = app->name.length + 1 + app->conf.length;

        b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
        if (nxt_slow_path(b == NULL)) {
            goto failed;
        }

        nxt_buf_cpystr(b, &app->name);
        *b->mem.free++ = '\0';
        nxt_buf_cpystr(b, &app->conf);

        port_fd = app->shared_port->pair[0];
        queue_fd = app->shared_port->queue_fd;
    }

    app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
                                                     nxt_router_app_port_ready,
                                                     nxt_router_app_port_error,
                                                   sizeof(nxt_app_joint_rpc_t));
    if (nxt_slow_path(app_joint_rpc == NULL)) {
        goto failed;
    }

    stream = nxt_port_rpc_ex_stream(app_joint_rpc);

    ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
                                 port_fd, queue_fd, stream, port->id, b);
    if (nxt_slow_path(ret != NXT_OK)) {
        /* Nothing was sent: unregister the RPC handler again. */
        nxt_port_rpc_cancel(task, port, stream);

        goto failed;
    }

    app_joint_rpc->app_joint = app->joint;
    app_joint_rpc->generation = app->generation;
    app_joint_rpc->proto = (b != NULL);

    if (b != NULL) {
        app->proto_port_requests++;

        /* The buffer was handed to the write; keep "failed" from freeing it. */
        b = NULL;
    }

    /* The registered RPC data now refers to the joint. */
    nxt_router_app_joint_use(task, app->joint, 1);

    /* Success falls through: "b" is NULL here, so nothing is freed. */

failed:

    if (b != NULL) {
        nxt_mp_free(b->data, b);
    }

skip:

    nxt_router_app_use(task, app, -1);
}
469
470
471 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)472 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
473 {
474 app_joint->use_count += i;
475
476 if (app_joint->use_count == 0) {
477 nxt_assert(app_joint->app == NULL);
478
479 nxt_free(app_joint);
480 }
481 }
482
483
484 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)485 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
486 {
487 nxt_int_t res;
488 nxt_port_t *router_port;
489 nxt_runtime_t *rt;
490
491 nxt_debug(task, "app '%V' start process", &app->name);
492
493 rt = task->thread->runtime;
494 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
495
496 nxt_router_app_use(task, app, 1);
497
498 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
499 app);
500
501 if (res == NXT_OK) {
502 return res;
503 }
504
505 nxt_thread_mutex_lock(&app->mutex);
506
507 app->pending_processes--;
508
509 nxt_thread_mutex_unlock(&app->mutex);
510
511 nxt_router_app_use(task, app, -1);
512
513 return NXT_ERROR;
514 }
515
516
517 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)518 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
519 {
520 nxt_buf_t *b, *next;
521 nxt_bool_t cancelled;
522 nxt_port_t *app_port;
523 nxt_msg_info_t *msg_info;
524
525 msg_info = &req_rpc_data->msg_info;
526
527 if (msg_info->buf == NULL) {
528 return 0;
529 }
530
531 app_port = req_rpc_data->app_port;
532
533 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
534 cancelled = nxt_app_queue_cancel(app_port->queue,
535 msg_info->tracking_cookie,
536 req_rpc_data->stream);
537
538 if (cancelled) {
539 nxt_debug(task, "stream #%uD: cancelled by router",
540 req_rpc_data->stream);
541 }
542
543 } else {
544 cancelled = 0;
545 }
546
547 for (b = msg_info->buf; b != NULL; b = next) {
548 next = b->next;
549 b->next = NULL;
550
551 if (b->is_port_mmap_sent) {
552 b->is_port_mmap_sent = cancelled == 0;
553 }
554
555 b->completion_handler(task, b, b->parent);
556 }
557
558 msg_info->buf = NULL;
559
560 return cancelled;
561 }
562
563
564 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)565 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
566 {
567 if (lnk->next != NULL) {
568 nxt_queue_remove(lnk);
569
570 lnk->next = NULL;
571
572 return 1;
573 }
574
575 return 0;
576 }
577
578
579 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)580 nxt_request_rpc_data_unlink(nxt_task_t *task,
581 nxt_request_rpc_data_t *req_rpc_data)
582 {
583 nxt_app_t *app;
584 nxt_bool_t unlinked;
585 nxt_http_request_t *r;
586
587 nxt_router_msg_cancel(task, req_rpc_data);
588
589 app = req_rpc_data->app;
590
591 if (req_rpc_data->app_port != NULL) {
592 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
593 req_rpc_data->apr_action);
594
595 req_rpc_data->app_port = NULL;
596 }
597
598 r = req_rpc_data->request;
599
600 if (r != NULL) {
601 r->timer_data = NULL;
602
603 nxt_router_http_request_release_post(task, r);
604
605 r->req_rpc_data = NULL;
606 req_rpc_data->request = NULL;
607
608 if (app != NULL) {
609 unlinked = 0;
610
611 nxt_thread_mutex_lock(&app->mutex);
612
613 if (r->app_link.next != NULL) {
614 nxt_queue_remove(&r->app_link);
615 r->app_link.next = NULL;
616
617 unlinked = 1;
618 }
619
620 nxt_thread_mutex_unlock(&app->mutex);
621
622 if (unlinked) {
623 nxt_mp_release(r->mem_pool);
624 }
625 }
626 }
627
628 if (app != NULL) {
629 nxt_router_app_use(task, app, -1);
630
631 req_rpc_data->app = NULL;
632 }
633
634 if (req_rpc_data->msg_info.body_fd != -1) {
635 nxt_fd_close(req_rpc_data->msg_info.body_fd);
636
637 req_rpc_data->msg_info.body_fd = -1;
638 }
639
640 if (req_rpc_data->rpc_cancel) {
641 req_rpc_data->rpc_cancel = 0;
642
643 nxt_port_rpc_cancel(task, task->thread->engine->port,
644 req_rpc_data->stream);
645 }
646 }
647
648
/*
 * Handler of the "new port" message in the router process.
 *
 * The generic nxt_port_new_port_handler() runs first; the resulting port
 * is then taken from "msg->u.new_port" and handled by its type:
 *
 *   - controller port: the controller is greeted;
 *   - prototype port: the message is passed to the RPC handler;
 *   - no port or a non-application port: the message is turned into an
 *     RPC error if it has a stream, and dropped otherwise;
 *   - application port: the queue descriptor in "msg->fd[1]", if any, is
 *     mapped and closed.
 *
 * A message with a non-zero stream is completed through the RPC handler.
 * An application port announced with stream 0 is attached to the
 * application of the same process's 'main' port (id 0) and acknowledged
 * with PORT_ACK.
 */
static void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_int_t      res;
    nxt_app_t      *app;
    nxt_port_t     *port, *main_app_port;
    nxt_runtime_t  *rt;

    nxt_port_new_port_handler(task, msg);

    port = msg->u.new_port;

    if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
        nxt_router_greet_controller(task, msg->u.new_port);
    }

    if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
        nxt_port_rpc_handler(task, msg);

        return;
    }

    if (port == NULL || port->type != NXT_PROCESS_APP) {

        if (msg->port_msg.stream == 0) {
            return;
        }

        /* Let the RPC handler below deliver this as an error reply. */
        msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;

    } else {
        if (msg->fd[1] != -1) {
            res = nxt_router_port_queue_map(task, port, msg->fd[1]);
            if (nxt_slow_path(res != NXT_OK)) {
                return;
            }

            /* The descriptor is closed here once the queue is mapped. */
            nxt_fd_close(msg->fd[1]);
            msg->fd[1] = -1;
        }
    }

    if (msg->port_msg.stream != 0) {
        nxt_port_rpc_handler(task, msg);
        return;
    }

    /* From here on "port" is a non-NULL application port with stream 0. */

    nxt_debug(task, "new port id %d (%d)", port->id, port->type);

    /*
     * Port with "id == 0" is application 'main' port and it always
     * should come with non-zero stream.
     */
    nxt_assert(port->id != 0);

    /* Find 'main' app port and get app reference. */
    rt = task->thread->runtime;

    /*
     * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
     * sent to main port (with id == 0) and processed in main thread.
     */
    main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
    nxt_assert(main_app_port != NULL);

    app = main_app_port->app;

    if (nxt_fast_path(app != NULL)) {
        nxt_thread_mutex_lock(&app->mutex);

        /* TODO here should be find-and-add code because there can be
           port waiters in port_hash */
        nxt_port_hash_add(&app->port_hash, port);
        app->port_hash_count++;

        nxt_thread_mutex_unlock(&app->mutex);

        port->app = app;
    }

    port->main_app_port = main_app_port;

    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
}
733
734
/*
 * Handler of the "data" port message, which carries a new configuration.
 *
 * The message buffer must contain exactly one size_t, the size of the
 * configuration text, and "msg->fd[0]" must be a descriptor of the memory
 * object that holds the text.  The object is mapped read-only and parsed
 * into a new temporary configuration, which is then either applied or
 * reported as an error.  The mapping is removed before returning.
 *
 * If anything fails before parsing starts, an RPC error is sent to the
 * reply port and the temporary configuration, if one was created, is
 * released.  If the reply port cannot be found, the message is only
 * logged.
 */
static void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    void                    *p;
    size_t                  size;
    nxt_int_t               ret;
    nxt_port_t              *port;
    nxt_router_temp_conf_t  *tmcf;

    port = nxt_runtime_port_find(task->thread->runtime,
                                 msg->port_msg.pid,
                                 msg->port_msg.reply_port);
    if (nxt_slow_path(port == NULL)) {
        nxt_alert(task, "conf_data_handler: reply port not found");
        return;
    }

    /* MAP_FAILED marks "nothing mapped" for the cleanup section. */
    p = MAP_FAILED;

    /*
     * Ancient compilers like gcc 4.8.5 on CentOS 7 want 'size' to be
     * initialized in 'cleanup' section.
     */
    size = 0;

    tmcf = nxt_router_temp_conf(task);
    if (nxt_slow_path(tmcf == NULL)) {
        goto fail;
    }

    if (nxt_slow_path(msg->fd[0] == -1)) {
        nxt_alert(task, "conf_data_handler: invalid shm fd");
        goto fail;
    }

    if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
        nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
                  (int) nxt_buf_mem_used_size(&msg->buf->mem));
        goto fail;
    }

    nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));

    p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);

    /* The descriptor is closed whether or not the mapping succeeded. */
    nxt_fd_close(msg->fd[0]);
    msg->fd[0] = -1;

    if (nxt_slow_path(p == MAP_FAILED)) {
        goto fail;
    }

    nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);

    tmcf->router_conf->router = nxt_router;
    tmcf->stream = msg->port_msg.stream;
    tmcf->port = port;

    /* The temporary configuration keeps its own reference to the port. */
    nxt_port_use(task, tmcf->port, 1);

    ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));

    if (nxt_fast_path(ret == NXT_OK)) {
        nxt_router_conf_apply(task, tmcf, NULL);

    } else {
        nxt_router_conf_error(task, tmcf);
    }

    goto cleanup;

fail:

    nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
                          msg->port_msg.stream, 0, NULL);

    if (tmcf != NULL) {
        nxt_mp_release(tmcf->mem_pool);
    }

cleanup:

    if (p != MAP_FAILED) {
        nxt_mem_munmap(p, size);
    }

    /* Covers the failures that happen before the descriptor is mapped. */
    if (msg->fd[0] != -1) {
        nxt_fd_close(msg->fd[0]);
        msg->fd[0] = -1;
    }
}
826
827
828 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)829 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
830 {
831 nxt_app_t *app;
832 nxt_int_t ret;
833 nxt_str_t app_name;
834 nxt_port_t *reply_port, *shared_port, *old_shared_port;
835 nxt_port_t *proto_port;
836 nxt_port_msg_type_t reply;
837
838 reply_port = nxt_runtime_port_find(task->thread->runtime,
839 msg->port_msg.pid,
840 msg->port_msg.reply_port);
841 if (nxt_slow_path(reply_port == NULL)) {
842 nxt_alert(task, "app_restart_handler: reply port not found");
843 return;
844 }
845
846 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
847 app_name.start = msg->buf->mem.pos;
848
849 nxt_debug(task, "app_restart_handler: %V", &app_name);
850
851 app = nxt_router_app_find(&nxt_router->apps, &app_name);
852
853 if (nxt_fast_path(app != NULL)) {
854 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
855 NXT_PROCESS_APP);
856 if (nxt_slow_path(shared_port == NULL)) {
857 goto fail;
858 }
859
860 ret = nxt_port_socket_init(task, shared_port, 0);
861 if (nxt_slow_path(ret != NXT_OK)) {
862 nxt_port_use(task, shared_port, -1);
863 goto fail;
864 }
865
866 ret = nxt_router_app_queue_init(task, shared_port);
867 if (nxt_slow_path(ret != NXT_OK)) {
868 nxt_port_write_close(shared_port);
869 nxt_port_read_close(shared_port);
870 nxt_port_use(task, shared_port, -1);
871 goto fail;
872 }
873
874 nxt_port_write_enable(task, shared_port);
875
876 nxt_thread_mutex_lock(&app->mutex);
877
878 proto_port = app->proto_port;
879
880 if (proto_port != NULL) {
881 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
882 proto_port->pid);
883
884 app->proto_port = NULL;
885 proto_port->app = NULL;
886 }
887
888 app->generation++;
889
890 shared_port->app = app;
891
892 old_shared_port = app->shared_port;
893 old_shared_port->app = NULL;
894
895 app->shared_port = shared_port;
896
897 nxt_thread_mutex_unlock(&app->mutex);
898
899 nxt_port_close(task, old_shared_port);
900 nxt_port_use(task, old_shared_port, -1);
901
902 if (proto_port != NULL) {
903 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
904 -1, 0, 0, NULL);
905
906 nxt_port_close(task, proto_port);
907
908 nxt_port_use(task, proto_port, -1);
909 }
910
911 reply = NXT_PORT_MSG_RPC_READY_LAST;
912
913 } else {
914
915 fail:
916
917 reply = NXT_PORT_MSG_RPC_ERROR;
918 }
919
920 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
921 0, NULL);
922 }
923
924
925 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)926 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
927 {
928 u_char *p;
929 size_t alloc;
930 nxt_app_t *app;
931 nxt_buf_t *b;
932 nxt_uint_t type;
933 nxt_port_t *port;
934 nxt_status_app_t *app_stat;
935 nxt_event_engine_t *engine;
936 nxt_status_report_t *report;
937
938 port = nxt_runtime_port_find(task->thread->runtime,
939 msg->port_msg.pid,
940 msg->port_msg.reply_port);
941 if (nxt_slow_path(port == NULL)) {
942 nxt_alert(task, "nxt_router_status_handler(): reply port not found");
943 return;
944 }
945
946 alloc = sizeof(nxt_status_report_t);
947
948 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
949
950 alloc += sizeof(nxt_status_app_t) + app->name.length;
951
952 } nxt_queue_loop;
953
954 b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
955 if (nxt_slow_path(b == NULL)) {
956 type = NXT_PORT_MSG_RPC_ERROR;
957 goto fail;
958 }
959
960 report = (nxt_status_report_t *) b->mem.free;
961 b->mem.free = b->mem.end;
962
963 nxt_memzero(report, sizeof(nxt_status_report_t));
964
965 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
966
967 report->accepted_conns += engine->accepted_conns_cnt;
968 report->idle_conns += engine->idle_conns_cnt;
969 report->closed_conns += engine->closed_conns_cnt;
970 report->requests += engine->requests_cnt;
971
972 } nxt_queue_loop;
973
974 report->apps_count = 0;
975 app_stat = report->apps;
976 p = b->mem.end;
977
978 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
979 p -= app->name.length;
980
981 nxt_memcpy(p, app->name.start, app->name.length);
982
983 app_stat->name.length = app->name.length;
984 app_stat->name.start = (u_char *) (p - b->mem.pos);
985
986 app_stat->active_requests = app->active_requests;
987 app_stat->pending_processes = app->pending_processes;
988 app_stat->processes = app->processes;
989 app_stat->idle_processes = app->idle_processes;
990
991 report->apps_count++;
992 app_stat++;
993 } nxt_queue_loop;
994
995 type = NXT_PORT_MSG_RPC_READY_LAST;
996
997 fail:
998
999 nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1000 }
1001
1002
1003 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1004 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1005 void *data)
1006 {
1007 union {
1008 nxt_pid_t removed_pid;
1009 void *data;
1010 } u;
1011
1012 u.data = data;
1013
1014 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1015 }
1016
1017
1018 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1019 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1020 {
1021 nxt_event_engine_t *engine;
1022
1023 nxt_port_remove_pid_handler(task, msg);
1024
1025 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1026 {
1027 if (nxt_fast_path(engine->port != NULL)) {
1028 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1029 msg->u.data);
1030 }
1031 }
1032 nxt_queue_loop;
1033
1034 if (msg->port_msg.stream == 0) {
1035 return;
1036 }
1037
1038 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1039
1040 nxt_port_rpc_handler(task, msg);
1041 }
1042
1043
1044 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1045 nxt_router_temp_conf(nxt_task_t *task)
1046 {
1047 nxt_mp_t *mp, *tmp;
1048 nxt_router_conf_t *rtcf;
1049 nxt_router_temp_conf_t *tmcf;
1050
1051 mp = nxt_mp_create(1024, 128, 256, 32);
1052 if (nxt_slow_path(mp == NULL)) {
1053 return NULL;
1054 }
1055
1056 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1057 if (nxt_slow_path(rtcf == NULL)) {
1058 goto fail;
1059 }
1060
1061 rtcf->mem_pool = mp;
1062
1063 rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1064 if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1065 goto fail;
1066 }
1067
1068 #if (NXT_HAVE_NJS)
1069 nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1070 #endif
1071
1072 tmp = nxt_mp_create(1024, 128, 256, 32);
1073 if (nxt_slow_path(tmp == NULL)) {
1074 goto fail;
1075 }
1076
1077 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1078 if (nxt_slow_path(tmcf == NULL)) {
1079 goto temp_fail;
1080 }
1081
1082 tmcf->mem_pool = tmp;
1083 tmcf->router_conf = rtcf;
1084 tmcf->count = 1;
1085 tmcf->engine = task->thread->engine;
1086
1087 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1088 sizeof(nxt_router_engine_conf_t));
1089 if (nxt_slow_path(tmcf->engines == NULL)) {
1090 goto temp_fail;
1091 }
1092
1093 nxt_queue_init(&creating_sockets);
1094 nxt_queue_init(&pending_sockets);
1095 nxt_queue_init(&updating_sockets);
1096 nxt_queue_init(&keeping_sockets);
1097 nxt_queue_init(&deleting_sockets);
1098
1099 #if (NXT_TLS)
1100 nxt_queue_init(&tmcf->tls);
1101 #endif
1102
1103 nxt_queue_init(&tmcf->apps);
1104 nxt_queue_init(&tmcf->previous);
1105
1106 return tmcf;
1107
1108 temp_fail:
1109
1110 nxt_mp_destroy(tmp);
1111
1112 fail:
1113
1114 if (rtcf->tstr_state != NULL) {
1115 nxt_tstr_state_release(rtcf->tstr_state);
1116 }
1117
1118 nxt_mp_destroy(mp);
1119
1120 return NULL;
1121 }
1122
1123
1124 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1125 nxt_router_app_can_start(nxt_app_t *app)
1126 {
1127 return app->processes + app->pending_processes < app->max_processes
1128 && app->pending_processes < app->max_pending_processes;
1129 }
1130
1131
1132 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1133 nxt_router_app_need_start(nxt_app_t *app)
1134 {
1135 return (app->active_requests
1136 > app->port_hash_count + app->pending_processes)
1137 || (app->spare_processes
1138 > app->idle_processes + app->pending_processes);
1139 }
1140
1141
1142 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1143 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1144 {
1145 nxt_int_t ret;
1146 nxt_app_t *app;
1147 nxt_router_t *router;
1148 nxt_runtime_t *rt;
1149 nxt_queue_link_t *qlk;
1150 nxt_socket_conf_t *skcf;
1151 nxt_router_conf_t *rtcf;
1152 nxt_router_temp_conf_t *tmcf;
1153 const nxt_event_interface_t *interface;
1154 #if (NXT_TLS)
1155 nxt_router_tlssock_t *tls;
1156 #endif
1157
1158 tmcf = obj;
1159
1160 qlk = nxt_queue_first(&pending_sockets);
1161
1162 if (qlk != nxt_queue_tail(&pending_sockets)) {
1163 nxt_queue_remove(qlk);
1164 nxt_queue_insert_tail(&creating_sockets, qlk);
1165
1166 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1167
1168 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1169
1170 return;
1171 }
1172
1173 #if (NXT_TLS)
1174 qlk = nxt_queue_last(&tmcf->tls);
1175
1176 if (qlk != nxt_queue_head(&tmcf->tls)) {
1177 nxt_queue_remove(qlk);
1178
1179 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1180
1181 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1182 nxt_router_tls_rpc_handler, tls);
1183 return;
1184 }
1185 #endif
1186
1187 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1188
1189 if (nxt_router_app_need_start(app)) {
1190 nxt_router_app_rpc_create(task, tmcf, app);
1191 return;
1192 }
1193
1194 } nxt_queue_loop;
1195
1196 rtcf = tmcf->router_conf;
1197
1198 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1199 nxt_router_access_log_open(task, tmcf);
1200 return;
1201 }
1202
1203 rt = task->thread->runtime;
1204
1205 interface = nxt_service_get(rt->services, "engine", NULL);
1206
1207 router = rtcf->router;
1208
1209 ret = nxt_router_engines_create(task, router, tmcf, interface);
1210 if (nxt_slow_path(ret != NXT_OK)) {
1211 goto fail;
1212 }
1213
1214 ret = nxt_router_threads_create(task, rt, tmcf);
1215 if (nxt_slow_path(ret != NXT_OK)) {
1216 goto fail;
1217 }
1218
1219 nxt_router_apps_sort(task, router, tmcf);
1220
1221 nxt_router_apps_hash_use(task, rtcf, 1);
1222
1223 nxt_router_engines_post(router, tmcf);
1224
1225 nxt_queue_add(&router->sockets, &updating_sockets);
1226 nxt_queue_add(&router->sockets, &creating_sockets);
1227
1228 if (router->access_log != rtcf->access_log) {
1229 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1230
1231 nxt_router_access_log_release(task, &router->lock, router->access_log);
1232
1233 router->access_log = rtcf->access_log;
1234 }
1235
1236 nxt_router_conf_ready(task, tmcf);
1237
1238 return;
1239
1240 fail:
1241
1242 nxt_router_conf_error(task, tmcf);
1243
1244 return;
1245 }
1246
1247
1248 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1249 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1250 {
1251 nxt_joint_job_t *job;
1252
1253 job = obj;
1254
1255 nxt_router_conf_ready(task, job->tmcf);
1256 }
1257
1258
1259 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1260 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1261 {
1262 uint32_t count;
1263 nxt_router_conf_t *rtcf;
1264 nxt_thread_spinlock_t *lock;
1265
1266 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1267
1268 if (--tmcf->count > 0) {
1269 return;
1270 }
1271
1272 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1273
1274 rtcf = tmcf->router_conf;
1275
1276 lock = &rtcf->router->lock;
1277
1278 nxt_thread_spin_lock(lock);
1279
1280 count = rtcf->count;
1281
1282 nxt_thread_spin_unlock(lock);
1283
1284 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1285
1286 if (count == 0) {
1287 nxt_router_apps_hash_use(task, rtcf, -1);
1288
1289 nxt_router_access_log_release(task, lock, rtcf->access_log);
1290
1291 nxt_mp_destroy(rtcf->mem_pool);
1292 }
1293
1294 nxt_mp_release(tmcf->mem_pool);
1295 }
1296
1297
1298 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1299 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1300 {
1301 nxt_app_t *app;
1302 nxt_socket_t s;
1303 nxt_router_t *router;
1304 nxt_queue_link_t *qlk;
1305 nxt_socket_conf_t *skcf;
1306 nxt_router_conf_t *rtcf;
1307
1308 nxt_alert(task, "failed to apply new conf");
1309
1310 for (qlk = nxt_queue_first(&creating_sockets);
1311 qlk != nxt_queue_tail(&creating_sockets);
1312 qlk = nxt_queue_next(qlk))
1313 {
1314 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1315 s = skcf->listen->socket;
1316
1317 if (s != -1) {
1318 nxt_socket_close(task, s);
1319 }
1320
1321 nxt_free(skcf->listen);
1322 }
1323
1324 rtcf = tmcf->router_conf;
1325
1326 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1327
1328 nxt_router_app_unlink(task, app);
1329
1330 } nxt_queue_loop;
1331
1332 router = rtcf->router;
1333
1334 nxt_queue_add(&router->sockets, &keeping_sockets);
1335 nxt_queue_add(&router->sockets, &deleting_sockets);
1336
1337 nxt_queue_add(&router->apps, &tmcf->previous);
1338
1339 // TODO: new engines and threads
1340
1341 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1342
1343 nxt_mp_destroy(rtcf->mem_pool);
1344
1345 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1346
1347 nxt_mp_release(tmcf->mem_pool);
1348 }
1349
1350
1351 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1352 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1353 nxt_port_msg_type_t type)
1354 {
1355 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1356
1357 nxt_port_use(task, tmcf->port, -1);
1358
1359 tmcf->port = NULL;
1360 }
1361
1362
/*
 * Root-level configuration members mapped onto nxt_router_conf_t.
 * Used by nxt_router_conf_create() with the root JSON object.
 */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};
1370
1371
/*
 * Members of one application object mapped onto nxt_router_app_conf_t.
 *
 * "processes" is listed twice on purpose: once as an integer and once as
 * a raw value pointer.  nxt_router_conf_create() checks whether the raw
 * value is an object and then either maps its nested members or uses the
 * integer for both the maximum and the spare number of processes.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};
1403
1404
/*
 * Members of an application's "limits" object mapped onto
 * nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },
};
1412
1413
/*
 * Members of an application's "processes" object (used when "processes"
 * is an object rather than a number) mapped onto nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};
1433
1434
/*
 * Members of one listener object mapped onto nxt_router_listener_conf_t.
 * nxt_router_conf_create() prefers "pass"; "application" is consulted
 * only when "pass" is empty (kept for compatibility).
 */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};
1448
1449
/*
 * Members of the "/settings/http" object mapped onto nxt_socket_conf_t.
 * nxt_router_conf_create() applies this map to every listener's socket
 * configuration after filling in the built-in defaults.
 */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },

    {
        nxt_string("log_route"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, log_route),
    },
};
1523
1524
/*
 * Members of the "/settings/http/websocket" object mapped onto
 * nxt_websocket_conf_t (the websocket_conf member of a socket
 * configuration in nxt_router_conf_create()).
 */
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
    {
        nxt_string("max_frame_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_websocket_conf_t, max_frame_size),
    },

    {
        nxt_string("read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, read_timeout),
    },

    {
        nxt_string("keepalive_interval"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, keepalive_interval),
    },

};
1545
1546
1547 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1548 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1549 u_char *start, u_char *end)
1550 {
1551 u_char *p;
1552 size_t size;
1553 nxt_mp_t *mp, *app_mp;
1554 uint32_t next, next_target;
1555 nxt_int_t ret;
1556 nxt_str_t name, target;
1557 nxt_app_t *app, *prev;
1558 nxt_str_t *t, *s, *targets;
1559 nxt_uint_t n, i;
1560 nxt_port_t *port;
1561 nxt_router_t *router;
1562 nxt_app_joint_t *app_joint;
1563 #if (NXT_TLS)
1564 nxt_tls_init_t *tls_init;
1565 nxt_conf_value_t *certificate;
1566 #endif
1567 nxt_conf_value_t *root, *conf, *http, *value, *websocket;
1568 nxt_conf_value_t *applications, *application;
1569 nxt_conf_value_t *listeners, *listener;
1570 nxt_socket_conf_t *skcf;
1571 nxt_router_conf_t *rtcf;
1572 nxt_http_routes_t *routes;
1573 nxt_event_engine_t *engine;
1574 nxt_app_lang_module_t *lang;
1575 nxt_router_app_conf_t apcf;
1576 nxt_router_listener_conf_t lscf;
1577
1578 static nxt_str_t http_path = nxt_string("/settings/http");
1579 static nxt_str_t applications_path = nxt_string("/applications");
1580 static nxt_str_t listeners_path = nxt_string("/listeners");
1581 static nxt_str_t routes_path = nxt_string("/routes");
1582 static nxt_str_t access_log_path = nxt_string("/access_log");
1583 #if (NXT_TLS)
1584 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1585 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1586 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1587 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1588 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1589 #endif
1590 static nxt_str_t static_path = nxt_string("/settings/http/static");
1591 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1592 static nxt_str_t forwarded_path = nxt_string("/forwarded");
1593 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1594
1595 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1596 if (root == NULL) {
1597 nxt_alert(task, "configuration parsing error");
1598 return NXT_ERROR;
1599 }
1600
1601 rtcf = tmcf->router_conf;
1602 mp = rtcf->mem_pool;
1603
1604 ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1605 nxt_nitems(nxt_router_conf), rtcf);
1606 if (ret != NXT_OK) {
1607 nxt_alert(task, "root map error");
1608 return NXT_ERROR;
1609 }
1610
1611 if (rtcf->threads == 0) {
1612 rtcf->threads = nxt_ncpu;
1613 }
1614
1615 conf = nxt_conf_get_path(root, &static_path);
1616
1617 ret = nxt_router_conf_process_static(task, rtcf, conf);
1618 if (nxt_slow_path(ret != NXT_OK)) {
1619 return NXT_ERROR;
1620 }
1621
1622 router = rtcf->router;
1623
1624 applications = nxt_conf_get_path(root, &applications_path);
1625
1626 if (applications != NULL) {
1627 next = 0;
1628
1629 for ( ;; ) {
1630 application = nxt_conf_next_object_member(applications,
1631 &name, &next);
1632 if (application == NULL) {
1633 break;
1634 }
1635
1636 nxt_debug(task, "application \"%V\"", &name);
1637
1638 size = nxt_conf_json_length(application, NULL);
1639
1640 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1641 if (nxt_slow_path(app_mp == NULL)) {
1642 goto fail;
1643 }
1644
1645 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1646 if (app == NULL) {
1647 goto app_fail;
1648 }
1649
1650 nxt_memzero(app, sizeof(nxt_app_t));
1651
1652 app->mem_pool = app_mp;
1653
1654 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1655 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1656 + name.length);
1657
1658 p = nxt_conf_json_print(app->conf.start, application, NULL);
1659 app->conf.length = p - app->conf.start;
1660
1661 nxt_assert(app->conf.length <= size);
1662
1663 nxt_debug(task, "application conf \"%V\"", &app->conf);
1664
1665 prev = nxt_router_app_find(&router->apps, &name);
1666
1667 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1668 nxt_mp_destroy(app_mp);
1669
1670 nxt_queue_remove(&prev->link);
1671 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1672
1673 ret = nxt_router_apps_hash_add(rtcf, prev);
1674 if (nxt_slow_path(ret != NXT_OK)) {
1675 goto fail;
1676 }
1677
1678 continue;
1679 }
1680
1681 apcf.processes = 1;
1682 apcf.max_processes = 1;
1683 apcf.spare_processes = 0;
1684 apcf.timeout = 0;
1685 apcf.idle_timeout = 15000;
1686 apcf.limits_value = NULL;
1687 apcf.processes_value = NULL;
1688 apcf.targets_value = NULL;
1689
1690 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1691 if (nxt_slow_path(app_joint == NULL)) {
1692 goto app_fail;
1693 }
1694
1695 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1696
1697 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1698 nxt_nitems(nxt_router_app_conf), &apcf);
1699 if (ret != NXT_OK) {
1700 nxt_alert(task, "application map error");
1701 goto app_fail;
1702 }
1703
1704 if (apcf.limits_value != NULL) {
1705
1706 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1707 nxt_alert(task, "application limits is not object");
1708 goto app_fail;
1709 }
1710
1711 ret = nxt_conf_map_object(mp, apcf.limits_value,
1712 nxt_router_app_limits_conf,
1713 nxt_nitems(nxt_router_app_limits_conf),
1714 &apcf);
1715 if (ret != NXT_OK) {
1716 nxt_alert(task, "application limits map error");
1717 goto app_fail;
1718 }
1719 }
1720
1721 if (apcf.processes_value != NULL
1722 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1723 {
1724 ret = nxt_conf_map_object(mp, apcf.processes_value,
1725 nxt_router_app_processes_conf,
1726 nxt_nitems(nxt_router_app_processes_conf),
1727 &apcf);
1728 if (ret != NXT_OK) {
1729 nxt_alert(task, "application processes map error");
1730 goto app_fail;
1731 }
1732
1733 } else {
1734 apcf.max_processes = apcf.processes;
1735 apcf.spare_processes = apcf.processes;
1736 }
1737
1738 if (apcf.targets_value != NULL) {
1739 n = nxt_conf_object_members_count(apcf.targets_value);
1740
1741 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1742 if (nxt_slow_path(targets == NULL)) {
1743 goto app_fail;
1744 }
1745
1746 next_target = 0;
1747
1748 for (i = 0; i < n; i++) {
1749 (void) nxt_conf_next_object_member(apcf.targets_value,
1750 &target, &next_target);
1751
1752 s = nxt_str_dup(app_mp, &targets[i], &target);
1753 if (nxt_slow_path(s == NULL)) {
1754 goto app_fail;
1755 }
1756 }
1757
1758 } else {
1759 targets = NULL;
1760 }
1761
1762 nxt_debug(task, "application type: %V", &apcf.type);
1763 nxt_debug(task, "application processes: %D", apcf.processes);
1764 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1765
1766 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1767
1768 if (lang == NULL) {
1769 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1770 goto app_fail;
1771 }
1772
1773 nxt_debug(task, "application language module: \"%s\"", lang->file);
1774
1775 ret = nxt_thread_mutex_create(&app->mutex);
1776 if (ret != NXT_OK) {
1777 goto app_fail;
1778 }
1779
1780 nxt_queue_init(&app->ports);
1781 nxt_queue_init(&app->spare_ports);
1782 nxt_queue_init(&app->idle_ports);
1783 nxt_queue_init(&app->ack_waiting_req);
1784
1785 app->name.length = name.length;
1786 nxt_memcpy(app->name.start, name.start, name.length);
1787
1788 app->type = lang->type;
1789 app->max_processes = apcf.max_processes;
1790 app->spare_processes = apcf.spare_processes;
1791 app->max_pending_processes = apcf.spare_processes
1792 ? apcf.spare_processes : 1;
1793 app->timeout = apcf.timeout;
1794 app->idle_timeout = apcf.idle_timeout;
1795
1796 app->targets = targets;
1797
1798 engine = task->thread->engine;
1799
1800 app->engine = engine;
1801
1802 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1803 app->adjust_idle_work.task = &engine->task;
1804 app->adjust_idle_work.obj = app;
1805
1806 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1807
1808 ret = nxt_router_apps_hash_add(rtcf, app);
1809 if (nxt_slow_path(ret != NXT_OK)) {
1810 goto app_fail;
1811 }
1812
1813 nxt_router_app_use(task, app, 1);
1814
1815 app->joint = app_joint;
1816
1817 app_joint->use_count = 1;
1818 app_joint->app = app;
1819
1820 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1821 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1822 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1823 app_joint->idle_timer.task = &engine->task;
1824 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1825
1826 app_joint->free_app_work.handler = nxt_router_free_app;
1827 app_joint->free_app_work.task = &engine->task;
1828 app_joint->free_app_work.obj = app_joint;
1829
1830 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1831 NXT_PROCESS_APP);
1832 if (nxt_slow_path(port == NULL)) {
1833 return NXT_ERROR;
1834 }
1835
1836 ret = nxt_port_socket_init(task, port, 0);
1837 if (nxt_slow_path(ret != NXT_OK)) {
1838 nxt_port_use(task, port, -1);
1839 return NXT_ERROR;
1840 }
1841
1842 ret = nxt_router_app_queue_init(task, port);
1843 if (nxt_slow_path(ret != NXT_OK)) {
1844 nxt_port_write_close(port);
1845 nxt_port_read_close(port);
1846 nxt_port_use(task, port, -1);
1847 return NXT_ERROR;
1848 }
1849
1850 nxt_port_write_enable(task, port);
1851 port->app = app;
1852
1853 app->shared_port = port;
1854
1855 nxt_thread_mutex_create(&app->outgoing.mutex);
1856 }
1857 }
1858
1859 conf = nxt_conf_get_path(root, &routes_path);
1860 if (nxt_fast_path(conf != NULL)) {
1861 routes = nxt_http_routes_create(task, tmcf, conf);
1862 if (nxt_slow_path(routes == NULL)) {
1863 return NXT_ERROR;
1864 }
1865
1866 rtcf->routes = routes;
1867 }
1868
1869 ret = nxt_upstreams_create(task, tmcf, root);
1870 if (nxt_slow_path(ret != NXT_OK)) {
1871 return ret;
1872 }
1873
1874 http = nxt_conf_get_path(root, &http_path);
1875 #if 0
1876 if (http == NULL) {
1877 nxt_alert(task, "no \"http\" block");
1878 return NXT_ERROR;
1879 }
1880 #endif
1881
1882 websocket = nxt_conf_get_path(root, &websocket_path);
1883
1884 listeners = nxt_conf_get_path(root, &listeners_path);
1885
1886 if (listeners != NULL) {
1887 next = 0;
1888
1889 for ( ;; ) {
1890 listener = nxt_conf_next_object_member(listeners, &name, &next);
1891 if (listener == NULL) {
1892 break;
1893 }
1894
1895 skcf = nxt_router_socket_conf(task, tmcf, &name);
1896 if (skcf == NULL) {
1897 goto fail;
1898 }
1899
1900 nxt_memzero(&lscf, sizeof(lscf));
1901
1902 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1903 nxt_nitems(nxt_router_listener_conf),
1904 &lscf);
1905 if (ret != NXT_OK) {
1906 nxt_alert(task, "listener map error");
1907 goto fail;
1908 }
1909
1910 nxt_debug(task, "application: %V", &lscf.application);
1911
1912 // STUB, default values if http block is not defined.
1913 skcf->header_buffer_size = 2048;
1914 skcf->large_header_buffer_size = 8192;
1915 skcf->large_header_buffers = 4;
1916 skcf->discard_unsafe_fields = 1;
1917 skcf->body_buffer_size = 16 * 1024;
1918 skcf->max_body_size = 8 * 1024 * 1024;
1919 skcf->proxy_header_buffer_size = 64 * 1024;
1920 skcf->proxy_buffer_size = 4096;
1921 skcf->proxy_buffers = 256;
1922 skcf->idle_timeout = 180 * 1000;
1923 skcf->header_read_timeout = 30 * 1000;
1924 skcf->body_read_timeout = 30 * 1000;
1925 skcf->send_timeout = 30 * 1000;
1926 skcf->proxy_timeout = 60 * 1000;
1927 skcf->proxy_send_timeout = 30 * 1000;
1928 skcf->proxy_read_timeout = 30 * 1000;
1929
1930 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1931 skcf->websocket_conf.read_timeout = 60 * 1000;
1932 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1933
1934 nxt_str_null(&skcf->body_temp_path);
1935
1936 if (http != NULL) {
1937 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1938 nxt_nitems(nxt_router_http_conf),
1939 skcf);
1940 if (ret != NXT_OK) {
1941 nxt_alert(task, "http map error");
1942 goto fail;
1943 }
1944 }
1945
1946 if (websocket != NULL) {
1947 ret = nxt_conf_map_object(mp, websocket,
1948 nxt_router_websocket_conf,
1949 nxt_nitems(nxt_router_websocket_conf),
1950 &skcf->websocket_conf);
1951 if (ret != NXT_OK) {
1952 nxt_alert(task, "websocket map error");
1953 goto fail;
1954 }
1955 }
1956
1957 t = &skcf->body_temp_path;
1958
1959 if (t->length == 0) {
1960 t->start = (u_char *) task->thread->runtime->tmp;
1961 t->length = nxt_strlen(t->start);
1962 }
1963
1964 conf = nxt_conf_get_path(listener, &forwarded_path);
1965
1966 if (conf != NULL) {
1967 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1968 if (nxt_slow_path(skcf->forwarded == NULL)) {
1969 return NXT_ERROR;
1970 }
1971 }
1972
1973 conf = nxt_conf_get_path(listener, &client_ip_path);
1974
1975 if (conf != NULL) {
1976 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1977 if (nxt_slow_path(skcf->client_ip == NULL)) {
1978 return NXT_ERROR;
1979 }
1980 }
1981
1982 #if (NXT_TLS)
1983 certificate = nxt_conf_get_path(listener, &certificate_path);
1984
1985 if (certificate != NULL) {
1986 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1987 if (nxt_slow_path(tls_init == NULL)) {
1988 return NXT_ERROR;
1989 }
1990
1991 tls_init->cache_size = 0;
1992 tls_init->timeout = 300;
1993
1994 value = nxt_conf_get_path(listener, &conf_cache_path);
1995 if (value != NULL) {
1996 tls_init->cache_size = nxt_conf_get_number(value);
1997 }
1998
1999 value = nxt_conf_get_path(listener, &conf_timeout_path);
2000 if (value != NULL) {
2001 tls_init->timeout = nxt_conf_get_number(value);
2002 }
2003
2004 tls_init->conf_cmds = nxt_conf_get_path(listener,
2005 &conf_commands_path);
2006
2007 tls_init->tickets_conf = nxt_conf_get_path(listener,
2008 &conf_tickets);
2009
2010 n = nxt_conf_array_elements_count_or_1(certificate);
2011
2012 for (i = 0; i < n; i++) {
2013 value = nxt_conf_get_array_element_or_itself(certificate,
2014 i);
2015 nxt_assert(value != NULL);
2016
2017 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2018 tls_init, i == 0);
2019 if (nxt_slow_path(ret != NXT_OK)) {
2020 goto fail;
2021 }
2022 }
2023 }
2024 #endif
2025
2026 skcf->listen->handler = nxt_http_conn_init;
2027 skcf->router_conf = rtcf;
2028 skcf->router_conf->count++;
2029
2030 if (lscf.pass.length != 0) {
2031 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2032
2033 /* COMPATIBILITY: listener application. */
2034 } else if (lscf.application.length > 0) {
2035 skcf->action = nxt_http_pass_application(task, rtcf,
2036 &lscf.application);
2037 }
2038
2039 if (nxt_slow_path(skcf->action == NULL)) {
2040 goto fail;
2041 }
2042 }
2043 }
2044
2045 ret = nxt_http_routes_resolve(task, tmcf);
2046 if (nxt_slow_path(ret != NXT_OK)) {
2047 goto fail;
2048 }
2049
2050 value = nxt_conf_get_path(root, &access_log_path);
2051
2052 if (value != NULL) {
2053 ret = nxt_router_access_log_create(task, rtcf, value);
2054 if (nxt_slow_path(ret != NXT_OK)) {
2055 goto fail;
2056 }
2057 }
2058
2059 ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
2060 if (nxt_slow_path(ret != NXT_OK)) {
2061 goto fail;
2062 }
2063
2064 nxt_queue_add(&deleting_sockets, &router->sockets);
2065 nxt_queue_init(&router->sockets);
2066
2067 return NXT_OK;
2068
2069 app_fail:
2070
2071 nxt_mp_destroy(app_mp);
2072
2073 fail:
2074
2075 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2076
2077 nxt_queue_remove(&app->link);
2078 nxt_thread_mutex_destroy(&app->mutex);
2079 nxt_mp_destroy(app->mem_pool);
2080
2081 } nxt_queue_loop;
2082
2083 return NXT_ERROR;
2084 }
2085
2086
2087 #if (NXT_TLS)
2088
2089 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2090 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2091 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2092 nxt_tls_init_t *tls_init, nxt_bool_t last)
2093 {
2094 nxt_router_tlssock_t *tls;
2095
2096 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2097 if (nxt_slow_path(tls == NULL)) {
2098 return NXT_ERROR;
2099 }
2100
2101 tls->tls_init = tls_init;
2102 tls->socket_conf = skcf;
2103 tls->temp_conf = tmcf;
2104 tls->last = last;
2105 nxt_conf_get_string(value, &tls->name);
2106
2107 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2108
2109 return NXT_OK;
2110 }
2111
2112 #endif
2113
2114
2115 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2116 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2117 nxt_conf_value_t *conf)
2118 {
2119 uint32_t next, i;
2120 nxt_mp_t *mp;
2121 nxt_str_t *type, exten, str;
2122 nxt_int_t ret;
2123 nxt_uint_t exts;
2124 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2125
2126 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2127
2128 mp = rtcf->mem_pool;
2129
2130 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2131 if (nxt_slow_path(ret != NXT_OK)) {
2132 return NXT_ERROR;
2133 }
2134
2135 if (conf == NULL) {
2136 return NXT_OK;
2137 }
2138
2139 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2140
2141 if (mtypes_conf != NULL) {
2142 next = 0;
2143
2144 for ( ;; ) {
2145 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2146
2147 if (ext_conf == NULL) {
2148 break;
2149 }
2150
2151 type = nxt_str_dup(mp, NULL, &str);
2152 if (nxt_slow_path(type == NULL)) {
2153 return NXT_ERROR;
2154 }
2155
2156 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2157 nxt_conf_get_string(ext_conf, &str);
2158
2159 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2160 return NXT_ERROR;
2161 }
2162
2163 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2164 &exten, type);
2165 if (nxt_slow_path(ret != NXT_OK)) {
2166 return NXT_ERROR;
2167 }
2168
2169 continue;
2170 }
2171
2172 exts = nxt_conf_array_elements_count(ext_conf);
2173
2174 for (i = 0; i < exts; i++) {
2175 value = nxt_conf_get_array_element(ext_conf, i);
2176
2177 nxt_conf_get_string(value, &str);
2178
2179 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2180 return NXT_ERROR;
2181 }
2182
2183 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2184 &exten, type);
2185 if (nxt_slow_path(ret != NXT_OK)) {
2186 return NXT_ERROR;
2187 }
2188 }
2189 }
2190 }
2191
2192 return NXT_OK;
2193 }
2194
2195
2196 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2197 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2198 {
2199 nxt_int_t ret;
2200 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf;
2201 nxt_conf_value_t *source_conf, *recursive_conf;
2202 nxt_http_forward_t *forward;
2203 nxt_http_route_addr_rule_t *source;
2204
2205 static nxt_str_t header_path = nxt_string("/header");
2206 static nxt_str_t client_ip_path = nxt_string("/client_ip");
2207 static nxt_str_t protocol_path = nxt_string("/protocol");
2208 static nxt_str_t source_path = nxt_string("/source");
2209 static nxt_str_t recursive_path = nxt_string("/recursive");
2210
2211 header_conf = nxt_conf_get_path(conf, &header_path);
2212
2213 if (header_conf != NULL) {
2214 client_ip_conf = nxt_conf_get_path(conf, &header_path);
2215 protocol_conf = NULL;
2216
2217 } else {
2218 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2219 protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2220 }
2221
2222 source_conf = nxt_conf_get_path(conf, &source_path);
2223 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2224
2225 if (source_conf == NULL
2226 || (protocol_conf == NULL && client_ip_conf == NULL))
2227 {
2228 return NULL;
2229 }
2230
2231 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2232 if (nxt_slow_path(forward == NULL)) {
2233 return NULL;
2234 }
2235
2236 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2237 if (nxt_slow_path(source == NULL)) {
2238 return NULL;
2239 }
2240
2241 forward->source = source;
2242
2243 if (recursive_conf != NULL) {
2244 forward->recursive = nxt_conf_get_boolean(recursive_conf);
2245 }
2246
2247 if (client_ip_conf != NULL) {
2248 ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2249 &forward->client_ip);
2250 if (nxt_slow_path(ret != NXT_OK)) {
2251 return NULL;
2252 }
2253 }
2254
2255 if (protocol_conf != NULL) {
2256 ret = nxt_router_conf_forward_header(mp, protocol_conf,
2257 &forward->protocol);
2258 if (nxt_slow_path(ret != NXT_OK)) {
2259 return NULL;
2260 }
2261 }
2262
2263 return forward;
2264 }
2265
2266
2267 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2268 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2269 nxt_http_forward_header_t *fh)
2270 {
2271 char c;
2272 size_t i;
2273 uint32_t hash;
2274 nxt_str_t header;
2275
2276 nxt_conf_get_string(conf, &header);
2277
2278 fh->header = nxt_str_dup(mp, NULL, &header);
2279 if (nxt_slow_path(fh->header == NULL)) {
2280 return NXT_ERROR;
2281 }
2282
2283 hash = NXT_HTTP_FIELD_HASH_INIT;
2284
2285 for (i = 0; i < fh->header->length; i++) {
2286 c = fh->header->start[i];
2287 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2288 }
2289
2290 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2291
2292 fh->header_hash = hash;
2293
2294 return NXT_OK;
2295 }
2296
2297
2298 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2299 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2300 {
2301 nxt_app_t *app;
2302
2303 nxt_queue_each(app, queue, nxt_app_t, link) {
2304
2305 if (nxt_strstr_eq(name, &app->name)) {
2306 return app;
2307 }
2308
2309 } nxt_queue_loop;
2310
2311 return NULL;
2312 }
2313
2314
2315 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2316 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2317 {
2318 void *mem;
2319 nxt_int_t fd;
2320
2321 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2322 if (nxt_slow_path(fd == -1)) {
2323 return NXT_ERROR;
2324 }
2325
2326 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2327 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2328 if (nxt_slow_path(mem == MAP_FAILED)) {
2329 nxt_fd_close(fd);
2330
2331 return NXT_ERROR;
2332 }
2333
2334 nxt_app_queue_init(mem);
2335
2336 port->queue_fd = fd;
2337 port->queue = mem;
2338
2339 return NXT_OK;
2340 }
2341
2342
2343 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2344 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2345 {
2346 void *mem;
2347 nxt_int_t fd;
2348
2349 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2350 if (nxt_slow_path(fd == -1)) {
2351 return NXT_ERROR;
2352 }
2353
2354 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2355 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2356 if (nxt_slow_path(mem == MAP_FAILED)) {
2357 nxt_fd_close(fd);
2358
2359 return NXT_ERROR;
2360 }
2361
2362 nxt_port_queue_init(mem);
2363
2364 port->queue_fd = fd;
2365 port->queue = mem;
2366
2367 return NXT_OK;
2368 }
2369
2370
2371 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2372 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2373 {
2374 void *mem;
2375
2376 nxt_assert(fd != -1);
2377
2378 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2379 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2380 if (nxt_slow_path(mem == MAP_FAILED)) {
2381
2382 return NXT_ERROR;
2383 }
2384
2385 port->queue = mem;
2386
2387 return NXT_OK;
2388 }
2389
2390
/*
 * Level hash prototype shared by the nxt_router_apps_hash_*() functions
 * operating on rtcf->apps_hash: entries are matched with
 * nxt_router_apps_hash_test() and hash memory is obtained and released
 * through nxt_mp_lvlhsh_alloc() and nxt_mp_lvlhsh_free().
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2397
2398
2399 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2400 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2401 {
2402 nxt_app_t *app;
2403
2404 app = data;
2405
2406 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2407 }
2408
2409
2410 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2411 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2412 {
2413 nxt_lvlhsh_query_t lhq;
2414
2415 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2416 lhq.replace = 0;
2417 lhq.key = app->name;
2418 lhq.value = app;
2419 lhq.proto = &nxt_router_apps_hash_proto;
2420 lhq.pool = rtcf->mem_pool;
2421
2422 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2423
2424 case NXT_OK:
2425 return NXT_OK;
2426
2427 case NXT_DECLINED:
2428 nxt_thread_log_alert("router app hash adding failed: "
2429 "\"%V\" is already in hash", &lhq.key);
2430 /* Fall through. */
2431 default:
2432 return NXT_ERROR;
2433 }
2434 }
2435
2436
2437 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2438 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2439 {
2440 nxt_lvlhsh_query_t lhq;
2441
2442 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2443 lhq.key = *name;
2444 lhq.proto = &nxt_router_apps_hash_proto;
2445
2446 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2447 return NULL;
2448 }
2449
2450 return lhq.value;
2451 }
2452
2453
2454 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2455 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2456 {
2457 nxt_app_t *app;
2458 nxt_lvlhsh_each_t lhe;
2459
2460 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2461
2462 for ( ;; ) {
2463 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2464
2465 if (app == NULL) {
2466 break;
2467 }
2468
2469 nxt_router_app_use(task, app, i);
2470 }
2471 }
2472
2473
/*
 * Per-action application settings created by
 * nxt_router_application_init() and stored in action->u.conf.
 */
typedef struct {
    nxt_app_t  *app;       /* application found by name in apps_hash */
    nxt_int_t  target;     /* index into app->targets; 0 if no target given */
} nxt_http_app_conf_t;
2478
2479
2480 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2481 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2482 nxt_str_t *target, nxt_http_action_t *action)
2483 {
2484 nxt_app_t *app;
2485 nxt_str_t *targets;
2486 nxt_uint_t i;
2487 nxt_http_app_conf_t *conf;
2488
2489 app = nxt_router_apps_hash_get(rtcf, name);
2490 if (app == NULL) {
2491 return NXT_DECLINED;
2492 }
2493
2494 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2495 if (nxt_slow_path(conf == NULL)) {
2496 return NXT_ERROR;
2497 }
2498
2499 action->handler = nxt_http_application_handler;
2500 action->u.conf = conf;
2501
2502 conf->app = app;
2503
2504 if (target != NULL && target->length != 0) {
2505 targets = app->targets;
2506
2507 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2508
2509 conf->target = i;
2510
2511 } else {
2512 conf->target = 0;
2513 }
2514
2515 return NXT_OK;
2516 }
2517
2518
2519 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2520 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2521 nxt_str_t *name)
2522 {
2523 size_t size;
2524 nxt_int_t ret;
2525 nxt_bool_t wildcard;
2526 nxt_sockaddr_t *sa;
2527 nxt_socket_conf_t *skcf;
2528 nxt_listen_socket_t *ls;
2529
2530 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2531 if (nxt_slow_path(sa == NULL)) {
2532 nxt_alert(task, "invalid listener \"%V\"", name);
2533 return NULL;
2534 }
2535
2536 sa->type = SOCK_STREAM;
2537
2538 nxt_debug(task, "router listener: \"%*s\"",
2539 (size_t) sa->length, nxt_sockaddr_start(sa));
2540
2541 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2542 if (nxt_slow_path(skcf == NULL)) {
2543 return NULL;
2544 }
2545
2546 size = nxt_sockaddr_size(sa);
2547
2548 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2549
2550 if (ret != NXT_OK) {
2551
2552 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2553 if (nxt_slow_path(ls == NULL)) {
2554 return NULL;
2555 }
2556
2557 skcf->listen = ls;
2558
2559 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2560 nxt_memcpy(ls->sockaddr, sa, size);
2561
2562 nxt_listen_socket_remote_size(ls);
2563
2564 ls->socket = -1;
2565 ls->backlog = NXT_LISTEN_BACKLOG;
2566 ls->flags = NXT_NONBLOCK;
2567 ls->read_after_accept = 1;
2568 }
2569
2570 switch (sa->u.sockaddr.sa_family) {
2571 #if (NXT_HAVE_UNIX_DOMAIN)
2572 case AF_UNIX:
2573 wildcard = 0;
2574 break;
2575 #endif
2576 #if (NXT_INET6)
2577 case AF_INET6:
2578 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2579 break;
2580 #endif
2581 case AF_INET:
2582 default:
2583 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2584 break;
2585 }
2586
2587 if (!wildcard) {
2588 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2589 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2590 return NULL;
2591 }
2592
2593 nxt_memcpy(skcf->sockaddr, sa, size);
2594 }
2595
2596 return skcf;
2597 }
2598
2599
2600 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2601 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2602 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2603 {
2604 nxt_router_t *router;
2605 nxt_queue_link_t *qlk;
2606 nxt_socket_conf_t *skcf;
2607
2608 router = tmcf->router_conf->router;
2609
2610 for (qlk = nxt_queue_first(&router->sockets);
2611 qlk != nxt_queue_tail(&router->sockets);
2612 qlk = nxt_queue_next(qlk))
2613 {
2614 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2615
2616 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2617 nskcf->listen = skcf->listen;
2618
2619 nxt_queue_remove(qlk);
2620 nxt_queue_insert_tail(&keeping_sockets, qlk);
2621
2622 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2623
2624 return NXT_OK;
2625 }
2626 }
2627
2628 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2629
2630 return NXT_DECLINED;
2631 }
2632
2633
2634 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2635 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2636 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2637 {
2638 size_t size;
2639 uint32_t stream;
2640 nxt_int_t ret;
2641 nxt_buf_t *b;
2642 nxt_port_t *main_port, *router_port;
2643 nxt_runtime_t *rt;
2644 nxt_socket_rpc_t *rpc;
2645
2646 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2647 if (rpc == NULL) {
2648 goto fail;
2649 }
2650
2651 rpc->socket_conf = skcf;
2652 rpc->temp_conf = tmcf;
2653
2654 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2655
2656 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2657 if (b == NULL) {
2658 goto fail;
2659 }
2660
2661 b->completion_handler = nxt_buf_dummy_completion;
2662
2663 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2664
2665 rt = task->thread->runtime;
2666 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2667 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2668
2669 stream = nxt_port_rpc_register_handler(task, router_port,
2670 nxt_router_listen_socket_ready,
2671 nxt_router_listen_socket_error,
2672 main_port->pid, rpc);
2673 if (nxt_slow_path(stream == 0)) {
2674 goto fail;
2675 }
2676
2677 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2678 stream, router_port->id, b);
2679
2680 if (nxt_slow_path(ret != NXT_OK)) {
2681 nxt_port_rpc_cancel(task, router_port, stream);
2682 goto fail;
2683 }
2684
2685 return;
2686
2687 fail:
2688
2689 nxt_router_conf_error(task, tmcf);
2690 }
2691
2692
2693 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2694 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2695 void *data)
2696 {
2697 nxt_int_t ret;
2698 nxt_socket_t s;
2699 nxt_socket_rpc_t *rpc;
2700
2701 rpc = data;
2702
2703 s = msg->fd[0];
2704
2705 ret = nxt_socket_nonblocking(task, s);
2706 if (nxt_slow_path(ret != NXT_OK)) {
2707 goto fail;
2708 }
2709
2710 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2711
2712 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2713 if (nxt_slow_path(ret != NXT_OK)) {
2714 goto fail;
2715 }
2716
2717 rpc->socket_conf->listen->socket = s;
2718
2719 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2720 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2721
2722 return;
2723
2724 fail:
2725
2726 nxt_socket_close(task, s);
2727
2728 nxt_router_conf_error(task, rpc->temp_conf);
2729 }
2730
2731
2732 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2733 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2734 void *data)
2735 {
2736 nxt_socket_rpc_t *rpc;
2737 nxt_router_temp_conf_t *tmcf;
2738
2739 rpc = data;
2740 tmcf = rpc->temp_conf;
2741
2742 #if 0
2743 u_char *p;
2744 size_t size;
2745 uint8_t error;
2746 nxt_buf_t *in, *out;
2747 nxt_sockaddr_t *sa;
2748
2749 static nxt_str_t socket_errors[] = {
2750 nxt_string("ListenerSystem"),
2751 nxt_string("ListenerNoIPv6"),
2752 nxt_string("ListenerPort"),
2753 nxt_string("ListenerInUse"),
2754 nxt_string("ListenerNoAddress"),
2755 nxt_string("ListenerNoAccess"),
2756 nxt_string("ListenerPath"),
2757 };
2758
2759 sa = rpc->socket_conf->listen->sockaddr;
2760
2761 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2762
2763 if (nxt_slow_path(in == NULL)) {
2764 return;
2765 }
2766
2767 p = in->mem.pos;
2768
2769 error = *p++;
2770
2771 size = nxt_length("listen socket error: ")
2772 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2773 + sa->length + socket_errors[error].length + (in->mem.free - p);
2774
2775 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2776 if (nxt_slow_path(out == NULL)) {
2777 return;
2778 }
2779
2780 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2781 "listen socket error: "
2782 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2783 (size_t) sa->length, nxt_sockaddr_start(sa),
2784 &socket_errors[error], in->mem.free - p, p);
2785
2786 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2787 #endif
2788
2789 nxt_router_conf_error(task, tmcf);
2790 }
2791
2792
#if (NXT_TLS)

/*
 * RPC callback for a certificate request issued for a TLS listener.
 *
 * A NULL message or an RPC error reply fails the configuration update.
 * Otherwise the socket's TLS configuration is created on first use, a
 * new bundle named after the certificate and holding the descriptor
 * from msg->fd[0] is pushed to the head of its bundle list, and the
 * runtime's TLS server_init() is invoked.  On success
 * nxt_router_conf_apply() is scheduled for the temporary configuration.
 */
static void
nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_mp_t                *pool;
    nxt_int_t               rc;
    nxt_tls_conf_t          *tlscf;
    nxt_router_tlssock_t    *tls;
    nxt_tls_bundle_conf_t   *bundle;
    nxt_router_temp_conf_t  *tmcf;

    nxt_debug(task, "tls rpc handler");

    tls = data;
    tmcf = tls->temp_conf;

    if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
        goto fail;
    }

    pool = tmcf->router_conf->mem_pool;

    tlscf = tls->socket_conf->tls;

    if (tlscf == NULL) {
        /* First certificate of this socket: create its TLS conf. */
        tlscf = nxt_mp_zget(pool, sizeof(nxt_tls_conf_t));
        if (nxt_slow_path(tlscf == NULL)) {
            goto fail;
        }

        tlscf->no_wait_shutdown = 1;
        tls->socket_conf->tls = tlscf;
    }

    tls->tls_init->conf = tlscf;

    bundle = nxt_mp_get(pool, sizeof(nxt_tls_bundle_conf_t));
    if (nxt_slow_path(bundle == NULL)) {
        goto fail;
    }

    if (nxt_slow_path(nxt_str_dup(pool, &bundle->name, &tls->name) == NULL)) {
        goto fail;
    }

    bundle->chain_file = msg->fd[0];

    /* Prepend the bundle to the list of the TLS configuration. */
    bundle->next = tlscf->bundle;
    tlscf->bundle = bundle;

    rc = task->thread->runtime->tls->server_init(task, pool, tls->tls_init,
                                                 tls->last);
    if (nxt_slow_path(rc != NXT_OK)) {
        goto fail;
    }

    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                       nxt_router_conf_apply, task, tmcf, NULL);
    return;

fail:

    nxt_router_conf_error(task, tmcf);
}

#endif
2861
2862
2863 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2864 nxt_router_app_rpc_create(nxt_task_t *task,
2865 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2866 {
2867 size_t size;
2868 uint32_t stream;
2869 nxt_fd_t port_fd, queue_fd;
2870 nxt_int_t ret;
2871 nxt_buf_t *b;
2872 nxt_port_t *router_port, *dport;
2873 nxt_runtime_t *rt;
2874 nxt_app_rpc_t *rpc;
2875
2876 rt = task->thread->runtime;
2877
2878 dport = app->proto_port;
2879
2880 if (dport == NULL) {
2881 nxt_debug(task, "app '%V' prototype prefork", &app->name);
2882
2883 size = app->name.length + 1 + app->conf.length;
2884
2885 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2886 if (nxt_slow_path(b == NULL)) {
2887 goto fail;
2888 }
2889
2890 b->completion_handler = nxt_buf_dummy_completion;
2891
2892 nxt_buf_cpystr(b, &app->name);
2893 *b->mem.free++ = '\0';
2894 nxt_buf_cpystr(b, &app->conf);
2895
2896 dport = rt->port_by_type[NXT_PROCESS_MAIN];
2897
2898 port_fd = app->shared_port->pair[0];
2899 queue_fd = app->shared_port->queue_fd;
2900
2901 } else {
2902 nxt_debug(task, "app '%V' prefork", &app->name);
2903
2904 b = NULL;
2905 port_fd = -1;
2906 queue_fd = -1;
2907 }
2908
2909 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2910
2911 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2912 nxt_router_app_prefork_ready,
2913 nxt_router_app_prefork_error,
2914 sizeof(nxt_app_rpc_t));
2915 if (nxt_slow_path(rpc == NULL)) {
2916 goto fail;
2917 }
2918
2919 rpc->app = app;
2920 rpc->temp_conf = tmcf;
2921 rpc->proto = (b != NULL);
2922
2923 stream = nxt_port_rpc_ex_stream(rpc);
2924
2925 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2926 port_fd, queue_fd, stream, router_port->id, b);
2927 if (nxt_slow_path(ret != NXT_OK)) {
2928 nxt_port_rpc_cancel(task, router_port, stream);
2929 goto fail;
2930 }
2931
2932 if (b == NULL) {
2933 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2934
2935 app->pending_processes++;
2936 }
2937
2938 return;
2939
2940 fail:
2941
2942 nxt_router_conf_error(task, tmcf);
2943 }
2944
2945
2946 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2947 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2948 void *data)
2949 {
2950 nxt_app_t *app;
2951 nxt_port_t *port;
2952 nxt_app_rpc_t *rpc;
2953 nxt_event_engine_t *engine;
2954
2955 rpc = data;
2956 app = rpc->app;
2957
2958 port = msg->u.new_port;
2959
2960 nxt_assert(port != NULL);
2961 nxt_assert(port->id == 0);
2962
2963 if (rpc->proto) {
2964 nxt_assert(app->proto_port == NULL);
2965 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2966
2967 nxt_port_inc_use(port);
2968
2969 app->proto_port = port;
2970 port->app = app;
2971
2972 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2973
2974 return;
2975 }
2976
2977 nxt_assert(port->type == NXT_PROCESS_APP);
2978
2979 port->app = app;
2980 port->main_app_port = port;
2981
2982 app->pending_processes--;
2983 app->processes++;
2984 app->idle_processes++;
2985
2986 engine = task->thread->engine;
2987
2988 nxt_queue_insert_tail(&app->ports, &port->app_link);
2989 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2990
2991 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2992 &app->name, port->pid, port->id);
2993
2994 nxt_port_hash_add(&app->port_hash, port);
2995 app->port_hash_count++;
2996
2997 port->idle_start = 0;
2998
2999 nxt_port_inc_use(port);
3000
3001 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
3002
3003 nxt_work_queue_add(&engine->fast_work_queue,
3004 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
3005 }
3006
3007
3008 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3009 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3010 void *data)
3011 {
3012 nxt_app_t *app;
3013 nxt_app_rpc_t *rpc;
3014 nxt_router_temp_conf_t *tmcf;
3015
3016 rpc = data;
3017 app = rpc->app;
3018 tmcf = rpc->temp_conf;
3019
3020 if (rpc->proto) {
3021 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3022 &app->name);
3023
3024 } else {
3025 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3026 &app->name);
3027
3028 app->pending_processes--;
3029 }
3030
3031 nxt_router_conf_error(task, tmcf);
3032 }
3033
3034
3035 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)3036 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3037 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3038 {
3039 nxt_int_t ret;
3040 nxt_uint_t n, threads;
3041 nxt_queue_link_t *qlk;
3042 nxt_router_engine_conf_t *recf;
3043
3044 threads = tmcf->router_conf->threads;
3045
3046 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3047 sizeof(nxt_router_engine_conf_t));
3048 if (nxt_slow_path(tmcf->engines == NULL)) {
3049 return NXT_ERROR;
3050 }
3051
3052 n = 0;
3053
3054 for (qlk = nxt_queue_first(&router->engines);
3055 qlk != nxt_queue_tail(&router->engines);
3056 qlk = nxt_queue_next(qlk))
3057 {
3058 recf = nxt_array_zero_add(tmcf->engines);
3059 if (nxt_slow_path(recf == NULL)) {
3060 return NXT_ERROR;
3061 }
3062
3063 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3064
3065 if (n < threads) {
3066 recf->action = NXT_ROUTER_ENGINE_KEEP;
3067 ret = nxt_router_engine_conf_update(tmcf, recf);
3068
3069 } else {
3070 recf->action = NXT_ROUTER_ENGINE_DELETE;
3071 ret = nxt_router_engine_conf_delete(tmcf, recf);
3072 }
3073
3074 if (nxt_slow_path(ret != NXT_OK)) {
3075 return ret;
3076 }
3077
3078 n++;
3079 }
3080
3081 tmcf->new_threads = n;
3082
3083 while (n < threads) {
3084 recf = nxt_array_zero_add(tmcf->engines);
3085 if (nxt_slow_path(recf == NULL)) {
3086 return NXT_ERROR;
3087 }
3088
3089 recf->action = NXT_ROUTER_ENGINE_ADD;
3090
3091 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3092 if (nxt_slow_path(recf->engine == NULL)) {
3093 return NXT_ERROR;
3094 }
3095
3096 ret = nxt_router_engine_conf_create(tmcf, recf);
3097 if (nxt_slow_path(ret != NXT_OK)) {
3098 return ret;
3099 }
3100
3101 n++;
3102 }
3103
3104 return NXT_OK;
3105 }
3106
3107
3108 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3109 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3110 nxt_router_engine_conf_t *recf)
3111 {
3112 nxt_int_t ret;
3113
3114 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3115 nxt_router_listen_socket_create);
3116 if (nxt_slow_path(ret != NXT_OK)) {
3117 return ret;
3118 }
3119
3120 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3121 nxt_router_listen_socket_create);
3122 if (nxt_slow_path(ret != NXT_OK)) {
3123 return ret;
3124 }
3125
3126 return ret;
3127 }
3128
3129
3130 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3131 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3132 nxt_router_engine_conf_t *recf)
3133 {
3134 nxt_int_t ret;
3135
3136 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3137 nxt_router_listen_socket_create);
3138 if (nxt_slow_path(ret != NXT_OK)) {
3139 return ret;
3140 }
3141
3142 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3143 nxt_router_listen_socket_update);
3144 if (nxt_slow_path(ret != NXT_OK)) {
3145 return ret;
3146 }
3147
3148 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3149 if (nxt_slow_path(ret != NXT_OK)) {
3150 return ret;
3151 }
3152
3153 return ret;
3154 }
3155
3156