1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #include <nxt_http.h>
15 #include <nxt_port_memory_int.h>
16 #include <nxt_unit_request.h>
17 #include <nxt_unit_response.h>
18 #include <nxt_router_request.h>
19 #include <nxt_app_queue.h>
20 #include <nxt_port_queue.h>
21
22 #define NXT_SHARED_PORT_ID 0xFFFFu
23
/*
 * Holder for one application's settings.  Nothing in this part of the file
 * reads or writes it, so the notes below are inferred from field names and
 * types only -- TODO confirm against the code that fills the structure.
 */
typedef struct {
    nxt_str_t         type;             /* presumably the app type name */
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    /* Unparsed configuration sub-objects (presumably "limits" etc.). */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
35
36
/*
 * Holder for one listener's settings.  Not used in this part of the file;
 * presumably "pass" and "application" mirror the listener options of the
 * same names -- TODO confirm against the code that fills the structure.
 */
typedef struct {
    nxt_str_t  pass;
    nxt_str_t  application;
} nxt_router_listener_conf_t;
41
42
#if (NXT_TLS)

/*
 * Pending TLS certificate request (only built with NXT_TLS).
 * nxt_router_conf_apply() pops these from the tail of tmcf->tls and passes
 * "name" to nxt_cert_store_get() with the entry itself as handler data.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    /*
     * Queue link.  The original note says "for nxt_socket_conf_t.tls", but
     * the visible consumer walks nxt_router_temp_conf_t.tls -- TODO confirm.
     */
    nxt_queue_link_t        link;
} nxt_router_tlssock_t;

#endif
56
57
/*
 * Context for a listen socket RPC.  Not used in this part of the file;
 * presumably it is the data passed to nxt_router_listen_socket_ready() and
 * nxt_router_listen_socket_error() -- TODO confirm.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
64
65
/*
 * Context for an application RPC issued while a configuration is applied.
 * Not used in this part of the file; presumably it is the data passed to
 * nxt_router_app_prefork_ready()/_error() -- TODO confirm.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
    uint8_t                 proto;  /* 1 bit */
} nxt_app_rpc_t;
71
72
/*
 * Per-request data of the START_PROCESS RPC registered by
 * nxt_router_start_app_process_handler() for the handlers
 * nxt_router_app_port_ready() and nxt_router_app_port_error().
 */
typedef struct {
    nxt_app_joint_t         *app_joint;   /* app->joint at request time */
    uint32_t                generation;   /* app->generation at request time */
    /* Non-zero when the request carried the app name/conf buffer. */
    uint8_t                 proto;  /* 1 bit */
} nxt_app_joint_rpc_t;
78
79
/*
 * Forward declarations of file-local functions.  Those referenced by the
 * nxt_router_process and nxt_router_process_port_handlers tables below
 * have to be declared before the tables are initialized.
 */

/* Process hooks installed in nxt_router_process. */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

/* Port message handlers installed in nxt_router_process_port_handlers. */
static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_app_restart_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_status_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);

/* Temporary configuration life cycle. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

/* Configuration parsing. */
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
    nxt_mp_t *mp, nxt_conf_value_t *conf);
static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
    nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);

/* Application lookup. */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);

/* Port queues, listen sockets, TLS and application prefork RPCs. */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);

/* Per-engine configuration. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Threads. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);

/* Work handlers: (task, obj, data) signature. */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);

/* RPC handlers registered by nxt_router_start_app_process_handler(). */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application reference counting. */
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

/* Application ports and request processing. */
static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
    nxt_port_t *port, nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

/* Timers and deferred release. */
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

/* Tentative definition; the initializer is expected later in the file. */
static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
248
/* Defined in another translation unit. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/* The single router instance; allocated and set by nxt_router_start(). */
nxt_router_t  *nxt_router;

static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Prefix table with six entries.  Presumably it is indexed by application
 * type and passed as the "prefix" argument of nxt_router_prepare_msg();
 * the entry order must then match the application type enumeration --
 * TODO confirm against the user of this table.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
264
265
/*
 * Port message handlers of the router process, referenced from
 * nxt_router_process.port_handlers.  Both RPC results (ready and error) go
 * to the generic nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .status       = nxt_router_status_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
282
283
/*
 * Description of the "router" process (externally visible): its type, the
 * prefork/setup/start hooks, the port handler table and the signal table.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
294
295
/*
 * Queues of nxt_socket_conf_t.
 *
 * These are process-wide (not static) and are re-initialized every time
 * nxt_router_temp_conf() builds a new temporary configuration.
 * nxt_router_conf_apply() moves entries one at a time from pending_sockets
 * to creating_sockets before requesting the listen socket.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
302
303
304 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)305 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
306 {
307 nxt_runtime_stop_app_processes(task, task->thread->runtime);
308
309 return NXT_OK;
310 }
311
312
313 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)314 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
315 {
316 nxt_int_t ret;
317 nxt_port_t *controller_port;
318 nxt_router_t *router;
319 nxt_runtime_t *rt;
320
321 rt = task->thread->runtime;
322
323 nxt_log(task, NXT_LOG_INFO, "router started");
324
325 #if (NXT_TLS)
326 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
327 if (nxt_slow_path(rt->tls == NULL)) {
328 return NXT_ERROR;
329 }
330
331 ret = rt->tls->library_init(task);
332 if (nxt_slow_path(ret != NXT_OK)) {
333 return ret;
334 }
335 #endif
336
337 ret = nxt_http_init(task);
338 if (nxt_slow_path(ret != NXT_OK)) {
339 return ret;
340 }
341
342 router = nxt_zalloc(sizeof(nxt_router_t));
343 if (nxt_slow_path(router == NULL)) {
344 return NXT_ERROR;
345 }
346
347 nxt_queue_init(&router->engines);
348 nxt_queue_init(&router->sockets);
349 nxt_queue_init(&router->apps);
350
351 nxt_router = router;
352
353 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
354 if (controller_port != NULL) {
355 nxt_router_greet_controller(task, controller_port);
356 }
357
358 return NXT_OK;
359 }
360
361
362 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)363 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
364 {
365 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
366 -1, 0, 0, NULL);
367 }
368
369
370 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)371 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
372 void *data)
373 {
374 size_t size;
375 uint32_t stream;
376 nxt_fd_t port_fd, queue_fd;
377 nxt_int_t ret;
378 nxt_app_t *app;
379 nxt_buf_t *b;
380 nxt_port_t *dport;
381 nxt_runtime_t *rt;
382 nxt_app_joint_rpc_t *app_joint_rpc;
383
384 app = data;
385
386 nxt_thread_mutex_lock(&app->mutex);
387
388 dport = app->proto_port;
389
390 nxt_thread_mutex_unlock(&app->mutex);
391
392 if (dport != NULL) {
393 nxt_debug(task, "app '%V' %p start process", &app->name, app);
394
395 b = NULL;
396 port_fd = -1;
397 queue_fd = -1;
398
399 } else {
400 if (app->proto_port_requests > 0) {
401 nxt_debug(task, "app '%V' %p wait for prototype process",
402 &app->name, app);
403
404 app->proto_port_requests++;
405
406 goto skip;
407 }
408
409 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
410
411 rt = task->thread->runtime;
412 dport = rt->port_by_type[NXT_PROCESS_MAIN];
413
414 size = app->name.length + 1 + app->conf.length;
415
416 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
417 if (nxt_slow_path(b == NULL)) {
418 goto failed;
419 }
420
421 nxt_buf_cpystr(b, &app->name);
422 *b->mem.free++ = '\0';
423 nxt_buf_cpystr(b, &app->conf);
424
425 port_fd = app->shared_port->pair[0];
426 queue_fd = app->shared_port->queue_fd;
427 }
428
429 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
430 nxt_router_app_port_ready,
431 nxt_router_app_port_error,
432 sizeof(nxt_app_joint_rpc_t));
433 if (nxt_slow_path(app_joint_rpc == NULL)) {
434 goto failed;
435 }
436
437 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
438
439 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
440 port_fd, queue_fd, stream, port->id, b);
441 if (nxt_slow_path(ret != NXT_OK)) {
442 nxt_port_rpc_cancel(task, port, stream);
443
444 goto failed;
445 }
446
447 app_joint_rpc->app_joint = app->joint;
448 app_joint_rpc->generation = app->generation;
449 app_joint_rpc->proto = (b != NULL);
450
451 if (b != NULL) {
452 app->proto_port_requests++;
453
454 b = NULL;
455 }
456
457 nxt_router_app_joint_use(task, app->joint, 1);
458
459 failed:
460
461 if (b != NULL) {
462 nxt_mp_free(b->data, b);
463 }
464
465 skip:
466
467 nxt_router_app_use(task, app, -1);
468 }
469
470
471 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)472 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
473 {
474 app_joint->use_count += i;
475
476 if (app_joint->use_count == 0) {
477 nxt_assert(app_joint->app == NULL);
478
479 nxt_free(app_joint);
480 }
481 }
482
483
484 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)485 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
486 {
487 nxt_int_t res;
488 nxt_port_t *router_port;
489 nxt_runtime_t *rt;
490
491 nxt_debug(task, "app '%V' start process", &app->name);
492
493 rt = task->thread->runtime;
494 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
495
496 nxt_router_app_use(task, app, 1);
497
498 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
499 app);
500
501 if (res == NXT_OK) {
502 return res;
503 }
504
505 nxt_thread_mutex_lock(&app->mutex);
506
507 app->pending_processes--;
508
509 nxt_thread_mutex_unlock(&app->mutex);
510
511 nxt_router_app_use(task, app, -1);
512
513 return NXT_ERROR;
514 }
515
516
517 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)518 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
519 {
520 nxt_buf_t *b, *next;
521 nxt_bool_t cancelled;
522 nxt_port_t *app_port;
523 nxt_msg_info_t *msg_info;
524
525 msg_info = &req_rpc_data->msg_info;
526
527 if (msg_info->buf == NULL) {
528 return 0;
529 }
530
531 app_port = req_rpc_data->app_port;
532
533 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
534 cancelled = nxt_app_queue_cancel(app_port->queue,
535 msg_info->tracking_cookie,
536 req_rpc_data->stream);
537
538 if (cancelled) {
539 nxt_debug(task, "stream #%uD: cancelled by router",
540 req_rpc_data->stream);
541 }
542
543 } else {
544 cancelled = 0;
545 }
546
547 for (b = msg_info->buf; b != NULL; b = next) {
548 next = b->next;
549 b->next = NULL;
550
551 if (b->is_port_mmap_sent) {
552 b->is_port_mmap_sent = cancelled == 0;
553 }
554
555 b->completion_handler(task, b, b->parent);
556 }
557
558 msg_info->buf = NULL;
559
560 return cancelled;
561 }
562
563
564 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)565 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
566 {
567 if (lnk->next != NULL) {
568 nxt_queue_remove(lnk);
569
570 lnk->next = NULL;
571
572 return 1;
573 }
574
575 return 0;
576 }
577
578
579 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)580 nxt_request_rpc_data_unlink(nxt_task_t *task,
581 nxt_request_rpc_data_t *req_rpc_data)
582 {
583 nxt_app_t *app;
584 nxt_bool_t unlinked;
585 nxt_http_request_t *r;
586
587 nxt_router_msg_cancel(task, req_rpc_data);
588
589 app = req_rpc_data->app;
590
591 if (req_rpc_data->app_port != NULL) {
592 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
593 req_rpc_data->apr_action);
594
595 req_rpc_data->app_port = NULL;
596 }
597
598 r = req_rpc_data->request;
599
600 if (r != NULL) {
601 r->timer_data = NULL;
602
603 nxt_router_http_request_release_post(task, r);
604
605 r->req_rpc_data = NULL;
606 req_rpc_data->request = NULL;
607
608 if (app != NULL) {
609 unlinked = 0;
610
611 nxt_thread_mutex_lock(&app->mutex);
612
613 if (r->app_link.next != NULL) {
614 nxt_queue_remove(&r->app_link);
615 r->app_link.next = NULL;
616
617 unlinked = 1;
618 }
619
620 nxt_thread_mutex_unlock(&app->mutex);
621
622 if (unlinked) {
623 nxt_mp_release(r->mem_pool);
624 }
625 }
626 }
627
628 if (app != NULL) {
629 nxt_router_app_use(task, app, -1);
630
631 req_rpc_data->app = NULL;
632 }
633
634 if (req_rpc_data->msg_info.body_fd != -1) {
635 nxt_fd_close(req_rpc_data->msg_info.body_fd);
636
637 req_rpc_data->msg_info.body_fd = -1;
638 }
639
640 if (req_rpc_data->rpc_cancel) {
641 req_rpc_data->rpc_cancel = 0;
642
643 nxt_port_rpc_cancel(task, task->thread->engine->port,
644 req_rpc_data->stream);
645 }
646 }
647
648
649 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)650 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
651 {
652 nxt_int_t res;
653 nxt_app_t *app;
654 nxt_port_t *port, *main_app_port;
655 nxt_runtime_t *rt;
656
657 nxt_port_new_port_handler(task, msg);
658
659 port = msg->u.new_port;
660
661 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
662 nxt_router_greet_controller(task, msg->u.new_port);
663 }
664
665 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
666 nxt_port_rpc_handler(task, msg);
667
668 return;
669 }
670
671 if (port == NULL || port->type != NXT_PROCESS_APP) {
672
673 if (msg->port_msg.stream == 0) {
674 return;
675 }
676
677 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
678
679 } else {
680 if (msg->fd[1] != -1) {
681 res = nxt_router_port_queue_map(task, port, msg->fd[1]);
682 if (nxt_slow_path(res != NXT_OK)) {
683 return;
684 }
685
686 nxt_fd_close(msg->fd[1]);
687 msg->fd[1] = -1;
688 }
689 }
690
691 if (msg->port_msg.stream != 0) {
692 nxt_port_rpc_handler(task, msg);
693 return;
694 }
695
696 nxt_debug(task, "new port id %d (%d)", port->id, port->type);
697
698 /*
699 * Port with "id == 0" is application 'main' port and it always
700 * should come with non-zero stream.
701 */
702 nxt_assert(port->id != 0);
703
704 /* Find 'main' app port and get app reference. */
705 rt = task->thread->runtime;
706
707 /*
708 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
709 * sent to main port (with id == 0) and processed in main thread.
710 */
711 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
712 nxt_assert(main_app_port != NULL);
713
714 app = main_app_port->app;
715
716 if (nxt_fast_path(app != NULL)) {
717 nxt_thread_mutex_lock(&app->mutex);
718
719 /* TODO here should be find-and-add code because there can be
720 port waiters in port_hash */
721 nxt_port_hash_add(&app->port_hash, port);
722 app->port_hash_count++;
723
724 nxt_thread_mutex_unlock(&app->mutex);
725
726 port->app = app;
727 }
728
729 port->main_app_port = main_app_port;
730
731 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
732 }
733
734
735 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)736 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
737 {
738 void *p;
739 size_t size;
740 nxt_int_t ret;
741 nxt_port_t *port;
742 nxt_router_temp_conf_t *tmcf;
743
744 port = nxt_runtime_port_find(task->thread->runtime,
745 msg->port_msg.pid,
746 msg->port_msg.reply_port);
747 if (nxt_slow_path(port == NULL)) {
748 nxt_alert(task, "conf_data_handler: reply port not found");
749 return;
750 }
751
752 p = MAP_FAILED;
753
754 /*
755 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
756 * initialized in 'cleanup' section.
757 */
758 size = 0;
759
760 tmcf = nxt_router_temp_conf(task);
761 if (nxt_slow_path(tmcf == NULL)) {
762 goto fail;
763 }
764
765 if (nxt_slow_path(msg->fd[0] == -1)) {
766 nxt_alert(task, "conf_data_handler: invalid shm fd");
767 goto fail;
768 }
769
770 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
771 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
772 (int) nxt_buf_mem_used_size(&msg->buf->mem));
773 goto fail;
774 }
775
776 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
777
778 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
779
780 nxt_fd_close(msg->fd[0]);
781 msg->fd[0] = -1;
782
783 if (nxt_slow_path(p == MAP_FAILED)) {
784 goto fail;
785 }
786
787 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
788
789 tmcf->router_conf->router = nxt_router;
790 tmcf->stream = msg->port_msg.stream;
791 tmcf->port = port;
792
793 nxt_port_use(task, tmcf->port, 1);
794
795 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
796
797 if (nxt_fast_path(ret == NXT_OK)) {
798 nxt_router_conf_apply(task, tmcf, NULL);
799
800 } else {
801 nxt_router_conf_error(task, tmcf);
802 }
803
804 goto cleanup;
805
806 fail:
807
808 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
809 msg->port_msg.stream, 0, NULL);
810
811 if (tmcf != NULL) {
812 nxt_mp_release(tmcf->mem_pool);
813 }
814
815 cleanup:
816
817 if (p != MAP_FAILED) {
818 nxt_mem_munmap(p, size);
819 }
820
821 if (msg->fd[0] != -1) {
822 nxt_fd_close(msg->fd[0]);
823 msg->fd[0] = -1;
824 }
825 }
826
827
828 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)829 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
830 {
831 nxt_app_t *app;
832 nxt_int_t ret;
833 nxt_str_t app_name;
834 nxt_port_t *reply_port, *shared_port, *old_shared_port;
835 nxt_port_t *proto_port;
836 nxt_port_msg_type_t reply;
837
838 reply_port = nxt_runtime_port_find(task->thread->runtime,
839 msg->port_msg.pid,
840 msg->port_msg.reply_port);
841 if (nxt_slow_path(reply_port == NULL)) {
842 nxt_alert(task, "app_restart_handler: reply port not found");
843 return;
844 }
845
846 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
847 app_name.start = msg->buf->mem.pos;
848
849 nxt_debug(task, "app_restart_handler: %V", &app_name);
850
851 app = nxt_router_app_find(&nxt_router->apps, &app_name);
852
853 if (nxt_fast_path(app != NULL)) {
854 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
855 NXT_PROCESS_APP);
856 if (nxt_slow_path(shared_port == NULL)) {
857 goto fail;
858 }
859
860 ret = nxt_port_socket_init(task, shared_port, 0);
861 if (nxt_slow_path(ret != NXT_OK)) {
862 nxt_port_use(task, shared_port, -1);
863 goto fail;
864 }
865
866 ret = nxt_router_app_queue_init(task, shared_port);
867 if (nxt_slow_path(ret != NXT_OK)) {
868 nxt_port_write_close(shared_port);
869 nxt_port_read_close(shared_port);
870 nxt_port_use(task, shared_port, -1);
871 goto fail;
872 }
873
874 nxt_port_write_enable(task, shared_port);
875
876 nxt_thread_mutex_lock(&app->mutex);
877
878 proto_port = app->proto_port;
879
880 if (proto_port != NULL) {
881 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
882 proto_port->pid);
883
884 app->proto_port = NULL;
885 proto_port->app = NULL;
886 }
887
888 app->generation++;
889
890 shared_port->app = app;
891
892 old_shared_port = app->shared_port;
893 old_shared_port->app = NULL;
894
895 app->shared_port = shared_port;
896
897 nxt_thread_mutex_unlock(&app->mutex);
898
899 nxt_port_close(task, old_shared_port);
900 nxt_port_use(task, old_shared_port, -1);
901
902 if (proto_port != NULL) {
903 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
904 -1, 0, 0, NULL);
905
906 nxt_port_close(task, proto_port);
907
908 nxt_port_use(task, proto_port, -1);
909 }
910
911 reply = NXT_PORT_MSG_RPC_READY_LAST;
912
913 } else {
914
915 fail:
916
917 reply = NXT_PORT_MSG_RPC_ERROR;
918 }
919
920 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
921 0, NULL);
922 }
923
924
925 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)926 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
927 {
928 u_char *p;
929 size_t alloc;
930 nxt_app_t *app;
931 nxt_buf_t *b;
932 nxt_uint_t type;
933 nxt_port_t *port;
934 nxt_status_app_t *app_stat;
935 nxt_event_engine_t *engine;
936 nxt_status_report_t *report;
937
938 port = nxt_runtime_port_find(task->thread->runtime,
939 msg->port_msg.pid,
940 msg->port_msg.reply_port);
941 if (nxt_slow_path(port == NULL)) {
942 nxt_alert(task, "nxt_router_status_handler(): reply port not found");
943 return;
944 }
945
946 alloc = sizeof(nxt_status_report_t);
947
948 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
949
950 alloc += sizeof(nxt_status_app_t) + app->name.length;
951
952 } nxt_queue_loop;
953
954 b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
955 if (nxt_slow_path(b == NULL)) {
956 type = NXT_PORT_MSG_RPC_ERROR;
957 goto fail;
958 }
959
960 report = (nxt_status_report_t *) b->mem.free;
961 b->mem.free = b->mem.end;
962
963 nxt_memzero(report, sizeof(nxt_status_report_t));
964
965 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
966
967 report->accepted_conns += engine->accepted_conns_cnt;
968 report->idle_conns += engine->idle_conns_cnt;
969 report->closed_conns += engine->closed_conns_cnt;
970 report->requests += engine->requests_cnt;
971
972 } nxt_queue_loop;
973
974 report->apps_count = 0;
975 app_stat = report->apps;
976 p = b->mem.end;
977
978 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
979 p -= app->name.length;
980
981 nxt_memcpy(p, app->name.start, app->name.length);
982
983 app_stat->name.length = app->name.length;
984 app_stat->name.start = (u_char *) (p - b->mem.pos);
985
986 app_stat->active_requests = app->active_requests;
987 app_stat->pending_processes = app->pending_processes;
988 app_stat->processes = app->processes;
989 app_stat->idle_processes = app->idle_processes;
990
991 report->apps_count++;
992 app_stat++;
993 } nxt_queue_loop;
994
995 type = NXT_PORT_MSG_RPC_READY_LAST;
996
997 fail:
998
999 nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1000 }
1001
1002
1003 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1004 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1005 void *data)
1006 {
1007 union {
1008 nxt_pid_t removed_pid;
1009 void *data;
1010 } u;
1011
1012 u.data = data;
1013
1014 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1015 }
1016
1017
1018 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1019 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1020 {
1021 nxt_event_engine_t *engine;
1022
1023 nxt_port_remove_pid_handler(task, msg);
1024
1025 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1026 {
1027 if (nxt_fast_path(engine->port != NULL)) {
1028 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1029 msg->u.data);
1030 }
1031 }
1032 nxt_queue_loop;
1033
1034 if (msg->port_msg.stream == 0) {
1035 return;
1036 }
1037
1038 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1039
1040 nxt_port_rpc_handler(task, msg);
1041 }
1042
1043
1044 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1045 nxt_router_temp_conf(nxt_task_t *task)
1046 {
1047 nxt_mp_t *mp, *tmp;
1048 nxt_router_conf_t *rtcf;
1049 nxt_router_temp_conf_t *tmcf;
1050
1051 mp = nxt_mp_create(1024, 128, 256, 32);
1052 if (nxt_slow_path(mp == NULL)) {
1053 return NULL;
1054 }
1055
1056 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1057 if (nxt_slow_path(rtcf == NULL)) {
1058 goto fail;
1059 }
1060
1061 rtcf->mem_pool = mp;
1062
1063 rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1064 if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1065 goto fail;
1066 }
1067
1068 #if (NXT_HAVE_NJS)
1069 nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1070 #endif
1071
1072 tmp = nxt_mp_create(1024, 128, 256, 32);
1073 if (nxt_slow_path(tmp == NULL)) {
1074 goto fail;
1075 }
1076
1077 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1078 if (nxt_slow_path(tmcf == NULL)) {
1079 goto temp_fail;
1080 }
1081
1082 tmcf->mem_pool = tmp;
1083 tmcf->router_conf = rtcf;
1084 tmcf->count = 1;
1085 tmcf->engine = task->thread->engine;
1086
1087 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1088 sizeof(nxt_router_engine_conf_t));
1089 if (nxt_slow_path(tmcf->engines == NULL)) {
1090 goto temp_fail;
1091 }
1092
1093 nxt_queue_init(&creating_sockets);
1094 nxt_queue_init(&pending_sockets);
1095 nxt_queue_init(&updating_sockets);
1096 nxt_queue_init(&keeping_sockets);
1097 nxt_queue_init(&deleting_sockets);
1098
1099 #if (NXT_TLS)
1100 nxt_queue_init(&tmcf->tls);
1101 #endif
1102
1103 nxt_queue_init(&tmcf->apps);
1104 nxt_queue_init(&tmcf->previous);
1105
1106 return tmcf;
1107
1108 temp_fail:
1109
1110 nxt_mp_destroy(tmp);
1111
1112 fail:
1113
1114 if (rtcf->tstr_state != NULL) {
1115 nxt_tstr_state_release(rtcf->tstr_state);
1116 }
1117
1118 nxt_mp_destroy(mp);
1119
1120 return NULL;
1121 }
1122
1123
1124 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1125 nxt_router_app_can_start(nxt_app_t *app)
1126 {
1127 return app->processes + app->pending_processes < app->max_processes
1128 && app->pending_processes < app->max_pending_processes;
1129 }
1130
1131
1132 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1133 nxt_router_app_need_start(nxt_app_t *app)
1134 {
1135 return (app->active_requests
1136 > app->port_hash_count + app->pending_processes)
1137 || (app->spare_processes
1138 > app->idle_processes + app->pending_processes);
1139 }
1140
1141
1142 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1143 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1144 {
1145 nxt_int_t ret;
1146 nxt_app_t *app;
1147 nxt_router_t *router;
1148 nxt_runtime_t *rt;
1149 nxt_queue_link_t *qlk;
1150 nxt_socket_conf_t *skcf;
1151 nxt_router_conf_t *rtcf;
1152 nxt_router_temp_conf_t *tmcf;
1153 const nxt_event_interface_t *interface;
1154 #if (NXT_TLS)
1155 nxt_router_tlssock_t *tls;
1156 #endif
1157
1158 tmcf = obj;
1159
1160 qlk = nxt_queue_first(&pending_sockets);
1161
1162 if (qlk != nxt_queue_tail(&pending_sockets)) {
1163 nxt_queue_remove(qlk);
1164 nxt_queue_insert_tail(&creating_sockets, qlk);
1165
1166 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1167
1168 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1169
1170 return;
1171 }
1172
1173 #if (NXT_TLS)
1174 qlk = nxt_queue_last(&tmcf->tls);
1175
1176 if (qlk != nxt_queue_head(&tmcf->tls)) {
1177 nxt_queue_remove(qlk);
1178
1179 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1180
1181 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1182 nxt_router_tls_rpc_handler, tls);
1183 return;
1184 }
1185 #endif
1186
1187 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1188
1189 if (nxt_router_app_need_start(app)) {
1190 nxt_router_app_rpc_create(task, tmcf, app);
1191 return;
1192 }
1193
1194 } nxt_queue_loop;
1195
1196 rtcf = tmcf->router_conf;
1197
1198 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1199 nxt_router_access_log_open(task, tmcf);
1200 return;
1201 }
1202
1203 rt = task->thread->runtime;
1204
1205 interface = nxt_service_get(rt->services, "engine", NULL);
1206
1207 router = rtcf->router;
1208
1209 ret = nxt_router_engines_create(task, router, tmcf, interface);
1210 if (nxt_slow_path(ret != NXT_OK)) {
1211 goto fail;
1212 }
1213
1214 ret = nxt_router_threads_create(task, rt, tmcf);
1215 if (nxt_slow_path(ret != NXT_OK)) {
1216 goto fail;
1217 }
1218
1219 nxt_router_apps_sort(task, router, tmcf);
1220
1221 nxt_router_apps_hash_use(task, rtcf, 1);
1222
1223 nxt_router_engines_post(router, tmcf);
1224
1225 nxt_queue_add(&router->sockets, &updating_sockets);
1226 nxt_queue_add(&router->sockets, &creating_sockets);
1227
1228 if (router->access_log != rtcf->access_log) {
1229 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1230
1231 nxt_router_access_log_release(task, &router->lock, router->access_log);
1232
1233 router->access_log = rtcf->access_log;
1234 }
1235
1236 nxt_router_conf_ready(task, tmcf);
1237
1238 return;
1239
1240 fail:
1241
1242 nxt_router_conf_error(task, tmcf);
1243
1244 return;
1245 }
1246
1247
1248 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1249 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1250 {
1251 nxt_joint_job_t *job;
1252
1253 job = obj;
1254
1255 nxt_router_conf_ready(task, job->tmcf);
1256 }
1257
1258
1259 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1260 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1261 {
1262 uint32_t count;
1263 nxt_router_conf_t *rtcf;
1264 nxt_thread_spinlock_t *lock;
1265
1266 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1267
1268 if (--tmcf->count > 0) {
1269 return;
1270 }
1271
1272 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1273
1274 rtcf = tmcf->router_conf;
1275
1276 lock = &rtcf->router->lock;
1277
1278 nxt_thread_spin_lock(lock);
1279
1280 count = rtcf->count;
1281
1282 nxt_thread_spin_unlock(lock);
1283
1284 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1285
1286 if (count == 0) {
1287 nxt_router_apps_hash_use(task, rtcf, -1);
1288
1289 nxt_router_access_log_release(task, lock, rtcf->access_log);
1290
1291 nxt_mp_destroy(rtcf->mem_pool);
1292 }
1293
1294 nxt_mp_release(tmcf->mem_pool);
1295 }
1296
1297
1298 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1299 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1300 {
1301 nxt_app_t *app;
1302 nxt_socket_t s;
1303 nxt_router_t *router;
1304 nxt_queue_link_t *qlk;
1305 nxt_socket_conf_t *skcf;
1306 nxt_router_conf_t *rtcf;
1307
1308 nxt_alert(task, "failed to apply new conf");
1309
1310 for (qlk = nxt_queue_first(&creating_sockets);
1311 qlk != nxt_queue_tail(&creating_sockets);
1312 qlk = nxt_queue_next(qlk))
1313 {
1314 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1315 s = skcf->listen->socket;
1316
1317 if (s != -1) {
1318 nxt_socket_close(task, s);
1319 }
1320
1321 nxt_free(skcf->listen);
1322 }
1323
1324 rtcf = tmcf->router_conf;
1325
1326 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1327
1328 nxt_router_app_unlink(task, app);
1329
1330 } nxt_queue_loop;
1331
1332 router = rtcf->router;
1333
1334 nxt_queue_add(&router->sockets, &keeping_sockets);
1335 nxt_queue_add(&router->sockets, &deleting_sockets);
1336
1337 nxt_queue_add(&router->apps, &tmcf->previous);
1338
1339 // TODO: new engines and threads
1340
1341 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1342
1343 nxt_mp_destroy(rtcf->mem_pool);
1344
1345 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1346
1347 nxt_mp_release(tmcf->mem_pool);
1348 }
1349
1350
1351 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1352 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1353 nxt_port_msg_type_t type)
1354 {
1355 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1356
1357 nxt_port_use(task, tmcf->port, -1);
1358
1359 tmcf->port = NULL;
1360 }
1361
1362
/*
 * Mapping of root configuration members onto nxt_router_conf_t fields;
 * used by nxt_router_conf_create() with the root JSON object.
 */
static nxt_conf_map_t  nxt_router_conf[] = {
    {
        nxt_string("listeners_threads"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_conf_t, threads),
    },
};
1370
1371
/*
 * Mapping of an application object onto nxt_router_app_conf_t.
 *
 * "processes" appears twice on purpose: once as an integer and once as a
 * raw value pointer.  nxt_router_conf_create() inspects the pointer's type
 * and, when it is an object, maps it with nxt_router_app_processes_conf.
 */
static nxt_conf_map_t  nxt_router_app_conf[] = {
    {
        nxt_string("type"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_router_app_conf_t, type),
    },

    {
        nxt_string("limits"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, limits_value),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, processes),
    },

    {
        nxt_string("processes"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, processes_value),
    },

    {
        nxt_string("targets"),
        NXT_CONF_MAP_PTR,
        offsetof(nxt_router_app_conf_t, targets_value),
    },
};
1403
1404
/*
 * Mapping of an application's "limits" object onto nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
    {
        nxt_string("timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, timeout),
    },
};
1412
1413
/*
 * Mapping of an application's "processes" object (the object form of the
 * option) onto nxt_router_app_conf_t.
 */
static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
    {
        nxt_string("spare"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, spare_processes),
    },

    {
        nxt_string("max"),
        NXT_CONF_MAP_INT32,
        offsetof(nxt_router_app_conf_t, max_processes),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_router_app_conf_t, idle_timeout),
    },
};
1433
1434
/*
 * Mapping of a listener object onto nxt_router_listener_conf_t.
 * nxt_router_conf_create() prefers "pass" and falls back to "application"
 * when "pass" is empty.
 */
static nxt_conf_map_t  nxt_router_listener_conf[] = {
    {
        nxt_string("pass"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, pass),
    },

    {
        nxt_string("application"),
        NXT_CONF_MAP_STR_COPY,
        offsetof(nxt_router_listener_conf_t, application),
    },
};
1448
1449
/*
 * Mapping of the "/settings/http" object onto nxt_socket_conf_t.  It is
 * applied to every listener's socket configuration after the built-in
 * defaults have been assigned in nxt_router_conf_create().
 */
static nxt_conf_map_t  nxt_router_http_conf[] = {
    {
        nxt_string("header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, header_buffer_size),
    },

    {
        nxt_string("large_header_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffer_size),
    },

    {
        nxt_string("large_header_buffers"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, large_header_buffers),
    },

    {
        nxt_string("body_buffer_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, body_buffer_size),
    },

    {
        nxt_string("max_body_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_socket_conf_t, max_body_size),
    },

    {
        nxt_string("idle_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, idle_timeout),
    },

    {
        nxt_string("header_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, header_read_timeout),
    },

    {
        nxt_string("body_read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, body_read_timeout),
    },

    {
        nxt_string("send_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_socket_conf_t, send_timeout),
    },

    {
        nxt_string("body_temp_path"),
        NXT_CONF_MAP_STR,
        offsetof(nxt_socket_conf_t, body_temp_path),
    },

    {
        nxt_string("discard_unsafe_fields"),
        NXT_CONF_MAP_INT8,
        offsetof(nxt_socket_conf_t, discard_unsafe_fields),
    },
};
1517
1518
/*
 * Mapping of the "/settings/http/websocket" object onto
 * nxt_websocket_conf_t (the websocket_conf member of a socket
 * configuration).
 */
static nxt_conf_map_t  nxt_router_websocket_conf[] = {
    {
        nxt_string("max_frame_size"),
        NXT_CONF_MAP_SIZE,
        offsetof(nxt_websocket_conf_t, max_frame_size),
    },

    {
        nxt_string("read_timeout"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, read_timeout),
    },

    {
        nxt_string("keepalive_interval"),
        NXT_CONF_MAP_MSEC,
        offsetof(nxt_websocket_conf_t, keepalive_interval),
    },

};
1539
1540
1541 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1542 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1543 u_char *start, u_char *end)
1544 {
1545 u_char *p;
1546 size_t size;
1547 nxt_mp_t *mp, *app_mp;
1548 uint32_t next, next_target;
1549 nxt_int_t ret;
1550 nxt_str_t name, target;
1551 nxt_app_t *app, *prev;
1552 nxt_str_t *t, *s, *targets;
1553 nxt_uint_t n, i;
1554 nxt_port_t *port;
1555 nxt_router_t *router;
1556 nxt_app_joint_t *app_joint;
1557 #if (NXT_TLS)
1558 nxt_tls_init_t *tls_init;
1559 nxt_conf_value_t *certificate;
1560 #endif
1561 nxt_conf_value_t *root, *conf, *http, *value, *websocket;
1562 nxt_conf_value_t *applications, *application;
1563 nxt_conf_value_t *listeners, *listener;
1564 nxt_socket_conf_t *skcf;
1565 nxt_router_conf_t *rtcf;
1566 nxt_http_routes_t *routes;
1567 nxt_event_engine_t *engine;
1568 nxt_app_lang_module_t *lang;
1569 nxt_router_app_conf_t apcf;
1570 nxt_router_listener_conf_t lscf;
1571
1572 static nxt_str_t http_path = nxt_string("/settings/http");
1573 static nxt_str_t applications_path = nxt_string("/applications");
1574 static nxt_str_t listeners_path = nxt_string("/listeners");
1575 static nxt_str_t routes_path = nxt_string("/routes");
1576 static nxt_str_t access_log_path = nxt_string("/access_log");
1577 #if (NXT_TLS)
1578 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1579 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1580 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1581 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1582 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1583 #endif
1584 static nxt_str_t static_path = nxt_string("/settings/http/static");
1585 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1586 static nxt_str_t forwarded_path = nxt_string("/forwarded");
1587 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1588
1589 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1590 if (root == NULL) {
1591 nxt_alert(task, "configuration parsing error");
1592 return NXT_ERROR;
1593 }
1594
1595 rtcf = tmcf->router_conf;
1596 mp = rtcf->mem_pool;
1597
1598 ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1599 nxt_nitems(nxt_router_conf), rtcf);
1600 if (ret != NXT_OK) {
1601 nxt_alert(task, "root map error");
1602 return NXT_ERROR;
1603 }
1604
1605 if (rtcf->threads == 0) {
1606 rtcf->threads = nxt_ncpu;
1607 }
1608
1609 conf = nxt_conf_get_path(root, &static_path);
1610
1611 ret = nxt_router_conf_process_static(task, rtcf, conf);
1612 if (nxt_slow_path(ret != NXT_OK)) {
1613 return NXT_ERROR;
1614 }
1615
1616 router = rtcf->router;
1617
1618 applications = nxt_conf_get_path(root, &applications_path);
1619
1620 if (applications != NULL) {
1621 next = 0;
1622
1623 for ( ;; ) {
1624 application = nxt_conf_next_object_member(applications,
1625 &name, &next);
1626 if (application == NULL) {
1627 break;
1628 }
1629
1630 nxt_debug(task, "application \"%V\"", &name);
1631
1632 size = nxt_conf_json_length(application, NULL);
1633
1634 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1635 if (nxt_slow_path(app_mp == NULL)) {
1636 goto fail;
1637 }
1638
1639 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1640 if (app == NULL) {
1641 goto app_fail;
1642 }
1643
1644 nxt_memzero(app, sizeof(nxt_app_t));
1645
1646 app->mem_pool = app_mp;
1647
1648 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1649 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1650 + name.length);
1651
1652 p = nxt_conf_json_print(app->conf.start, application, NULL);
1653 app->conf.length = p - app->conf.start;
1654
1655 nxt_assert(app->conf.length <= size);
1656
1657 nxt_debug(task, "application conf \"%V\"", &app->conf);
1658
1659 prev = nxt_router_app_find(&router->apps, &name);
1660
1661 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1662 nxt_mp_destroy(app_mp);
1663
1664 nxt_queue_remove(&prev->link);
1665 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1666
1667 ret = nxt_router_apps_hash_add(rtcf, prev);
1668 if (nxt_slow_path(ret != NXT_OK)) {
1669 goto fail;
1670 }
1671
1672 continue;
1673 }
1674
1675 apcf.processes = 1;
1676 apcf.max_processes = 1;
1677 apcf.spare_processes = 0;
1678 apcf.timeout = 0;
1679 apcf.idle_timeout = 15000;
1680 apcf.limits_value = NULL;
1681 apcf.processes_value = NULL;
1682 apcf.targets_value = NULL;
1683
1684 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1685 if (nxt_slow_path(app_joint == NULL)) {
1686 goto app_fail;
1687 }
1688
1689 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1690
1691 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1692 nxt_nitems(nxt_router_app_conf), &apcf);
1693 if (ret != NXT_OK) {
1694 nxt_alert(task, "application map error");
1695 goto app_fail;
1696 }
1697
1698 if (apcf.limits_value != NULL) {
1699
1700 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1701 nxt_alert(task, "application limits is not object");
1702 goto app_fail;
1703 }
1704
1705 ret = nxt_conf_map_object(mp, apcf.limits_value,
1706 nxt_router_app_limits_conf,
1707 nxt_nitems(nxt_router_app_limits_conf),
1708 &apcf);
1709 if (ret != NXT_OK) {
1710 nxt_alert(task, "application limits map error");
1711 goto app_fail;
1712 }
1713 }
1714
1715 if (apcf.processes_value != NULL
1716 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1717 {
1718 ret = nxt_conf_map_object(mp, apcf.processes_value,
1719 nxt_router_app_processes_conf,
1720 nxt_nitems(nxt_router_app_processes_conf),
1721 &apcf);
1722 if (ret != NXT_OK) {
1723 nxt_alert(task, "application processes map error");
1724 goto app_fail;
1725 }
1726
1727 } else {
1728 apcf.max_processes = apcf.processes;
1729 apcf.spare_processes = apcf.processes;
1730 }
1731
1732 if (apcf.targets_value != NULL) {
1733 n = nxt_conf_object_members_count(apcf.targets_value);
1734
1735 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1736 if (nxt_slow_path(targets == NULL)) {
1737 goto app_fail;
1738 }
1739
1740 next_target = 0;
1741
1742 for (i = 0; i < n; i++) {
1743 (void) nxt_conf_next_object_member(apcf.targets_value,
1744 &target, &next_target);
1745
1746 s = nxt_str_dup(app_mp, &targets[i], &target);
1747 if (nxt_slow_path(s == NULL)) {
1748 goto app_fail;
1749 }
1750 }
1751
1752 } else {
1753 targets = NULL;
1754 }
1755
1756 nxt_debug(task, "application type: %V", &apcf.type);
1757 nxt_debug(task, "application processes: %D", apcf.processes);
1758 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1759
1760 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1761
1762 if (lang == NULL) {
1763 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1764 goto app_fail;
1765 }
1766
1767 nxt_debug(task, "application language module: \"%s\"", lang->file);
1768
1769 ret = nxt_thread_mutex_create(&app->mutex);
1770 if (ret != NXT_OK) {
1771 goto app_fail;
1772 }
1773
1774 nxt_queue_init(&app->ports);
1775 nxt_queue_init(&app->spare_ports);
1776 nxt_queue_init(&app->idle_ports);
1777 nxt_queue_init(&app->ack_waiting_req);
1778
1779 app->name.length = name.length;
1780 nxt_memcpy(app->name.start, name.start, name.length);
1781
1782 app->type = lang->type;
1783 app->max_processes = apcf.max_processes;
1784 app->spare_processes = apcf.spare_processes;
1785 app->max_pending_processes = apcf.spare_processes
1786 ? apcf.spare_processes : 1;
1787 app->timeout = apcf.timeout;
1788 app->idle_timeout = apcf.idle_timeout;
1789
1790 app->targets = targets;
1791
1792 engine = task->thread->engine;
1793
1794 app->engine = engine;
1795
1796 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1797 app->adjust_idle_work.task = &engine->task;
1798 app->adjust_idle_work.obj = app;
1799
1800 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1801
1802 ret = nxt_router_apps_hash_add(rtcf, app);
1803 if (nxt_slow_path(ret != NXT_OK)) {
1804 goto app_fail;
1805 }
1806
1807 nxt_router_app_use(task, app, 1);
1808
1809 app->joint = app_joint;
1810
1811 app_joint->use_count = 1;
1812 app_joint->app = app;
1813
1814 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1815 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1816 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1817 app_joint->idle_timer.task = &engine->task;
1818 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1819
1820 app_joint->free_app_work.handler = nxt_router_free_app;
1821 app_joint->free_app_work.task = &engine->task;
1822 app_joint->free_app_work.obj = app_joint;
1823
1824 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1825 NXT_PROCESS_APP);
1826 if (nxt_slow_path(port == NULL)) {
1827 return NXT_ERROR;
1828 }
1829
1830 ret = nxt_port_socket_init(task, port, 0);
1831 if (nxt_slow_path(ret != NXT_OK)) {
1832 nxt_port_use(task, port, -1);
1833 return NXT_ERROR;
1834 }
1835
1836 ret = nxt_router_app_queue_init(task, port);
1837 if (nxt_slow_path(ret != NXT_OK)) {
1838 nxt_port_write_close(port);
1839 nxt_port_read_close(port);
1840 nxt_port_use(task, port, -1);
1841 return NXT_ERROR;
1842 }
1843
1844 nxt_port_write_enable(task, port);
1845 port->app = app;
1846
1847 app->shared_port = port;
1848
1849 nxt_thread_mutex_create(&app->outgoing.mutex);
1850 }
1851 }
1852
1853 conf = nxt_conf_get_path(root, &routes_path);
1854 if (nxt_fast_path(conf != NULL)) {
1855 routes = nxt_http_routes_create(task, tmcf, conf);
1856 if (nxt_slow_path(routes == NULL)) {
1857 return NXT_ERROR;
1858 }
1859
1860 rtcf->routes = routes;
1861 }
1862
1863 ret = nxt_upstreams_create(task, tmcf, root);
1864 if (nxt_slow_path(ret != NXT_OK)) {
1865 return ret;
1866 }
1867
1868 http = nxt_conf_get_path(root, &http_path);
1869 #if 0
1870 if (http == NULL) {
1871 nxt_alert(task, "no \"http\" block");
1872 return NXT_ERROR;
1873 }
1874 #endif
1875
1876 websocket = nxt_conf_get_path(root, &websocket_path);
1877
1878 listeners = nxt_conf_get_path(root, &listeners_path);
1879
1880 if (listeners != NULL) {
1881 next = 0;
1882
1883 for ( ;; ) {
1884 listener = nxt_conf_next_object_member(listeners, &name, &next);
1885 if (listener == NULL) {
1886 break;
1887 }
1888
1889 skcf = nxt_router_socket_conf(task, tmcf, &name);
1890 if (skcf == NULL) {
1891 goto fail;
1892 }
1893
1894 nxt_memzero(&lscf, sizeof(lscf));
1895
1896 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1897 nxt_nitems(nxt_router_listener_conf),
1898 &lscf);
1899 if (ret != NXT_OK) {
1900 nxt_alert(task, "listener map error");
1901 goto fail;
1902 }
1903
1904 nxt_debug(task, "application: %V", &lscf.application);
1905
1906 // STUB, default values if http block is not defined.
1907 skcf->header_buffer_size = 2048;
1908 skcf->large_header_buffer_size = 8192;
1909 skcf->large_header_buffers = 4;
1910 skcf->discard_unsafe_fields = 1;
1911 skcf->body_buffer_size = 16 * 1024;
1912 skcf->max_body_size = 8 * 1024 * 1024;
1913 skcf->proxy_header_buffer_size = 64 * 1024;
1914 skcf->proxy_buffer_size = 4096;
1915 skcf->proxy_buffers = 256;
1916 skcf->idle_timeout = 180 * 1000;
1917 skcf->header_read_timeout = 30 * 1000;
1918 skcf->body_read_timeout = 30 * 1000;
1919 skcf->send_timeout = 30 * 1000;
1920 skcf->proxy_timeout = 60 * 1000;
1921 skcf->proxy_send_timeout = 30 * 1000;
1922 skcf->proxy_read_timeout = 30 * 1000;
1923
1924 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1925 skcf->websocket_conf.read_timeout = 60 * 1000;
1926 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1927
1928 nxt_str_null(&skcf->body_temp_path);
1929
1930 if (http != NULL) {
1931 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1932 nxt_nitems(nxt_router_http_conf),
1933 skcf);
1934 if (ret != NXT_OK) {
1935 nxt_alert(task, "http map error");
1936 goto fail;
1937 }
1938 }
1939
1940 if (websocket != NULL) {
1941 ret = nxt_conf_map_object(mp, websocket,
1942 nxt_router_websocket_conf,
1943 nxt_nitems(nxt_router_websocket_conf),
1944 &skcf->websocket_conf);
1945 if (ret != NXT_OK) {
1946 nxt_alert(task, "websocket map error");
1947 goto fail;
1948 }
1949 }
1950
1951 t = &skcf->body_temp_path;
1952
1953 if (t->length == 0) {
1954 t->start = (u_char *) task->thread->runtime->tmp;
1955 t->length = nxt_strlen(t->start);
1956 }
1957
1958 conf = nxt_conf_get_path(listener, &forwarded_path);
1959
1960 if (conf != NULL) {
1961 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1962 if (nxt_slow_path(skcf->forwarded == NULL)) {
1963 return NXT_ERROR;
1964 }
1965 }
1966
1967 conf = nxt_conf_get_path(listener, &client_ip_path);
1968
1969 if (conf != NULL) {
1970 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1971 if (nxt_slow_path(skcf->client_ip == NULL)) {
1972 return NXT_ERROR;
1973 }
1974 }
1975
1976 #if (NXT_TLS)
1977 certificate = nxt_conf_get_path(listener, &certificate_path);
1978
1979 if (certificate != NULL) {
1980 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1981 if (nxt_slow_path(tls_init == NULL)) {
1982 return NXT_ERROR;
1983 }
1984
1985 tls_init->cache_size = 0;
1986 tls_init->timeout = 300;
1987
1988 value = nxt_conf_get_path(listener, &conf_cache_path);
1989 if (value != NULL) {
1990 tls_init->cache_size = nxt_conf_get_number(value);
1991 }
1992
1993 value = nxt_conf_get_path(listener, &conf_timeout_path);
1994 if (value != NULL) {
1995 tls_init->timeout = nxt_conf_get_number(value);
1996 }
1997
1998 tls_init->conf_cmds = nxt_conf_get_path(listener,
1999 &conf_commands_path);
2000
2001 tls_init->tickets_conf = nxt_conf_get_path(listener,
2002 &conf_tickets);
2003
2004 n = nxt_conf_array_elements_count_or_1(certificate);
2005
2006 for (i = 0; i < n; i++) {
2007 value = nxt_conf_get_array_element_or_itself(certificate,
2008 i);
2009 nxt_assert(value != NULL);
2010
2011 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2012 tls_init, i == 0);
2013 if (nxt_slow_path(ret != NXT_OK)) {
2014 goto fail;
2015 }
2016 }
2017 }
2018 #endif
2019
2020 skcf->listen->handler = nxt_http_conn_init;
2021 skcf->router_conf = rtcf;
2022 skcf->router_conf->count++;
2023
2024 if (lscf.pass.length != 0) {
2025 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2026
2027 /* COMPATIBILITY: listener application. */
2028 } else if (lscf.application.length > 0) {
2029 skcf->action = nxt_http_pass_application(task, rtcf,
2030 &lscf.application);
2031 }
2032
2033 if (nxt_slow_path(skcf->action == NULL)) {
2034 goto fail;
2035 }
2036 }
2037 }
2038
2039 ret = nxt_http_routes_resolve(task, tmcf);
2040 if (nxt_slow_path(ret != NXT_OK)) {
2041 goto fail;
2042 }
2043
2044 value = nxt_conf_get_path(root, &access_log_path);
2045
2046 if (value != NULL) {
2047 ret = nxt_router_access_log_create(task, rtcf, value);
2048 if (nxt_slow_path(ret != NXT_OK)) {
2049 goto fail;
2050 }
2051 }
2052
2053 ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
2054 if (nxt_slow_path(ret != NXT_OK)) {
2055 goto fail;
2056 }
2057
2058 nxt_queue_add(&deleting_sockets, &router->sockets);
2059 nxt_queue_init(&router->sockets);
2060
2061 return NXT_OK;
2062
2063 app_fail:
2064
2065 nxt_mp_destroy(app_mp);
2066
2067 fail:
2068
2069 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2070
2071 nxt_queue_remove(&app->link);
2072 nxt_thread_mutex_destroy(&app->mutex);
2073 nxt_mp_destroy(app->mem_pool);
2074
2075 } nxt_queue_loop;
2076
2077 return NXT_ERROR;
2078 }
2079
2080
2081 #if (NXT_TLS)
2082
2083 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2084 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2085 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2086 nxt_tls_init_t *tls_init, nxt_bool_t last)
2087 {
2088 nxt_router_tlssock_t *tls;
2089
2090 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2091 if (nxt_slow_path(tls == NULL)) {
2092 return NXT_ERROR;
2093 }
2094
2095 tls->tls_init = tls_init;
2096 tls->socket_conf = skcf;
2097 tls->temp_conf = tmcf;
2098 tls->last = last;
2099 nxt_conf_get_string(value, &tls->name);
2100
2101 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2102
2103 return NXT_OK;
2104 }
2105
2106 #endif
2107
2108
2109 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2110 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2111 nxt_conf_value_t *conf)
2112 {
2113 uint32_t next, i;
2114 nxt_mp_t *mp;
2115 nxt_str_t *type, exten, str;
2116 nxt_int_t ret;
2117 nxt_uint_t exts;
2118 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2119
2120 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2121
2122 mp = rtcf->mem_pool;
2123
2124 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2125 if (nxt_slow_path(ret != NXT_OK)) {
2126 return NXT_ERROR;
2127 }
2128
2129 if (conf == NULL) {
2130 return NXT_OK;
2131 }
2132
2133 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2134
2135 if (mtypes_conf != NULL) {
2136 next = 0;
2137
2138 for ( ;; ) {
2139 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2140
2141 if (ext_conf == NULL) {
2142 break;
2143 }
2144
2145 type = nxt_str_dup(mp, NULL, &str);
2146 if (nxt_slow_path(type == NULL)) {
2147 return NXT_ERROR;
2148 }
2149
2150 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2151 nxt_conf_get_string(ext_conf, &str);
2152
2153 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2154 return NXT_ERROR;
2155 }
2156
2157 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2158 &exten, type);
2159 if (nxt_slow_path(ret != NXT_OK)) {
2160 return NXT_ERROR;
2161 }
2162
2163 continue;
2164 }
2165
2166 exts = nxt_conf_array_elements_count(ext_conf);
2167
2168 for (i = 0; i < exts; i++) {
2169 value = nxt_conf_get_array_element(ext_conf, i);
2170
2171 nxt_conf_get_string(value, &str);
2172
2173 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2174 return NXT_ERROR;
2175 }
2176
2177 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2178 &exten, type);
2179 if (nxt_slow_path(ret != NXT_OK)) {
2180 return NXT_ERROR;
2181 }
2182 }
2183 }
2184 }
2185
2186 return NXT_OK;
2187 }
2188
2189
2190 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2191 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2192 {
2193 nxt_int_t ret;
2194 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf;
2195 nxt_conf_value_t *source_conf, *recursive_conf;
2196 nxt_http_forward_t *forward;
2197 nxt_http_route_addr_rule_t *source;
2198
2199 static nxt_str_t header_path = nxt_string("/header");
2200 static nxt_str_t client_ip_path = nxt_string("/client_ip");
2201 static nxt_str_t protocol_path = nxt_string("/protocol");
2202 static nxt_str_t source_path = nxt_string("/source");
2203 static nxt_str_t recursive_path = nxt_string("/recursive");
2204
2205 header_conf = nxt_conf_get_path(conf, &header_path);
2206
2207 if (header_conf != NULL) {
2208 client_ip_conf = nxt_conf_get_path(conf, &header_path);
2209 protocol_conf = NULL;
2210
2211 } else {
2212 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2213 protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2214 }
2215
2216 source_conf = nxt_conf_get_path(conf, &source_path);
2217 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2218
2219 if (source_conf == NULL
2220 || (protocol_conf == NULL && client_ip_conf == NULL))
2221 {
2222 return NULL;
2223 }
2224
2225 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2226 if (nxt_slow_path(forward == NULL)) {
2227 return NULL;
2228 }
2229
2230 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2231 if (nxt_slow_path(source == NULL)) {
2232 return NULL;
2233 }
2234
2235 forward->source = source;
2236
2237 if (recursive_conf != NULL) {
2238 forward->recursive = nxt_conf_get_boolean(recursive_conf);
2239 }
2240
2241 if (client_ip_conf != NULL) {
2242 ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2243 &forward->client_ip);
2244 if (nxt_slow_path(ret != NXT_OK)) {
2245 return NULL;
2246 }
2247 }
2248
2249 if (protocol_conf != NULL) {
2250 ret = nxt_router_conf_forward_header(mp, protocol_conf,
2251 &forward->protocol);
2252 if (nxt_slow_path(ret != NXT_OK)) {
2253 return NULL;
2254 }
2255 }
2256
2257 return forward;
2258 }
2259
2260
2261 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2262 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2263 nxt_http_forward_header_t *fh)
2264 {
2265 char c;
2266 size_t i;
2267 uint32_t hash;
2268 nxt_str_t header;
2269
2270 nxt_conf_get_string(conf, &header);
2271
2272 fh->header = nxt_str_dup(mp, NULL, &header);
2273 if (nxt_slow_path(fh->header == NULL)) {
2274 return NXT_ERROR;
2275 }
2276
2277 hash = NXT_HTTP_FIELD_HASH_INIT;
2278
2279 for (i = 0; i < fh->header->length; i++) {
2280 c = fh->header->start[i];
2281 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2282 }
2283
2284 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2285
2286 fh->header_hash = hash;
2287
2288 return NXT_OK;
2289 }
2290
2291
2292 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2293 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2294 {
2295 nxt_app_t *app;
2296
2297 nxt_queue_each(app, queue, nxt_app_t, link) {
2298
2299 if (nxt_strstr_eq(name, &app->name)) {
2300 return app;
2301 }
2302
2303 } nxt_queue_loop;
2304
2305 return NULL;
2306 }
2307
2308
2309 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2310 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2311 {
2312 void *mem;
2313 nxt_int_t fd;
2314
2315 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2316 if (nxt_slow_path(fd == -1)) {
2317 return NXT_ERROR;
2318 }
2319
2320 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2321 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2322 if (nxt_slow_path(mem == MAP_FAILED)) {
2323 nxt_fd_close(fd);
2324
2325 return NXT_ERROR;
2326 }
2327
2328 nxt_app_queue_init(mem);
2329
2330 port->queue_fd = fd;
2331 port->queue = mem;
2332
2333 return NXT_OK;
2334 }
2335
2336
2337 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2338 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2339 {
2340 void *mem;
2341 nxt_int_t fd;
2342
2343 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2344 if (nxt_slow_path(fd == -1)) {
2345 return NXT_ERROR;
2346 }
2347
2348 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2349 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2350 if (nxt_slow_path(mem == MAP_FAILED)) {
2351 nxt_fd_close(fd);
2352
2353 return NXT_ERROR;
2354 }
2355
2356 nxt_port_queue_init(mem);
2357
2358 port->queue_fd = fd;
2359 port->queue = mem;
2360
2361 return NXT_OK;
2362 }
2363
2364
2365 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2366 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2367 {
2368 void *mem;
2369
2370 nxt_assert(fd != -1);
2371
2372 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2373 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2374 if (nxt_slow_path(mem == MAP_FAILED)) {
2375
2376 return NXT_ERROR;
2377 }
2378
2379 port->queue = mem;
2380
2381 return NXT_OK;
2382 }
2383
2384
2385 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
2386 NXT_LVLHSH_DEFAULT,
2387 nxt_router_apps_hash_test,
2388 nxt_mp_lvlhsh_alloc,
2389 nxt_mp_lvlhsh_free,
2390 };
2391
2392
2393 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2394 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2395 {
2396 nxt_app_t *app;
2397
2398 app = data;
2399
2400 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2401 }
2402
2403
2404 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2405 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2406 {
2407 nxt_lvlhsh_query_t lhq;
2408
2409 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2410 lhq.replace = 0;
2411 lhq.key = app->name;
2412 lhq.value = app;
2413 lhq.proto = &nxt_router_apps_hash_proto;
2414 lhq.pool = rtcf->mem_pool;
2415
2416 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2417
2418 case NXT_OK:
2419 return NXT_OK;
2420
2421 case NXT_DECLINED:
2422 nxt_thread_log_alert("router app hash adding failed: "
2423 "\"%V\" is already in hash", &lhq.key);
2424 /* Fall through. */
2425 default:
2426 return NXT_ERROR;
2427 }
2428 }
2429
2430
2431 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2432 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2433 {
2434 nxt_lvlhsh_query_t lhq;
2435
2436 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2437 lhq.key = *name;
2438 lhq.proto = &nxt_router_apps_hash_proto;
2439
2440 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2441 return NULL;
2442 }
2443
2444 return lhq.value;
2445 }
2446
2447
2448 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2449 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2450 {
2451 nxt_app_t *app;
2452 nxt_lvlhsh_each_t lhe;
2453
2454 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2455
2456 for ( ;; ) {
2457 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2458
2459 if (app == NULL) {
2460 break;
2461 }
2462
2463 nxt_router_app_use(task, app, i);
2464 }
2465 }
2466
2467
2468 typedef struct {
2469 nxt_app_t *app;
2470 nxt_int_t target;
2471 } nxt_http_app_conf_t;
2472
2473
2474 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2475 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2476 nxt_str_t *target, nxt_http_action_t *action)
2477 {
2478 nxt_app_t *app;
2479 nxt_str_t *targets;
2480 nxt_uint_t i;
2481 nxt_http_app_conf_t *conf;
2482
2483 app = nxt_router_apps_hash_get(rtcf, name);
2484 if (app == NULL) {
2485 return NXT_DECLINED;
2486 }
2487
2488 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2489 if (nxt_slow_path(conf == NULL)) {
2490 return NXT_ERROR;
2491 }
2492
2493 action->handler = nxt_http_application_handler;
2494 action->u.conf = conf;
2495
2496 conf->app = app;
2497
2498 if (target != NULL && target->length != 0) {
2499 targets = app->targets;
2500
2501 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2502
2503 conf->target = i;
2504
2505 } else {
2506 conf->target = 0;
2507 }
2508
2509 return NXT_OK;
2510 }
2511
2512
2513 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2514 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2515 nxt_str_t *name)
2516 {
2517 size_t size;
2518 nxt_int_t ret;
2519 nxt_bool_t wildcard;
2520 nxt_sockaddr_t *sa;
2521 nxt_socket_conf_t *skcf;
2522 nxt_listen_socket_t *ls;
2523
2524 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2525 if (nxt_slow_path(sa == NULL)) {
2526 nxt_alert(task, "invalid listener \"%V\"", name);
2527 return NULL;
2528 }
2529
2530 sa->type = SOCK_STREAM;
2531
2532 nxt_debug(task, "router listener: \"%*s\"",
2533 (size_t) sa->length, nxt_sockaddr_start(sa));
2534
2535 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2536 if (nxt_slow_path(skcf == NULL)) {
2537 return NULL;
2538 }
2539
2540 size = nxt_sockaddr_size(sa);
2541
2542 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2543
2544 if (ret != NXT_OK) {
2545
2546 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2547 if (nxt_slow_path(ls == NULL)) {
2548 return NULL;
2549 }
2550
2551 skcf->listen = ls;
2552
2553 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2554 nxt_memcpy(ls->sockaddr, sa, size);
2555
2556 nxt_listen_socket_remote_size(ls);
2557
2558 ls->socket = -1;
2559 ls->backlog = NXT_LISTEN_BACKLOG;
2560 ls->flags = NXT_NONBLOCK;
2561 ls->read_after_accept = 1;
2562 }
2563
2564 switch (sa->u.sockaddr.sa_family) {
2565 #if (NXT_HAVE_UNIX_DOMAIN)
2566 case AF_UNIX:
2567 wildcard = 0;
2568 break;
2569 #endif
2570 #if (NXT_INET6)
2571 case AF_INET6:
2572 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2573 break;
2574 #endif
2575 case AF_INET:
2576 default:
2577 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2578 break;
2579 }
2580
2581 if (!wildcard) {
2582 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2583 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2584 return NULL;
2585 }
2586
2587 nxt_memcpy(skcf->sockaddr, sa, size);
2588 }
2589
2590 return skcf;
2591 }
2592
2593
2594 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2595 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2596 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2597 {
2598 nxt_router_t *router;
2599 nxt_queue_link_t *qlk;
2600 nxt_socket_conf_t *skcf;
2601
2602 router = tmcf->router_conf->router;
2603
2604 for (qlk = nxt_queue_first(&router->sockets);
2605 qlk != nxt_queue_tail(&router->sockets);
2606 qlk = nxt_queue_next(qlk))
2607 {
2608 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2609
2610 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2611 nskcf->listen = skcf->listen;
2612
2613 nxt_queue_remove(qlk);
2614 nxt_queue_insert_tail(&keeping_sockets, qlk);
2615
2616 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2617
2618 return NXT_OK;
2619 }
2620 }
2621
2622 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2623
2624 return NXT_DECLINED;
2625 }
2626
2627
2628 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2629 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2630 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2631 {
2632 size_t size;
2633 uint32_t stream;
2634 nxt_int_t ret;
2635 nxt_buf_t *b;
2636 nxt_port_t *main_port, *router_port;
2637 nxt_runtime_t *rt;
2638 nxt_socket_rpc_t *rpc;
2639
2640 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2641 if (rpc == NULL) {
2642 goto fail;
2643 }
2644
2645 rpc->socket_conf = skcf;
2646 rpc->temp_conf = tmcf;
2647
2648 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2649
2650 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2651 if (b == NULL) {
2652 goto fail;
2653 }
2654
2655 b->completion_handler = nxt_buf_dummy_completion;
2656
2657 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2658
2659 rt = task->thread->runtime;
2660 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2661 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2662
2663 stream = nxt_port_rpc_register_handler(task, router_port,
2664 nxt_router_listen_socket_ready,
2665 nxt_router_listen_socket_error,
2666 main_port->pid, rpc);
2667 if (nxt_slow_path(stream == 0)) {
2668 goto fail;
2669 }
2670
2671 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2672 stream, router_port->id, b);
2673
2674 if (nxt_slow_path(ret != NXT_OK)) {
2675 nxt_port_rpc_cancel(task, router_port, stream);
2676 goto fail;
2677 }
2678
2679 return;
2680
2681 fail:
2682
2683 nxt_router_conf_error(task, tmcf);
2684 }
2685
2686
2687 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2688 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2689 void *data)
2690 {
2691 nxt_int_t ret;
2692 nxt_socket_t s;
2693 nxt_socket_rpc_t *rpc;
2694
2695 rpc = data;
2696
2697 s = msg->fd[0];
2698
2699 ret = nxt_socket_nonblocking(task, s);
2700 if (nxt_slow_path(ret != NXT_OK)) {
2701 goto fail;
2702 }
2703
2704 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2705
2706 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2707 if (nxt_slow_path(ret != NXT_OK)) {
2708 goto fail;
2709 }
2710
2711 rpc->socket_conf->listen->socket = s;
2712
2713 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2714 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2715
2716 return;
2717
2718 fail:
2719
2720 nxt_socket_close(task, s);
2721
2722 nxt_router_conf_error(task, rpc->temp_conf);
2723 }
2724
2725
2726 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2727 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2728 void *data)
2729 {
2730 nxt_socket_rpc_t *rpc;
2731 nxt_router_temp_conf_t *tmcf;
2732
2733 rpc = data;
2734 tmcf = rpc->temp_conf;
2735
2736 #if 0
2737 u_char *p;
2738 size_t size;
2739 uint8_t error;
2740 nxt_buf_t *in, *out;
2741 nxt_sockaddr_t *sa;
2742
2743 static nxt_str_t socket_errors[] = {
2744 nxt_string("ListenerSystem"),
2745 nxt_string("ListenerNoIPv6"),
2746 nxt_string("ListenerPort"),
2747 nxt_string("ListenerInUse"),
2748 nxt_string("ListenerNoAddress"),
2749 nxt_string("ListenerNoAccess"),
2750 nxt_string("ListenerPath"),
2751 };
2752
2753 sa = rpc->socket_conf->listen->sockaddr;
2754
2755 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2756
2757 if (nxt_slow_path(in == NULL)) {
2758 return;
2759 }
2760
2761 p = in->mem.pos;
2762
2763 error = *p++;
2764
2765 size = nxt_length("listen socket error: ")
2766 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2767 + sa->length + socket_errors[error].length + (in->mem.free - p);
2768
2769 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2770 if (nxt_slow_path(out == NULL)) {
2771 return;
2772 }
2773
2774 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2775 "listen socket error: "
2776 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2777 (size_t) sa->length, nxt_sockaddr_start(sa),
2778 &socket_errors[error], in->mem.free - p, p);
2779
2780 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2781 #endif
2782
2783 nxt_router_conf_error(task, tmcf);
2784 }
2785
2786
2787 #if (NXT_TLS)
2788
2789 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2790 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2791 void *data)
2792 {
2793 nxt_mp_t *mp;
2794 nxt_int_t ret;
2795 nxt_tls_conf_t *tlscf;
2796 nxt_router_tlssock_t *tls;
2797 nxt_tls_bundle_conf_t *bundle;
2798 nxt_router_temp_conf_t *tmcf;
2799
2800 nxt_debug(task, "tls rpc handler");
2801
2802 tls = data;
2803 tmcf = tls->temp_conf;
2804
2805 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2806 goto fail;
2807 }
2808
2809 mp = tmcf->router_conf->mem_pool;
2810
2811 if (tls->socket_conf->tls == NULL){
2812 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2813 if (nxt_slow_path(tlscf == NULL)) {
2814 goto fail;
2815 }
2816
2817 tlscf->no_wait_shutdown = 1;
2818 tls->socket_conf->tls = tlscf;
2819
2820 } else {
2821 tlscf = tls->socket_conf->tls;
2822 }
2823
2824 tls->tls_init->conf = tlscf;
2825
2826 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2827 if (nxt_slow_path(bundle == NULL)) {
2828 goto fail;
2829 }
2830
2831 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2832 goto fail;
2833 }
2834
2835 bundle->chain_file = msg->fd[0];
2836 bundle->next = tlscf->bundle;
2837 tlscf->bundle = bundle;
2838
2839 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2840 tls->last);
2841 if (nxt_slow_path(ret != NXT_OK)) {
2842 goto fail;
2843 }
2844
2845 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2846 nxt_router_conf_apply, task, tmcf, NULL);
2847 return;
2848
2849 fail:
2850
2851 nxt_router_conf_error(task, tmcf);
2852 }
2853
2854 #endif
2855
2856
2857 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)2858 nxt_router_app_rpc_create(nxt_task_t *task,
2859 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2860 {
2861 size_t size;
2862 uint32_t stream;
2863 nxt_fd_t port_fd, queue_fd;
2864 nxt_int_t ret;
2865 nxt_buf_t *b;
2866 nxt_port_t *router_port, *dport;
2867 nxt_runtime_t *rt;
2868 nxt_app_rpc_t *rpc;
2869
2870 rt = task->thread->runtime;
2871
2872 dport = app->proto_port;
2873
2874 if (dport == NULL) {
2875 nxt_debug(task, "app '%V' prototype prefork", &app->name);
2876
2877 size = app->name.length + 1 + app->conf.length;
2878
2879 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2880 if (nxt_slow_path(b == NULL)) {
2881 goto fail;
2882 }
2883
2884 b->completion_handler = nxt_buf_dummy_completion;
2885
2886 nxt_buf_cpystr(b, &app->name);
2887 *b->mem.free++ = '\0';
2888 nxt_buf_cpystr(b, &app->conf);
2889
2890 dport = rt->port_by_type[NXT_PROCESS_MAIN];
2891
2892 port_fd = app->shared_port->pair[0];
2893 queue_fd = app->shared_port->queue_fd;
2894
2895 } else {
2896 nxt_debug(task, "app '%V' prefork", &app->name);
2897
2898 b = NULL;
2899 port_fd = -1;
2900 queue_fd = -1;
2901 }
2902
2903 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2904
2905 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2906 nxt_router_app_prefork_ready,
2907 nxt_router_app_prefork_error,
2908 sizeof(nxt_app_rpc_t));
2909 if (nxt_slow_path(rpc == NULL)) {
2910 goto fail;
2911 }
2912
2913 rpc->app = app;
2914 rpc->temp_conf = tmcf;
2915 rpc->proto = (b != NULL);
2916
2917 stream = nxt_port_rpc_ex_stream(rpc);
2918
2919 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2920 port_fd, queue_fd, stream, router_port->id, b);
2921 if (nxt_slow_path(ret != NXT_OK)) {
2922 nxt_port_rpc_cancel(task, router_port, stream);
2923 goto fail;
2924 }
2925
2926 if (b == NULL) {
2927 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2928
2929 app->pending_processes++;
2930 }
2931
2932 return;
2933
2934 fail:
2935
2936 nxt_router_conf_error(task, tmcf);
2937 }
2938
2939
2940 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2941 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2942 void *data)
2943 {
2944 nxt_app_t *app;
2945 nxt_port_t *port;
2946 nxt_app_rpc_t *rpc;
2947 nxt_event_engine_t *engine;
2948
2949 rpc = data;
2950 app = rpc->app;
2951
2952 port = msg->u.new_port;
2953
2954 nxt_assert(port != NULL);
2955 nxt_assert(port->id == 0);
2956
2957 if (rpc->proto) {
2958 nxt_assert(app->proto_port == NULL);
2959 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2960
2961 nxt_port_inc_use(port);
2962
2963 app->proto_port = port;
2964 port->app = app;
2965
2966 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2967
2968 return;
2969 }
2970
2971 nxt_assert(port->type == NXT_PROCESS_APP);
2972
2973 port->app = app;
2974 port->main_app_port = port;
2975
2976 app->pending_processes--;
2977 app->processes++;
2978 app->idle_processes++;
2979
2980 engine = task->thread->engine;
2981
2982 nxt_queue_insert_tail(&app->ports, &port->app_link);
2983 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2984
2985 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2986 &app->name, port->pid, port->id);
2987
2988 nxt_port_hash_add(&app->port_hash, port);
2989 app->port_hash_count++;
2990
2991 port->idle_start = 0;
2992
2993 nxt_port_inc_use(port);
2994
2995 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2996
2997 nxt_work_queue_add(&engine->fast_work_queue,
2998 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2999 }
3000
3001
3002 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3003 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3004 void *data)
3005 {
3006 nxt_app_t *app;
3007 nxt_app_rpc_t *rpc;
3008 nxt_router_temp_conf_t *tmcf;
3009
3010 rpc = data;
3011 app = rpc->app;
3012 tmcf = rpc->temp_conf;
3013
3014 if (rpc->proto) {
3015 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3016 &app->name);
3017
3018 } else {
3019 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3020 &app->name);
3021
3022 app->pending_processes--;
3023 }
3024
3025 nxt_router_conf_error(task, tmcf);
3026 }
3027
3028
3029 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)3030 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3031 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3032 {
3033 nxt_int_t ret;
3034 nxt_uint_t n, threads;
3035 nxt_queue_link_t *qlk;
3036 nxt_router_engine_conf_t *recf;
3037
3038 threads = tmcf->router_conf->threads;
3039
3040 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3041 sizeof(nxt_router_engine_conf_t));
3042 if (nxt_slow_path(tmcf->engines == NULL)) {
3043 return NXT_ERROR;
3044 }
3045
3046 n = 0;
3047
3048 for (qlk = nxt_queue_first(&router->engines);
3049 qlk != nxt_queue_tail(&router->engines);
3050 qlk = nxt_queue_next(qlk))
3051 {
3052 recf = nxt_array_zero_add(tmcf->engines);
3053 if (nxt_slow_path(recf == NULL)) {
3054 return NXT_ERROR;
3055 }
3056
3057 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3058
3059 if (n < threads) {
3060 recf->action = NXT_ROUTER_ENGINE_KEEP;
3061 ret = nxt_router_engine_conf_update(tmcf, recf);
3062
3063 } else {
3064 recf->action = NXT_ROUTER_ENGINE_DELETE;
3065 ret = nxt_router_engine_conf_delete(tmcf, recf);
3066 }
3067
3068 if (nxt_slow_path(ret != NXT_OK)) {
3069 return ret;
3070 }
3071
3072 n++;
3073 }
3074
3075 tmcf->new_threads = n;
3076
3077 while (n < threads) {
3078 recf = nxt_array_zero_add(tmcf->engines);
3079 if (nxt_slow_path(recf == NULL)) {
3080 return NXT_ERROR;
3081 }
3082
3083 recf->action = NXT_ROUTER_ENGINE_ADD;
3084
3085 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3086 if (nxt_slow_path(recf->engine == NULL)) {
3087 return NXT_ERROR;
3088 }
3089
3090 ret = nxt_router_engine_conf_create(tmcf, recf);
3091 if (nxt_slow_path(ret != NXT_OK)) {
3092 return ret;
3093 }
3094
3095 n++;
3096 }
3097
3098 return NXT_OK;
3099 }
3100
3101
3102 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3103 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3104 nxt_router_engine_conf_t *recf)
3105 {
3106 nxt_int_t ret;
3107
3108 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3109 nxt_router_listen_socket_create);
3110 if (nxt_slow_path(ret != NXT_OK)) {
3111 return ret;
3112 }
3113
3114 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3115 nxt_router_listen_socket_create);
3116 if (nxt_slow_path(ret != NXT_OK)) {
3117 return ret;
3118 }
3119
3120 return ret;
3121 }
3122
3123
3124 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3125 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3126 nxt_router_engine_conf_t *recf)
3127 {
3128 nxt_int_t ret;
3129
3130 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3131 nxt_router_listen_socket_create);
3132 if (nxt_slow_path(ret != NXT_OK)) {
3133 return ret;
3134 }
3135
3136 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3137 nxt_router_listen_socket_update);
3138 if (nxt_slow_path(ret != NXT_OK)) {
3139 return ret;
3140 }
3141
3142 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3143 if (nxt_slow_path(ret != NXT_OK)) {
3144 return ret;
3145 }
3146
3147 return ret;
3148 }
3149
3150
3151 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3152 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3153