1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #if (NXT_HAVE_NJS)
15 #include <nxt_script.h>
16 #endif
17 #include <nxt_http.h>
18 #include <nxt_port_memory_int.h>
19 #include <nxt_unit_request.h>
20 #include <nxt_unit_response.h>
21 #include <nxt_router_request.h>
22 #include <nxt_app_queue.h>
23 #include <nxt_port_queue.h>
24
25 #define NXT_SHARED_PORT_ID 0xFFFFu
26
27 typedef struct {
28 nxt_str_t type;
29 uint32_t processes;
30 uint32_t max_processes;
31 uint32_t spare_processes;
32 nxt_msec_t timeout;
33 nxt_msec_t idle_timeout;
34 nxt_conf_value_t *limits_value;
35 nxt_conf_value_t *processes_value;
36 nxt_conf_value_t *targets_value;
37 } nxt_router_app_conf_t;
38
39
40 typedef struct {
41 nxt_str_t pass;
42 nxt_str_t application;
43 } nxt_router_listener_conf_t;
44
45
46 #if (NXT_TLS)
47
48 typedef struct {
49 nxt_str_t name;
50 nxt_socket_conf_t *socket_conf;
51 nxt_router_temp_conf_t *temp_conf;
52 nxt_tls_init_t *tls_init;
53 nxt_bool_t last;
54
55 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
56 } nxt_router_tlssock_t;
57
58 #endif
59
60
61 #if (NXT_HAVE_NJS)
62
63 typedef struct {
64 nxt_str_t name;
65 nxt_router_temp_conf_t *temp_conf;
66 nxt_queue_link_t link;
67 } nxt_router_js_module_t;
68
69 #endif
70
71
72 typedef struct {
73 nxt_str_t *name;
74 nxt_socket_conf_t *socket_conf;
75 nxt_router_temp_conf_t *temp_conf;
76 nxt_bool_t last;
77 } nxt_socket_rpc_t;
78
79
80 typedef struct {
81 nxt_app_t *app;
82 nxt_router_temp_conf_t *temp_conf;
83 uint8_t proto; /* 1 bit */
84 } nxt_app_rpc_t;
85
86
87 typedef struct {
88 nxt_app_joint_t *app_joint;
89 uint32_t generation;
90 uint8_t proto; /* 1 bit */
91 } nxt_app_joint_rpc_t;
92
93
94 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
95 nxt_mp_t *mp);
96 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
97 static void nxt_router_greet_controller(nxt_task_t *task,
98 nxt_port_t *controller_port);
99
100 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
101
102 static void nxt_router_new_port_handler(nxt_task_t *task,
103 nxt_port_recv_msg_t *msg);
104 static void nxt_router_conf_data_handler(nxt_task_t *task,
105 nxt_port_recv_msg_t *msg);
106 static void nxt_router_app_restart_handler(nxt_task_t *task,
107 nxt_port_recv_msg_t *msg);
108 static void nxt_router_status_handler(nxt_task_t *task,
109 nxt_port_recv_msg_t *msg);
110 static void nxt_router_remove_pid_handler(nxt_task_t *task,
111 nxt_port_recv_msg_t *msg);
112
113 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
114 static void nxt_router_conf_ready(nxt_task_t *task,
115 nxt_router_temp_conf_t *tmcf);
116 static void nxt_router_conf_send(nxt_task_t *task,
117 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
118
119 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
120 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
121 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
122 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
123 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
124 nxt_mp_t *mp, nxt_conf_value_t *conf);
125 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
126 nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
127
128 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
129 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
130 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
131 nxt_app_t *app);
132 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
133 nxt_str_t *name);
134 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
135 int i);
136
137 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
138 nxt_port_t *port);
139 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
140 nxt_port_t *port);
141 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
142 nxt_port_t *port, nxt_fd_t fd);
143 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
144 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
145 static void nxt_router_listen_socket_ready(nxt_task_t *task,
146 nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_listen_socket_error(nxt_task_t *task,
148 nxt_port_recv_msg_t *msg, void *data);
149 #if (NXT_TLS)
150 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
151 nxt_port_recv_msg_t *msg, void *data);
152 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
153 nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
154 nxt_bool_t last);
155 #endif
156 #if (NXT_HAVE_NJS)
157 static void nxt_router_js_module_rpc_handler(nxt_task_t *task,
158 nxt_port_recv_msg_t *msg, void *data);
159 static nxt_int_t nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
160 nxt_conf_value_t *value);
161 #endif
162 static void nxt_router_app_rpc_create(nxt_task_t *task,
163 nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
164 static void nxt_router_app_prefork_ready(nxt_task_t *task,
165 nxt_port_recv_msg_t *msg, void *data);
166 static void nxt_router_app_prefork_error(nxt_task_t *task,
167 nxt_port_recv_msg_t *msg, void *data);
168 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
169 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
170 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
171 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
172
173 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
174 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
175 const nxt_event_interface_t *interface);
176 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
177 nxt_router_engine_conf_t *recf);
178 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
179 nxt_router_engine_conf_t *recf);
180 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
181 nxt_router_engine_conf_t *recf);
182 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
183 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
184 nxt_work_handler_t handler);
185 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
186 nxt_router_engine_conf_t *recf);
187 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
188 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
189
190 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
191 nxt_router_temp_conf_t *tmcf);
192 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
193 nxt_event_engine_t *engine);
194 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
195 nxt_router_temp_conf_t *tmcf);
196
197 static void nxt_router_engines_post(nxt_router_t *router,
198 nxt_router_temp_conf_t *tmcf);
199 static void nxt_router_engine_post(nxt_event_engine_t *engine,
200 nxt_work_t *jobs);
201
202 static void nxt_router_thread_start(void *data);
203 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
204 void *data);
205 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
206 void *data);
207 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
208 void *data);
209 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
210 void *data);
211 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
212 void *data);
213 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
214 void *data);
215 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
216 void *data);
217 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
218 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
219 static void nxt_router_listen_socket_release(nxt_task_t *task,
220 nxt_socket_conf_t *skcf);
221
222 static void nxt_router_app_port_ready(nxt_task_t *task,
223 nxt_port_recv_msg_t *msg, void *data);
224 static void nxt_router_app_port_error(nxt_task_t *task,
225 nxt_port_recv_msg_t *msg, void *data);
226
227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
229
230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
231 nxt_port_t *port, nxt_apr_action_t action);
232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
233 nxt_request_rpc_data_t *req_rpc_data);
234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
235 void *data);
236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
237 void *data);
238
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240 nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246 void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248 void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250 void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252
253 static const nxt_http_request_state_t nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257 nxt_app_joint_t *app_joint, int i);
258
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260 nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262 void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265 nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267 nxt_port_recv_msg_t *msg);
268
269 extern const nxt_http_request_state_t nxt_http_websocket;
270
271 nxt_router_t *nxt_router;
272
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275
276 static const nxt_str_t *nxt_app_msg_prefix[] = {
277 [NXT_APP_EXTERNAL] = &empty_prefix,
278 [NXT_APP_PYTHON] = &empty_prefix,
279 [NXT_APP_PHP] = &http_prefix,
280 [NXT_APP_PERL] = &http_prefix,
281 [NXT_APP_RUBY] = &http_prefix,
282 [NXT_APP_JAVA] = &empty_prefix,
283 [NXT_APP_WASM] = &empty_prefix,
284 [NXT_APP_WASM_WC] = &empty_prefix,
285 };
286
287
288 static const nxt_port_handlers_t nxt_router_process_port_handlers = {
289 .quit = nxt_signal_quit_handler,
290 .new_port = nxt_router_new_port_handler,
291 .get_port = nxt_router_get_port_handler,
292 .change_file = nxt_port_change_log_file_handler,
293 .mmap = nxt_port_mmap_handler,
294 .get_mmap = nxt_router_get_mmap_handler,
295 .data = nxt_router_conf_data_handler,
296 .app_restart = nxt_router_app_restart_handler,
297 .status = nxt_router_status_handler,
298 .remove_pid = nxt_router_remove_pid_handler,
299 .access_log = nxt_router_access_log_reopen_handler,
300 .rpc_ready = nxt_port_rpc_handler,
301 .rpc_error = nxt_port_rpc_handler,
302 .oosm = nxt_router_oosm_handler,
303 };
304
305
306 const nxt_process_init_t nxt_router_process = {
307 .name = "router",
308 .type = NXT_PROCESS_ROUTER,
309 .prefork = nxt_router_prefork,
310 .restart = 1,
311 .setup = nxt_process_core_setup,
312 .start = nxt_router_start,
313 .port_handlers = &nxt_router_process_port_handlers,
314 .signals = nxt_process_signals,
315 };
316
317
318 /* Queues of nxt_socket_conf_t */
319 nxt_queue_t creating_sockets;
320 nxt_queue_t pending_sockets;
321 nxt_queue_t updating_sockets;
322 nxt_queue_t keeping_sockets;
323 nxt_queue_t deleting_sockets;
324
325
326 static nxt_int_t
nxt_router_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
328 {
329 nxt_runtime_stop_app_processes(task, task->thread->runtime);
330
331 return NXT_OK;
332 }
333
334
335 static nxt_int_t
nxt_router_start(nxt_task_t * task,nxt_process_data_t * data)336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
337 {
338 nxt_int_t ret;
339 nxt_port_t *controller_port;
340 nxt_router_t *router;
341 nxt_runtime_t *rt;
342
343 rt = task->thread->runtime;
344
345 nxt_log(task, NXT_LOG_INFO, "router started");
346
347 #if (NXT_TLS)
348 rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
349 if (nxt_slow_path(rt->tls == NULL)) {
350 return NXT_ERROR;
351 }
352
353 ret = rt->tls->library_init(task);
354 if (nxt_slow_path(ret != NXT_OK)) {
355 return ret;
356 }
357 #endif
358
359 ret = nxt_http_init(task);
360 if (nxt_slow_path(ret != NXT_OK)) {
361 return ret;
362 }
363
364 router = nxt_zalloc(sizeof(nxt_router_t));
365 if (nxt_slow_path(router == NULL)) {
366 return NXT_ERROR;
367 }
368
369 nxt_queue_init(&router->engines);
370 nxt_queue_init(&router->sockets);
371 nxt_queue_init(&router->apps);
372
373 nxt_router = router;
374
375 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
376 if (controller_port != NULL) {
377 nxt_router_greet_controller(task, controller_port);
378 }
379
380 return NXT_OK;
381 }
382
383
384 static void
nxt_router_greet_controller(nxt_task_t * task,nxt_port_t * controller_port)385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
386 {
387 nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
388 -1, 0, 0, NULL);
389 }
390
391
392 static void
nxt_router_start_app_process_handler(nxt_task_t * task,nxt_port_t * port,void * data)393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
394 void *data)
395 {
396 size_t size;
397 uint32_t stream;
398 nxt_fd_t port_fd, queue_fd;
399 nxt_int_t ret;
400 nxt_app_t *app;
401 nxt_buf_t *b;
402 nxt_port_t *dport;
403 nxt_runtime_t *rt;
404 nxt_app_joint_rpc_t *app_joint_rpc;
405
406 app = data;
407
408 nxt_thread_mutex_lock(&app->mutex);
409
410 dport = app->proto_port;
411
412 nxt_thread_mutex_unlock(&app->mutex);
413
414 if (dport != NULL) {
415 nxt_debug(task, "app '%V' %p start process", &app->name, app);
416
417 b = NULL;
418 port_fd = -1;
419 queue_fd = -1;
420
421 } else {
422 if (app->proto_port_requests > 0) {
423 nxt_debug(task, "app '%V' %p wait for prototype process",
424 &app->name, app);
425
426 app->proto_port_requests++;
427
428 goto skip;
429 }
430
431 nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
432
433 rt = task->thread->runtime;
434 dport = rt->port_by_type[NXT_PROCESS_MAIN];
435
436 size = app->name.length + 1 + app->conf.length;
437
438 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
439 if (nxt_slow_path(b == NULL)) {
440 goto failed;
441 }
442
443 nxt_buf_cpystr(b, &app->name);
444 *b->mem.free++ = '\0';
445 nxt_buf_cpystr(b, &app->conf);
446
447 port_fd = app->shared_port->pair[0];
448 queue_fd = app->shared_port->queue_fd;
449 }
450
451 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
452 nxt_router_app_port_ready,
453 nxt_router_app_port_error,
454 sizeof(nxt_app_joint_rpc_t));
455 if (nxt_slow_path(app_joint_rpc == NULL)) {
456 goto failed;
457 }
458
459 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
460
461 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
462 port_fd, queue_fd, stream, port->id, b);
463 if (nxt_slow_path(ret != NXT_OK)) {
464 nxt_port_rpc_cancel(task, port, stream);
465
466 goto failed;
467 }
468
469 app_joint_rpc->app_joint = app->joint;
470 app_joint_rpc->generation = app->generation;
471 app_joint_rpc->proto = (b != NULL);
472
473 if (b != NULL) {
474 app->proto_port_requests++;
475
476 b = NULL;
477 }
478
479 nxt_router_app_joint_use(task, app->joint, 1);
480
481 failed:
482
483 if (b != NULL) {
484 nxt_mp_free(b->data, b);
485 }
486
487 skip:
488
489 nxt_router_app_use(task, app, -1);
490 }
491
492
493 static void
nxt_router_app_joint_use(nxt_task_t * task,nxt_app_joint_t * app_joint,int i)494 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
495 {
496 app_joint->use_count += i;
497
498 if (app_joint->use_count == 0) {
499 nxt_assert(app_joint->app == NULL);
500
501 nxt_free(app_joint);
502 }
503 }
504
505
506 static nxt_int_t
nxt_router_start_app_process(nxt_task_t * task,nxt_app_t * app)507 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
508 {
509 nxt_int_t res;
510 nxt_port_t *router_port;
511 nxt_runtime_t *rt;
512
513 nxt_debug(task, "app '%V' start process", &app->name);
514
515 rt = task->thread->runtime;
516 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
517
518 nxt_router_app_use(task, app, 1);
519
520 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
521 app);
522
523 if (res == NXT_OK) {
524 return res;
525 }
526
527 nxt_thread_mutex_lock(&app->mutex);
528
529 app->pending_processes--;
530
531 nxt_thread_mutex_unlock(&app->mutex);
532
533 nxt_router_app_use(task, app, -1);
534
535 return NXT_ERROR;
536 }
537
538
539 nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)540 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
541 {
542 nxt_buf_t *b, *next;
543 nxt_bool_t cancelled;
544 nxt_port_t *app_port;
545 nxt_msg_info_t *msg_info;
546
547 msg_info = &req_rpc_data->msg_info;
548
549 if (msg_info->buf == NULL) {
550 return 0;
551 }
552
553 app_port = req_rpc_data->app_port;
554
555 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
556 cancelled = nxt_app_queue_cancel(app_port->queue,
557 msg_info->tracking_cookie,
558 req_rpc_data->stream);
559
560 if (cancelled) {
561 nxt_debug(task, "stream #%uD: cancelled by router",
562 req_rpc_data->stream);
563 }
564
565 } else {
566 cancelled = 0;
567 }
568
569 for (b = msg_info->buf; b != NULL; b = next) {
570 next = b->next;
571 b->next = NULL;
572
573 if (b->is_port_mmap_sent) {
574 b->is_port_mmap_sent = cancelled == 0;
575 }
576
577 b->completion_handler(task, b, b->parent);
578 }
579
580 msg_info->buf = NULL;
581
582 return cancelled;
583 }
584
585
586 nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t * lnk)587 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
588 {
589 if (lnk->next != NULL) {
590 nxt_queue_remove(lnk);
591
592 lnk->next = NULL;
593
594 return 1;
595 }
596
597 return 0;
598 }
599
600
601 nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)602 nxt_request_rpc_data_unlink(nxt_task_t *task,
603 nxt_request_rpc_data_t *req_rpc_data)
604 {
605 nxt_app_t *app;
606 nxt_bool_t unlinked;
607 nxt_http_request_t *r;
608
609 nxt_router_msg_cancel(task, req_rpc_data);
610
611 app = req_rpc_data->app;
612
613 if (req_rpc_data->app_port != NULL) {
614 nxt_router_app_port_release(task, app, req_rpc_data->app_port,
615 req_rpc_data->apr_action);
616
617 req_rpc_data->app_port = NULL;
618 }
619
620 r = req_rpc_data->request;
621
622 if (r != NULL) {
623 r->timer_data = NULL;
624
625 nxt_router_http_request_release_post(task, r);
626
627 r->req_rpc_data = NULL;
628 req_rpc_data->request = NULL;
629
630 if (app != NULL) {
631 unlinked = 0;
632
633 nxt_thread_mutex_lock(&app->mutex);
634
635 if (r->app_link.next != NULL) {
636 nxt_queue_remove(&r->app_link);
637 r->app_link.next = NULL;
638
639 unlinked = 1;
640 }
641
642 nxt_thread_mutex_unlock(&app->mutex);
643
644 if (unlinked) {
645 nxt_mp_release(r->mem_pool);
646 }
647 }
648 }
649
650 if (app != NULL) {
651 nxt_router_app_use(task, app, -1);
652
653 req_rpc_data->app = NULL;
654 }
655
656 if (req_rpc_data->msg_info.body_fd != -1) {
657 nxt_fd_close(req_rpc_data->msg_info.body_fd);
658
659 req_rpc_data->msg_info.body_fd = -1;
660 }
661
662 if (req_rpc_data->rpc_cancel) {
663 req_rpc_data->rpc_cancel = 0;
664
665 nxt_port_rpc_cancel(task, task->thread->engine->port,
666 req_rpc_data->stream);
667 }
668 }
669
670
671 static void
nxt_router_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674 nxt_int_t res;
675 nxt_app_t *app;
676 nxt_port_t *port, *main_app_port;
677 nxt_runtime_t *rt;
678
679 nxt_port_new_port_handler(task, msg);
680
681 port = msg->u.new_port;
682
683 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
684 nxt_router_greet_controller(task, msg->u.new_port);
685 }
686
687 if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
688 nxt_port_rpc_handler(task, msg);
689
690 return;
691 }
692
693 if (port == NULL || port->type != NXT_PROCESS_APP) {
694
695 if (msg->port_msg.stream == 0) {
696 return;
697 }
698
699 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
700
701 } else {
702 if (msg->fd[1] != -1) {
703 res = nxt_router_port_queue_map(task, port, msg->fd[1]);
704 if (nxt_slow_path(res != NXT_OK)) {
705 return;
706 }
707
708 nxt_fd_close(msg->fd[1]);
709 msg->fd[1] = -1;
710 }
711 }
712
713 if (msg->port_msg.stream != 0) {
714 nxt_port_rpc_handler(task, msg);
715 return;
716 }
717
718 nxt_debug(task, "new port id %d (%d)", port->id, port->type);
719
720 /*
721 * Port with "id == 0" is application 'main' port and it always
722 * should come with non-zero stream.
723 */
724 nxt_assert(port->id != 0);
725
726 /* Find 'main' app port and get app reference. */
727 rt = task->thread->runtime;
728
729 /*
730 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
731 * sent to main port (with id == 0) and processed in main thread.
732 */
733 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
734 nxt_assert(main_app_port != NULL);
735
736 app = main_app_port->app;
737
738 if (nxt_fast_path(app != NULL)) {
739 nxt_thread_mutex_lock(&app->mutex);
740
741 /* TODO here should be find-and-add code because there can be
742 port waiters in port_hash */
743 nxt_port_hash_add(&app->port_hash, port);
744 app->port_hash_count++;
745
746 nxt_thread_mutex_unlock(&app->mutex);
747
748 port->app = app;
749 }
750
751 port->main_app_port = main_app_port;
752
753 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
754 }
755
756
757 static void
nxt_router_conf_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)758 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
759 {
760 void *p;
761 size_t size;
762 nxt_int_t ret;
763 nxt_port_t *port;
764 nxt_router_temp_conf_t *tmcf;
765
766 port = nxt_runtime_port_find(task->thread->runtime,
767 msg->port_msg.pid,
768 msg->port_msg.reply_port);
769 if (nxt_slow_path(port == NULL)) {
770 nxt_alert(task, "conf_data_handler: reply port not found");
771 return;
772 }
773
774 p = MAP_FAILED;
775
776 /*
777 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
778 * initialized in 'cleanup' section.
779 */
780 size = 0;
781
782 tmcf = nxt_router_temp_conf(task);
783 if (nxt_slow_path(tmcf == NULL)) {
784 goto fail;
785 }
786
787 if (nxt_slow_path(msg->fd[0] == -1)) {
788 nxt_alert(task, "conf_data_handler: invalid shm fd");
789 goto fail;
790 }
791
792 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
793 nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
794 (int) nxt_buf_mem_used_size(&msg->buf->mem));
795 goto fail;
796 }
797
798 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
799
800 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
801
802 nxt_fd_close(msg->fd[0]);
803 msg->fd[0] = -1;
804
805 if (nxt_slow_path(p == MAP_FAILED)) {
806 goto fail;
807 }
808
809 nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
810
811 tmcf->router_conf->router = nxt_router;
812 tmcf->stream = msg->port_msg.stream;
813 tmcf->port = port;
814
815 nxt_port_use(task, tmcf->port, 1);
816
817 ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
818
819 if (nxt_fast_path(ret == NXT_OK)) {
820 nxt_router_conf_apply(task, tmcf, NULL);
821
822 } else {
823 nxt_router_conf_error(task, tmcf);
824 }
825
826 goto cleanup;
827
828 fail:
829
830 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
831 msg->port_msg.stream, 0, NULL);
832
833 if (tmcf != NULL) {
834 nxt_mp_release(tmcf->mem_pool);
835 }
836
837 cleanup:
838
839 if (p != MAP_FAILED) {
840 nxt_mem_munmap(p, size);
841 }
842
843 if (msg->fd[0] != -1) {
844 nxt_fd_close(msg->fd[0]);
845 msg->fd[0] = -1;
846 }
847 }
848
849
850 static void
nxt_router_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)851 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
852 {
853 nxt_app_t *app;
854 nxt_int_t ret;
855 nxt_str_t app_name;
856 nxt_port_t *reply_port, *shared_port, *old_shared_port;
857 nxt_port_t *proto_port;
858 nxt_port_msg_type_t reply;
859
860 reply_port = nxt_runtime_port_find(task->thread->runtime,
861 msg->port_msg.pid,
862 msg->port_msg.reply_port);
863 if (nxt_slow_path(reply_port == NULL)) {
864 nxt_alert(task, "app_restart_handler: reply port not found");
865 return;
866 }
867
868 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
869 app_name.start = msg->buf->mem.pos;
870
871 nxt_debug(task, "app_restart_handler: %V", &app_name);
872
873 app = nxt_router_app_find(&nxt_router->apps, &app_name);
874
875 if (nxt_fast_path(app != NULL)) {
876 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
877 NXT_PROCESS_APP);
878 if (nxt_slow_path(shared_port == NULL)) {
879 goto fail;
880 }
881
882 ret = nxt_port_socket_init(task, shared_port, 0);
883 if (nxt_slow_path(ret != NXT_OK)) {
884 nxt_port_use(task, shared_port, -1);
885 goto fail;
886 }
887
888 ret = nxt_router_app_queue_init(task, shared_port);
889 if (nxt_slow_path(ret != NXT_OK)) {
890 nxt_port_write_close(shared_port);
891 nxt_port_read_close(shared_port);
892 nxt_port_use(task, shared_port, -1);
893 goto fail;
894 }
895
896 nxt_port_write_enable(task, shared_port);
897
898 nxt_thread_mutex_lock(&app->mutex);
899
900 proto_port = app->proto_port;
901
902 if (proto_port != NULL) {
903 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
904 proto_port->pid);
905
906 app->proto_port = NULL;
907 proto_port->app = NULL;
908 }
909
910 app->generation++;
911
912 shared_port->app = app;
913
914 old_shared_port = app->shared_port;
915 old_shared_port->app = NULL;
916
917 app->shared_port = shared_port;
918
919 nxt_thread_mutex_unlock(&app->mutex);
920
921 nxt_port_close(task, old_shared_port);
922 nxt_port_use(task, old_shared_port, -1);
923
924 if (proto_port != NULL) {
925 (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
926 -1, 0, 0, NULL);
927
928 nxt_port_close(task, proto_port);
929
930 nxt_port_use(task, proto_port, -1);
931 }
932
933 reply = NXT_PORT_MSG_RPC_READY_LAST;
934
935 } else {
936
937 fail:
938
939 reply = NXT_PORT_MSG_RPC_ERROR;
940 }
941
942 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
943 0, NULL);
944 }
945
946
947 static void
nxt_router_status_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)948 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
949 {
950 u_char *p;
951 size_t alloc;
952 nxt_app_t *app;
953 nxt_buf_t *b;
954 nxt_uint_t type;
955 nxt_port_t *port;
956 nxt_status_app_t *app_stat;
957 nxt_event_engine_t *engine;
958 nxt_status_report_t *report;
959
960 port = nxt_runtime_port_find(task->thread->runtime,
961 msg->port_msg.pid,
962 msg->port_msg.reply_port);
963 if (nxt_slow_path(port == NULL)) {
964 nxt_alert(task, "nxt_router_status_handler(): reply port not found");
965 return;
966 }
967
968 alloc = sizeof(nxt_status_report_t);
969
970 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
971
972 alloc += sizeof(nxt_status_app_t) + app->name.length;
973
974 } nxt_queue_loop;
975
976 b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
977 if (nxt_slow_path(b == NULL)) {
978 type = NXT_PORT_MSG_RPC_ERROR;
979 goto fail;
980 }
981
982 report = (nxt_status_report_t *) b->mem.free;
983 b->mem.free = b->mem.end;
984
985 nxt_memzero(report, sizeof(nxt_status_report_t));
986
987 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
988
989 report->accepted_conns += engine->accepted_conns_cnt;
990 report->idle_conns += engine->idle_conns_cnt;
991 report->closed_conns += engine->closed_conns_cnt;
992 report->requests += engine->requests_cnt;
993
994 } nxt_queue_loop;
995
996 report->apps_count = 0;
997 app_stat = report->apps;
998 p = b->mem.end;
999
1000 nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
1001 p -= app->name.length;
1002
1003 nxt_memcpy(p, app->name.start, app->name.length);
1004
1005 app_stat->name.length = app->name.length;
1006 app_stat->name.start = (u_char *) (p - b->mem.pos);
1007
1008 app_stat->active_requests = app->active_requests;
1009 app_stat->pending_processes = app->pending_processes;
1010 app_stat->processes = app->processes;
1011 app_stat->idle_processes = app->idle_processes;
1012
1013 report->apps_count++;
1014 app_stat++;
1015 } nxt_queue_loop;
1016
1017 type = NXT_PORT_MSG_RPC_READY_LAST;
1018
1019 fail:
1020
1021 nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1022 }
1023
1024
1025 static void
nxt_router_app_process_remove_pid(nxt_task_t * task,nxt_port_t * port,void * data)1026 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1027 void *data)
1028 {
1029 union {
1030 nxt_pid_t removed_pid;
1031 void *data;
1032 } u;
1033
1034 u.data = data;
1035
1036 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1037 }
1038
1039
1040 static void
nxt_router_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)1041 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1042 {
1043 nxt_event_engine_t *engine;
1044
1045 nxt_port_remove_pid_handler(task, msg);
1046
1047 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1048 {
1049 if (nxt_fast_path(engine->port != NULL)) {
1050 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1051 msg->u.data);
1052 }
1053 }
1054 nxt_queue_loop;
1055
1056 if (msg->port_msg.stream == 0) {
1057 return;
1058 }
1059
1060 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1061
1062 nxt_port_rpc_handler(task, msg);
1063 }
1064
1065
1066 static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t * task)1067 nxt_router_temp_conf(nxt_task_t *task)
1068 {
1069 nxt_mp_t *mp, *tmp;
1070 nxt_router_conf_t *rtcf;
1071 nxt_router_temp_conf_t *tmcf;
1072
1073 mp = nxt_mp_create(1024, 128, 256, 32);
1074 if (nxt_slow_path(mp == NULL)) {
1075 return NULL;
1076 }
1077
1078 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1079 if (nxt_slow_path(rtcf == NULL)) {
1080 goto fail;
1081 }
1082
1083 rtcf->mem_pool = mp;
1084
1085 rtcf->tstr_state = nxt_tstr_state_new(mp, 0);
1086 if (nxt_slow_path(rtcf->tstr_state == NULL)) {
1087 goto fail;
1088 }
1089
1090 #if (NXT_HAVE_NJS)
1091 nxt_http_register_js_proto(rtcf->tstr_state->jcf);
1092 #endif
1093
1094 tmp = nxt_mp_create(1024, 128, 256, 32);
1095 if (nxt_slow_path(tmp == NULL)) {
1096 goto fail;
1097 }
1098
1099 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1100 if (nxt_slow_path(tmcf == NULL)) {
1101 goto temp_fail;
1102 }
1103
1104 tmcf->mem_pool = tmp;
1105 tmcf->router_conf = rtcf;
1106 tmcf->count = 1;
1107 tmcf->engine = task->thread->engine;
1108
1109 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1110 sizeof(nxt_router_engine_conf_t));
1111 if (nxt_slow_path(tmcf->engines == NULL)) {
1112 goto temp_fail;
1113 }
1114
1115 nxt_queue_init(&creating_sockets);
1116 nxt_queue_init(&pending_sockets);
1117 nxt_queue_init(&updating_sockets);
1118 nxt_queue_init(&keeping_sockets);
1119 nxt_queue_init(&deleting_sockets);
1120
1121 #if (NXT_TLS)
1122 nxt_queue_init(&tmcf->tls);
1123 #endif
1124
1125 #if (NXT_HAVE_NJS)
1126 nxt_queue_init(&tmcf->js_modules);
1127 #endif
1128
1129 nxt_queue_init(&tmcf->apps);
1130 nxt_queue_init(&tmcf->previous);
1131
1132 return tmcf;
1133
1134 temp_fail:
1135
1136 nxt_mp_destroy(tmp);
1137
1138 fail:
1139
1140 if (rtcf->tstr_state != NULL) {
1141 nxt_tstr_state_release(rtcf->tstr_state);
1142 }
1143
1144 nxt_mp_destroy(mp);
1145
1146 return NULL;
1147 }
1148
1149
1150 nxt_inline nxt_bool_t
nxt_router_app_can_start(nxt_app_t * app)1151 nxt_router_app_can_start(nxt_app_t *app)
1152 {
1153 return app->processes + app->pending_processes < app->max_processes
1154 && app->pending_processes < app->max_pending_processes;
1155 }
1156
1157
1158 nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t * app)1159 nxt_router_app_need_start(nxt_app_t *app)
1160 {
1161 return (app->active_requests
1162 > app->port_hash_count + app->pending_processes)
1163 || (app->spare_processes
1164 > app->idle_processes + app->pending_processes);
1165 }
1166
1167
1168 void
nxt_router_conf_apply(nxt_task_t * task,void * obj,void * data)1169 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1170 {
1171 nxt_int_t ret;
1172 nxt_app_t *app;
1173 nxt_router_t *router;
1174 nxt_runtime_t *rt;
1175 nxt_queue_link_t *qlk;
1176 nxt_socket_conf_t *skcf;
1177 nxt_router_conf_t *rtcf;
1178 nxt_router_temp_conf_t *tmcf;
1179 const nxt_event_interface_t *interface;
1180 #if (NXT_TLS)
1181 nxt_router_tlssock_t *tls;
1182 #endif
1183 #if (NXT_HAVE_NJS)
1184 nxt_router_js_module_t *js_module;
1185 #endif
1186
1187 tmcf = obj;
1188
1189 qlk = nxt_queue_first(&pending_sockets);
1190
1191 if (qlk != nxt_queue_tail(&pending_sockets)) {
1192 nxt_queue_remove(qlk);
1193 nxt_queue_insert_tail(&creating_sockets, qlk);
1194
1195 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1196
1197 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1198
1199 return;
1200 }
1201
1202 #if (NXT_TLS)
1203 qlk = nxt_queue_last(&tmcf->tls);
1204
1205 if (qlk != nxt_queue_head(&tmcf->tls)) {
1206 nxt_queue_remove(qlk);
1207
1208 tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1209
1210 nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1211 nxt_router_tls_rpc_handler, tls);
1212 return;
1213 }
1214 #endif
1215
1216 #if (NXT_HAVE_NJS)
1217 qlk = nxt_queue_last(&tmcf->js_modules);
1218
1219 if (qlk != nxt_queue_head(&tmcf->js_modules)) {
1220 nxt_queue_remove(qlk);
1221
1222 js_module = nxt_queue_link_data(qlk, nxt_router_js_module_t, link);
1223
1224 nxt_script_store_get(task, &js_module->name, tmcf->mem_pool,
1225 nxt_router_js_module_rpc_handler, js_module);
1226 return;
1227 }
1228 #endif
1229
1230 rtcf = tmcf->router_conf;
1231
1232 ret = nxt_tstr_state_done(rtcf->tstr_state, NULL);
1233 if (nxt_slow_path(ret != NXT_OK)) {
1234 goto fail;
1235 }
1236
1237 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1238
1239 if (nxt_router_app_need_start(app)) {
1240 nxt_router_app_rpc_create(task, tmcf, app);
1241 return;
1242 }
1243
1244 } nxt_queue_loop;
1245
1246 if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1247 nxt_router_access_log_open(task, tmcf);
1248 return;
1249 }
1250
1251 rt = task->thread->runtime;
1252
1253 interface = nxt_service_get(rt->services, "engine", NULL);
1254
1255 router = rtcf->router;
1256
1257 ret = nxt_router_engines_create(task, router, tmcf, interface);
1258 if (nxt_slow_path(ret != NXT_OK)) {
1259 goto fail;
1260 }
1261
1262 ret = nxt_router_threads_create(task, rt, tmcf);
1263 if (nxt_slow_path(ret != NXT_OK)) {
1264 goto fail;
1265 }
1266
1267 nxt_router_apps_sort(task, router, tmcf);
1268
1269 nxt_router_apps_hash_use(task, rtcf, 1);
1270
1271 nxt_router_engines_post(router, tmcf);
1272
1273 nxt_queue_add(&router->sockets, &updating_sockets);
1274 nxt_queue_add(&router->sockets, &creating_sockets);
1275
1276 if (router->access_log != rtcf->access_log) {
1277 nxt_router_access_log_use(&router->lock, rtcf->access_log);
1278
1279 nxt_router_access_log_release(task, &router->lock, router->access_log);
1280
1281 router->access_log = rtcf->access_log;
1282 }
1283
1284 nxt_router_conf_ready(task, tmcf);
1285
1286 return;
1287
1288 fail:
1289
1290 nxt_router_conf_error(task, tmcf);
1291
1292 return;
1293 }
1294
1295
1296 static void
nxt_router_conf_wait(nxt_task_t * task,void * obj,void * data)1297 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1298 {
1299 nxt_joint_job_t *job;
1300
1301 job = obj;
1302
1303 nxt_router_conf_ready(task, job->tmcf);
1304 }
1305
1306
1307 static void
nxt_router_conf_ready(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1308 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1309 {
1310 uint32_t count;
1311 nxt_router_conf_t *rtcf;
1312 nxt_thread_spinlock_t *lock;
1313
1314 nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1315
1316 if (--tmcf->count > 0) {
1317 return;
1318 }
1319
1320 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1321
1322 rtcf = tmcf->router_conf;
1323
1324 lock = &rtcf->router->lock;
1325
1326 nxt_thread_spin_lock(lock);
1327
1328 count = rtcf->count;
1329
1330 nxt_thread_spin_unlock(lock);
1331
1332 nxt_debug(task, "rtcf %p: %D", rtcf, count);
1333
1334 if (count == 0) {
1335 nxt_router_apps_hash_use(task, rtcf, -1);
1336
1337 nxt_router_access_log_release(task, lock, rtcf->access_log);
1338
1339 nxt_mp_destroy(rtcf->mem_pool);
1340 }
1341
1342 nxt_mp_release(tmcf->mem_pool);
1343 }
1344
1345
1346 void
nxt_router_conf_error(nxt_task_t * task,nxt_router_temp_conf_t * tmcf)1347 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1348 {
1349 nxt_app_t *app;
1350 nxt_socket_t s;
1351 nxt_router_t *router;
1352 nxt_queue_link_t *qlk;
1353 nxt_socket_conf_t *skcf;
1354 nxt_router_conf_t *rtcf;
1355
1356 nxt_alert(task, "failed to apply new conf");
1357
1358 for (qlk = nxt_queue_first(&creating_sockets);
1359 qlk != nxt_queue_tail(&creating_sockets);
1360 qlk = nxt_queue_next(qlk))
1361 {
1362 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1363 s = skcf->listen->socket;
1364
1365 if (s != -1) {
1366 nxt_socket_close(task, s);
1367 }
1368
1369 nxt_free(skcf->listen);
1370 }
1371
1372 rtcf = tmcf->router_conf;
1373
1374 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1375
1376 nxt_router_app_unlink(task, app);
1377
1378 } nxt_queue_loop;
1379
1380 router = rtcf->router;
1381
1382 nxt_queue_add(&router->sockets, &keeping_sockets);
1383 nxt_queue_add(&router->sockets, &deleting_sockets);
1384
1385 nxt_queue_add(&router->apps, &tmcf->previous);
1386
1387 // TODO: new engines and threads
1388
1389 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1390
1391 nxt_mp_destroy(rtcf->mem_pool);
1392
1393 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1394
1395 nxt_mp_release(tmcf->mem_pool);
1396 }
1397
1398
1399 static void
nxt_router_conf_send(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_port_msg_type_t type)1400 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1401 nxt_port_msg_type_t type)
1402 {
1403 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1404
1405 nxt_port_use(task, tmcf->port, -1);
1406
1407 tmcf->port = NULL;
1408 }
1409
1410
1411 static nxt_conf_map_t nxt_router_conf[] = {
1412 {
1413 nxt_string("listeners_threads"),
1414 NXT_CONF_MAP_INT32,
1415 offsetof(nxt_router_conf_t, threads),
1416 },
1417 };
1418
1419
1420 static nxt_conf_map_t nxt_router_app_conf[] = {
1421 {
1422 nxt_string("type"),
1423 NXT_CONF_MAP_STR,
1424 offsetof(nxt_router_app_conf_t, type),
1425 },
1426
1427 {
1428 nxt_string("limits"),
1429 NXT_CONF_MAP_PTR,
1430 offsetof(nxt_router_app_conf_t, limits_value),
1431 },
1432
1433 {
1434 nxt_string("processes"),
1435 NXT_CONF_MAP_INT32,
1436 offsetof(nxt_router_app_conf_t, processes),
1437 },
1438
1439 {
1440 nxt_string("processes"),
1441 NXT_CONF_MAP_PTR,
1442 offsetof(nxt_router_app_conf_t, processes_value),
1443 },
1444
1445 {
1446 nxt_string("targets"),
1447 NXT_CONF_MAP_PTR,
1448 offsetof(nxt_router_app_conf_t, targets_value),
1449 },
1450 };
1451
1452
1453 static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1454 {
1455 nxt_string("timeout"),
1456 NXT_CONF_MAP_MSEC,
1457 offsetof(nxt_router_app_conf_t, timeout),
1458 },
1459 };
1460
1461
1462 static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1463 {
1464 nxt_string("spare"),
1465 NXT_CONF_MAP_INT32,
1466 offsetof(nxt_router_app_conf_t, spare_processes),
1467 },
1468
1469 {
1470 nxt_string("max"),
1471 NXT_CONF_MAP_INT32,
1472 offsetof(nxt_router_app_conf_t, max_processes),
1473 },
1474
1475 {
1476 nxt_string("idle_timeout"),
1477 NXT_CONF_MAP_MSEC,
1478 offsetof(nxt_router_app_conf_t, idle_timeout),
1479 },
1480 };
1481
1482
1483 static nxt_conf_map_t nxt_router_listener_conf[] = {
1484 {
1485 nxt_string("pass"),
1486 NXT_CONF_MAP_STR_COPY,
1487 offsetof(nxt_router_listener_conf_t, pass),
1488 },
1489
1490 {
1491 nxt_string("application"),
1492 NXT_CONF_MAP_STR_COPY,
1493 offsetof(nxt_router_listener_conf_t, application),
1494 },
1495 };
1496
1497
1498 static nxt_conf_map_t nxt_router_http_conf[] = {
1499 {
1500 nxt_string("header_buffer_size"),
1501 NXT_CONF_MAP_SIZE,
1502 offsetof(nxt_socket_conf_t, header_buffer_size),
1503 },
1504
1505 {
1506 nxt_string("large_header_buffer_size"),
1507 NXT_CONF_MAP_SIZE,
1508 offsetof(nxt_socket_conf_t, large_header_buffer_size),
1509 },
1510
1511 {
1512 nxt_string("large_header_buffers"),
1513 NXT_CONF_MAP_SIZE,
1514 offsetof(nxt_socket_conf_t, large_header_buffers),
1515 },
1516
1517 {
1518 nxt_string("body_buffer_size"),
1519 NXT_CONF_MAP_SIZE,
1520 offsetof(nxt_socket_conf_t, body_buffer_size),
1521 },
1522
1523 {
1524 nxt_string("max_body_size"),
1525 NXT_CONF_MAP_SIZE,
1526 offsetof(nxt_socket_conf_t, max_body_size),
1527 },
1528
1529 {
1530 nxt_string("idle_timeout"),
1531 NXT_CONF_MAP_MSEC,
1532 offsetof(nxt_socket_conf_t, idle_timeout),
1533 },
1534
1535 {
1536 nxt_string("header_read_timeout"),
1537 NXT_CONF_MAP_MSEC,
1538 offsetof(nxt_socket_conf_t, header_read_timeout),
1539 },
1540
1541 {
1542 nxt_string("body_read_timeout"),
1543 NXT_CONF_MAP_MSEC,
1544 offsetof(nxt_socket_conf_t, body_read_timeout),
1545 },
1546
1547 {
1548 nxt_string("send_timeout"),
1549 NXT_CONF_MAP_MSEC,
1550 offsetof(nxt_socket_conf_t, send_timeout),
1551 },
1552
1553 {
1554 nxt_string("body_temp_path"),
1555 NXT_CONF_MAP_STR,
1556 offsetof(nxt_socket_conf_t, body_temp_path),
1557 },
1558
1559 {
1560 nxt_string("discard_unsafe_fields"),
1561 NXT_CONF_MAP_INT8,
1562 offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1563 },
1564
1565 {
1566 nxt_string("log_route"),
1567 NXT_CONF_MAP_INT8,
1568 offsetof(nxt_socket_conf_t, log_route),
1569 },
1570
1571 {
1572 nxt_string("server_version"),
1573 NXT_CONF_MAP_INT8,
1574 offsetof(nxt_socket_conf_t, server_version),
1575 },
1576 };
1577
1578
1579 static nxt_conf_map_t nxt_router_websocket_conf[] = {
1580 {
1581 nxt_string("max_frame_size"),
1582 NXT_CONF_MAP_SIZE,
1583 offsetof(nxt_websocket_conf_t, max_frame_size),
1584 },
1585
1586 {
1587 nxt_string("read_timeout"),
1588 NXT_CONF_MAP_MSEC,
1589 offsetof(nxt_websocket_conf_t, read_timeout),
1590 },
1591
1592 {
1593 nxt_string("keepalive_interval"),
1594 NXT_CONF_MAP_MSEC,
1595 offsetof(nxt_websocket_conf_t, keepalive_interval),
1596 },
1597
1598 };
1599
1600
1601 static nxt_int_t
nxt_router_conf_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,u_char * start,u_char * end)1602 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1603 u_char *start, u_char *end)
1604 {
1605 u_char *p;
1606 size_t size;
1607 nxt_mp_t *mp, *app_mp;
1608 uint32_t next, next_target;
1609 nxt_int_t ret;
1610 nxt_str_t name, target;
1611 nxt_app_t *app, *prev;
1612 nxt_str_t *t, *s, *targets;
1613 nxt_uint_t n, i;
1614 nxt_port_t *port;
1615 nxt_router_t *router;
1616 nxt_app_joint_t *app_joint;
1617 #if (NXT_TLS)
1618 nxt_tls_init_t *tls_init;
1619 nxt_conf_value_t *certificate;
1620 #endif
1621 #if (NXT_HAVE_NJS)
1622 nxt_conf_value_t *js_module;
1623 #endif
1624 nxt_conf_value_t *root, *conf, *http, *value, *websocket;
1625 nxt_conf_value_t *applications, *application;
1626 nxt_conf_value_t *listeners, *listener;
1627 nxt_socket_conf_t *skcf;
1628 nxt_router_conf_t *rtcf;
1629 nxt_http_routes_t *routes;
1630 nxt_event_engine_t *engine;
1631 nxt_app_lang_module_t *lang;
1632 nxt_router_app_conf_t apcf;
1633 nxt_router_listener_conf_t lscf;
1634
1635 static nxt_str_t http_path = nxt_string("/settings/http");
1636 static nxt_str_t applications_path = nxt_string("/applications");
1637 static nxt_str_t listeners_path = nxt_string("/listeners");
1638 static nxt_str_t routes_path = nxt_string("/routes");
1639 static nxt_str_t access_log_path = nxt_string("/access_log");
1640 #if (NXT_TLS)
1641 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1642 static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
1643 static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
1644 static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
1645 static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
1646 #endif
1647 #if (NXT_HAVE_NJS)
1648 static nxt_str_t js_module_path = nxt_string("/settings/js_module");
1649 #endif
1650 static nxt_str_t static_path = nxt_string("/settings/http/static");
1651 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1652 static nxt_str_t forwarded_path = nxt_string("/forwarded");
1653 static nxt_str_t client_ip_path = nxt_string("/client_ip");
1654
1655 root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1656 if (root == NULL) {
1657 nxt_alert(task, "configuration parsing error");
1658 return NXT_ERROR;
1659 }
1660
1661 rtcf = tmcf->router_conf;
1662 mp = rtcf->mem_pool;
1663
1664 ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1665 nxt_nitems(nxt_router_conf), rtcf);
1666 if (ret != NXT_OK) {
1667 nxt_alert(task, "root map error");
1668 return NXT_ERROR;
1669 }
1670
1671 if (rtcf->threads == 0) {
1672 rtcf->threads = nxt_ncpu;
1673 }
1674
1675 conf = nxt_conf_get_path(root, &static_path);
1676
1677 ret = nxt_router_conf_process_static(task, rtcf, conf);
1678 if (nxt_slow_path(ret != NXT_OK)) {
1679 return NXT_ERROR;
1680 }
1681
1682 router = rtcf->router;
1683
1684 applications = nxt_conf_get_path(root, &applications_path);
1685
1686 if (applications != NULL) {
1687 next = 0;
1688
1689 for ( ;; ) {
1690 application = nxt_conf_next_object_member(applications,
1691 &name, &next);
1692 if (application == NULL) {
1693 break;
1694 }
1695
1696 nxt_debug(task, "application \"%V\"", &name);
1697
1698 size = nxt_conf_json_length(application, NULL);
1699
1700 app_mp = nxt_mp_create(4096, 128, 1024, 64);
1701 if (nxt_slow_path(app_mp == NULL)) {
1702 goto fail;
1703 }
1704
1705 app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1706 if (app == NULL) {
1707 goto app_fail;
1708 }
1709
1710 nxt_memzero(app, sizeof(nxt_app_t));
1711
1712 app->mem_pool = app_mp;
1713
1714 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1715 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1716 + name.length);
1717
1718 p = nxt_conf_json_print(app->conf.start, application, NULL);
1719 app->conf.length = p - app->conf.start;
1720
1721 nxt_assert(app->conf.length <= size);
1722
1723 nxt_debug(task, "application conf \"%V\"", &app->conf);
1724
1725 prev = nxt_router_app_find(&router->apps, &name);
1726
1727 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1728 nxt_mp_destroy(app_mp);
1729
1730 nxt_queue_remove(&prev->link);
1731 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1732
1733 ret = nxt_router_apps_hash_add(rtcf, prev);
1734 if (nxt_slow_path(ret != NXT_OK)) {
1735 goto fail;
1736 }
1737
1738 continue;
1739 }
1740
1741 apcf.processes = 1;
1742 apcf.max_processes = 1;
1743 apcf.spare_processes = 0;
1744 apcf.timeout = 0;
1745 apcf.idle_timeout = 15000;
1746 apcf.limits_value = NULL;
1747 apcf.processes_value = NULL;
1748 apcf.targets_value = NULL;
1749
1750 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1751 if (nxt_slow_path(app_joint == NULL)) {
1752 goto app_fail;
1753 }
1754
1755 nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1756
1757 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1758 nxt_nitems(nxt_router_app_conf), &apcf);
1759 if (ret != NXT_OK) {
1760 nxt_alert(task, "application map error");
1761 goto app_fail;
1762 }
1763
1764 if (apcf.limits_value != NULL) {
1765
1766 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1767 nxt_alert(task, "application limits is not object");
1768 goto app_fail;
1769 }
1770
1771 ret = nxt_conf_map_object(mp, apcf.limits_value,
1772 nxt_router_app_limits_conf,
1773 nxt_nitems(nxt_router_app_limits_conf),
1774 &apcf);
1775 if (ret != NXT_OK) {
1776 nxt_alert(task, "application limits map error");
1777 goto app_fail;
1778 }
1779 }
1780
1781 if (apcf.processes_value != NULL
1782 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1783 {
1784 ret = nxt_conf_map_object(mp, apcf.processes_value,
1785 nxt_router_app_processes_conf,
1786 nxt_nitems(nxt_router_app_processes_conf),
1787 &apcf);
1788 if (ret != NXT_OK) {
1789 nxt_alert(task, "application processes map error");
1790 goto app_fail;
1791 }
1792
1793 } else {
1794 apcf.max_processes = apcf.processes;
1795 apcf.spare_processes = apcf.processes;
1796 }
1797
1798 if (apcf.targets_value != NULL) {
1799 n = nxt_conf_object_members_count(apcf.targets_value);
1800
1801 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1802 if (nxt_slow_path(targets == NULL)) {
1803 goto app_fail;
1804 }
1805
1806 next_target = 0;
1807
1808 for (i = 0; i < n; i++) {
1809 (void) nxt_conf_next_object_member(apcf.targets_value,
1810 &target, &next_target);
1811
1812 s = nxt_str_dup(app_mp, &targets[i], &target);
1813 if (nxt_slow_path(s == NULL)) {
1814 goto app_fail;
1815 }
1816 }
1817
1818 } else {
1819 targets = NULL;
1820 }
1821
1822 nxt_debug(task, "application type: %V", &apcf.type);
1823 nxt_debug(task, "application processes: %D", apcf.processes);
1824 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1825
1826 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1827
1828 if (lang == NULL) {
1829 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1830 goto app_fail;
1831 }
1832
1833 nxt_debug(task, "application language module: \"%s\"", lang->file);
1834
1835 ret = nxt_thread_mutex_create(&app->mutex);
1836 if (ret != NXT_OK) {
1837 goto app_fail;
1838 }
1839
1840 nxt_queue_init(&app->ports);
1841 nxt_queue_init(&app->spare_ports);
1842 nxt_queue_init(&app->idle_ports);
1843 nxt_queue_init(&app->ack_waiting_req);
1844
1845 app->name.length = name.length;
1846 nxt_memcpy(app->name.start, name.start, name.length);
1847
1848 app->type = lang->type;
1849 app->max_processes = apcf.max_processes;
1850 app->spare_processes = apcf.spare_processes;
1851 app->max_pending_processes = apcf.spare_processes
1852 ? apcf.spare_processes : 1;
1853 app->timeout = apcf.timeout;
1854 app->idle_timeout = apcf.idle_timeout;
1855
1856 app->targets = targets;
1857
1858 engine = task->thread->engine;
1859
1860 app->engine = engine;
1861
1862 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1863 app->adjust_idle_work.task = &engine->task;
1864 app->adjust_idle_work.obj = app;
1865
1866 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1867
1868 ret = nxt_router_apps_hash_add(rtcf, app);
1869 if (nxt_slow_path(ret != NXT_OK)) {
1870 goto app_fail;
1871 }
1872
1873 nxt_router_app_use(task, app, 1);
1874
1875 app->joint = app_joint;
1876
1877 app_joint->use_count = 1;
1878 app_joint->app = app;
1879
1880 app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1881 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1882 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1883 app_joint->idle_timer.task = &engine->task;
1884 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1885
1886 app_joint->free_app_work.handler = nxt_router_free_app;
1887 app_joint->free_app_work.task = &engine->task;
1888 app_joint->free_app_work.obj = app_joint;
1889
1890 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1891 NXT_PROCESS_APP);
1892 if (nxt_slow_path(port == NULL)) {
1893 return NXT_ERROR;
1894 }
1895
1896 ret = nxt_port_socket_init(task, port, 0);
1897 if (nxt_slow_path(ret != NXT_OK)) {
1898 nxt_port_use(task, port, -1);
1899 return NXT_ERROR;
1900 }
1901
1902 ret = nxt_router_app_queue_init(task, port);
1903 if (nxt_slow_path(ret != NXT_OK)) {
1904 nxt_port_write_close(port);
1905 nxt_port_read_close(port);
1906 nxt_port_use(task, port, -1);
1907 return NXT_ERROR;
1908 }
1909
1910 nxt_port_write_enable(task, port);
1911 port->app = app;
1912
1913 app->shared_port = port;
1914
1915 nxt_thread_mutex_create(&app->outgoing.mutex);
1916 }
1917 }
1918
1919 conf = nxt_conf_get_path(root, &routes_path);
1920 if (nxt_fast_path(conf != NULL)) {
1921 routes = nxt_http_routes_create(task, tmcf, conf);
1922 if (nxt_slow_path(routes == NULL)) {
1923 return NXT_ERROR;
1924 }
1925
1926 rtcf->routes = routes;
1927 }
1928
1929 ret = nxt_upstreams_create(task, tmcf, root);
1930 if (nxt_slow_path(ret != NXT_OK)) {
1931 return ret;
1932 }
1933
1934 http = nxt_conf_get_path(root, &http_path);
1935 #if 0
1936 if (http == NULL) {
1937 nxt_alert(task, "no \"http\" block");
1938 return NXT_ERROR;
1939 }
1940 #endif
1941
1942 websocket = nxt_conf_get_path(root, &websocket_path);
1943
1944 listeners = nxt_conf_get_path(root, &listeners_path);
1945
1946 if (listeners != NULL) {
1947 next = 0;
1948
1949 for ( ;; ) {
1950 listener = nxt_conf_next_object_member(listeners, &name, &next);
1951 if (listener == NULL) {
1952 break;
1953 }
1954
1955 skcf = nxt_router_socket_conf(task, tmcf, &name);
1956 if (skcf == NULL) {
1957 goto fail;
1958 }
1959
1960 nxt_memzero(&lscf, sizeof(lscf));
1961
1962 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1963 nxt_nitems(nxt_router_listener_conf),
1964 &lscf);
1965 if (ret != NXT_OK) {
1966 nxt_alert(task, "listener map error");
1967 goto fail;
1968 }
1969
1970 nxt_debug(task, "application: %V", &lscf.application);
1971
1972 // STUB, default values if http block is not defined.
1973 skcf->header_buffer_size = 2048;
1974 skcf->large_header_buffer_size = 8192;
1975 skcf->large_header_buffers = 4;
1976 skcf->discard_unsafe_fields = 1;
1977 skcf->body_buffer_size = 16 * 1024;
1978 skcf->max_body_size = 8 * 1024 * 1024;
1979 skcf->proxy_header_buffer_size = 64 * 1024;
1980 skcf->proxy_buffer_size = 4096;
1981 skcf->proxy_buffers = 256;
1982 skcf->idle_timeout = 180 * 1000;
1983 skcf->header_read_timeout = 30 * 1000;
1984 skcf->body_read_timeout = 30 * 1000;
1985 skcf->send_timeout = 30 * 1000;
1986 skcf->proxy_timeout = 60 * 1000;
1987 skcf->proxy_send_timeout = 30 * 1000;
1988 skcf->proxy_read_timeout = 30 * 1000;
1989
1990 skcf->server_version = 1;
1991
1992 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1993 skcf->websocket_conf.read_timeout = 60 * 1000;
1994 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1995
1996 nxt_str_null(&skcf->body_temp_path);
1997
1998 if (http != NULL) {
1999 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
2000 nxt_nitems(nxt_router_http_conf),
2001 skcf);
2002 if (ret != NXT_OK) {
2003 nxt_alert(task, "http map error");
2004 goto fail;
2005 }
2006 }
2007
2008 if (websocket != NULL) {
2009 ret = nxt_conf_map_object(mp, websocket,
2010 nxt_router_websocket_conf,
2011 nxt_nitems(nxt_router_websocket_conf),
2012 &skcf->websocket_conf);
2013 if (ret != NXT_OK) {
2014 nxt_alert(task, "websocket map error");
2015 goto fail;
2016 }
2017 }
2018
2019 t = &skcf->body_temp_path;
2020
2021 if (t->length == 0) {
2022 t->start = (u_char *) task->thread->runtime->tmp;
2023 t->length = nxt_strlen(t->start);
2024 }
2025
2026 conf = nxt_conf_get_path(listener, &forwarded_path);
2027
2028 if (conf != NULL) {
2029 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
2030 if (nxt_slow_path(skcf->forwarded == NULL)) {
2031 return NXT_ERROR;
2032 }
2033 }
2034
2035 conf = nxt_conf_get_path(listener, &client_ip_path);
2036
2037 if (conf != NULL) {
2038 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
2039 if (nxt_slow_path(skcf->client_ip == NULL)) {
2040 return NXT_ERROR;
2041 }
2042 }
2043
2044 #if (NXT_TLS)
2045 certificate = nxt_conf_get_path(listener, &certificate_path);
2046
2047 if (certificate != NULL) {
2048 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
2049 if (nxt_slow_path(tls_init == NULL)) {
2050 return NXT_ERROR;
2051 }
2052
2053 tls_init->cache_size = 0;
2054 tls_init->timeout = 300;
2055
2056 value = nxt_conf_get_path(listener, &conf_cache_path);
2057 if (value != NULL) {
2058 tls_init->cache_size = nxt_conf_get_number(value);
2059 }
2060
2061 value = nxt_conf_get_path(listener, &conf_timeout_path);
2062 if (value != NULL) {
2063 tls_init->timeout = nxt_conf_get_number(value);
2064 }
2065
2066 tls_init->conf_cmds = nxt_conf_get_path(listener,
2067 &conf_commands_path);
2068
2069 tls_init->tickets_conf = nxt_conf_get_path(listener,
2070 &conf_tickets);
2071
2072 n = nxt_conf_array_elements_count_or_1(certificate);
2073
2074 for (i = 0; i < n; i++) {
2075 value = nxt_conf_get_array_element_or_itself(certificate,
2076 i);
2077 nxt_assert(value != NULL);
2078
2079 ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2080 tls_init, i == 0);
2081 if (nxt_slow_path(ret != NXT_OK)) {
2082 goto fail;
2083 }
2084 }
2085 }
2086 #endif
2087
2088 skcf->listen->handler = nxt_http_conn_init;
2089 skcf->router_conf = rtcf;
2090 skcf->router_conf->count++;
2091
2092 if (lscf.pass.length != 0) {
2093 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2094
2095 /* COMPATIBILITY: listener application. */
2096 } else if (lscf.application.length > 0) {
2097 skcf->action = nxt_http_pass_application(task, rtcf,
2098 &lscf.application);
2099 }
2100
2101 if (nxt_slow_path(skcf->action == NULL)) {
2102 goto fail;
2103 }
2104 }
2105 }
2106
2107 ret = nxt_http_routes_resolve(task, tmcf);
2108 if (nxt_slow_path(ret != NXT_OK)) {
2109 goto fail;
2110 }
2111
2112 value = nxt_conf_get_path(root, &access_log_path);
2113
2114 if (value != NULL) {
2115 ret = nxt_router_access_log_create(task, rtcf, value);
2116 if (nxt_slow_path(ret != NXT_OK)) {
2117 goto fail;
2118 }
2119 }
2120
2121 #if (NXT_HAVE_NJS)
2122 js_module = nxt_conf_get_path(root, &js_module_path);
2123
2124 if (js_module != NULL) {
2125 if (nxt_conf_type(js_module) == NXT_CONF_ARRAY) {
2126 n = nxt_conf_array_elements_count(js_module);
2127
2128 for (i = 0; i < n; i++) {
2129 value = nxt_conf_get_array_element(js_module, i);
2130
2131 ret = nxt_router_js_module_insert(tmcf, value);
2132 if (nxt_slow_path(ret != NXT_OK)) {
2133 goto fail;
2134 }
2135 }
2136
2137 } else {
2138 /* NXT_CONF_STRING */
2139
2140 ret = nxt_router_js_module_insert(tmcf, js_module);
2141 if (nxt_slow_path(ret != NXT_OK)) {
2142 goto fail;
2143 }
2144 }
2145 }
2146
2147 #endif
2148
2149 nxt_queue_add(&deleting_sockets, &router->sockets);
2150 nxt_queue_init(&router->sockets);
2151
2152 return NXT_OK;
2153
2154 app_fail:
2155
2156 nxt_mp_destroy(app_mp);
2157
2158 fail:
2159
2160 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2161
2162 nxt_queue_remove(&app->link);
2163 nxt_thread_mutex_destroy(&app->mutex);
2164 nxt_mp_destroy(app->mem_pool);
2165
2166 } nxt_queue_loop;
2167
2168 return NXT_ERROR;
2169 }
2170
2171
2172 #if (NXT_TLS)
2173
2174 static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value,nxt_socket_conf_t * skcf,nxt_tls_init_t * tls_init,nxt_bool_t last)2175 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2176 nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2177 nxt_tls_init_t *tls_init, nxt_bool_t last)
2178 {
2179 nxt_router_tlssock_t *tls;
2180
2181 tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2182 if (nxt_slow_path(tls == NULL)) {
2183 return NXT_ERROR;
2184 }
2185
2186 tls->tls_init = tls_init;
2187 tls->socket_conf = skcf;
2188 tls->temp_conf = tmcf;
2189 tls->last = last;
2190 nxt_conf_get_string(value, &tls->name);
2191
2192 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2193
2194 return NXT_OK;
2195 }
2196
2197 #endif
2198
2199
2200 #if (NXT_HAVE_NJS)
2201
2202 static void
nxt_router_js_module_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2203 nxt_router_js_module_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2204 void *data)
2205 {
2206 nxt_int_t ret;
2207 nxt_str_t text;
2208 nxt_router_conf_t *rtcf;
2209 nxt_router_temp_conf_t *tmcf;
2210 nxt_router_js_module_t *js_module;
2211
2212 nxt_debug(task, "auto module rpc handler");
2213
2214 js_module = data;
2215 tmcf = js_module->temp_conf;
2216
2217 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2218 goto fail;
2219 }
2220
2221 rtcf = tmcf->router_conf;
2222
2223 ret = nxt_script_file_read(msg->fd[0], &text);
2224
2225 nxt_fd_close(msg->fd[0]);
2226
2227 if (nxt_slow_path(ret == NXT_ERROR)) {
2228 goto fail;
2229 }
2230
2231 if (text.length > 0) {
2232 ret = nxt_js_add_module(rtcf->tstr_state->jcf, &js_module->name, &text);
2233
2234 nxt_free(text.start);
2235
2236 if (nxt_slow_path(ret == NXT_ERROR)) {
2237 goto fail;
2238 }
2239 }
2240
2241 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2242 nxt_router_conf_apply, task, tmcf, NULL);
2243 return;
2244
2245 fail:
2246
2247 nxt_router_conf_error(task, tmcf);
2248 }
2249
2250
2251 static nxt_int_t
nxt_router_js_module_insert(nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * value)2252 nxt_router_js_module_insert(nxt_router_temp_conf_t *tmcf,
2253 nxt_conf_value_t *value)
2254 {
2255 nxt_router_js_module_t *js_module;
2256
2257 js_module = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_js_module_t));
2258 if (nxt_slow_path(js_module == NULL)) {
2259 return NXT_ERROR;
2260 }
2261
2262 js_module->temp_conf = tmcf;
2263 nxt_conf_get_string(value, &js_module->name);
2264
2265 nxt_queue_insert_tail(&tmcf->js_modules, &js_module->link);
2266
2267 return NXT_OK;
2268 }
2269
2270 #endif
2271
2272
2273 static nxt_int_t
nxt_router_conf_process_static(nxt_task_t * task,nxt_router_conf_t * rtcf,nxt_conf_value_t * conf)2274 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2275 nxt_conf_value_t *conf)
2276 {
2277 uint32_t next, i;
2278 nxt_mp_t *mp;
2279 nxt_str_t *type, exten, str, *s;
2280 nxt_int_t ret;
2281 nxt_uint_t exts;
2282 nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
2283
2284 static nxt_str_t mtypes_path = nxt_string("/mime_types");
2285
2286 mp = rtcf->mem_pool;
2287
2288 ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2289 if (nxt_slow_path(ret != NXT_OK)) {
2290 return NXT_ERROR;
2291 }
2292
2293 if (conf == NULL) {
2294 return NXT_OK;
2295 }
2296
2297 mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2298
2299 if (mtypes_conf != NULL) {
2300 next = 0;
2301
2302 for ( ;; ) {
2303 ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2304
2305 if (ext_conf == NULL) {
2306 break;
2307 }
2308
2309 type = nxt_str_dup(mp, NULL, &str);
2310 if (nxt_slow_path(type == NULL)) {
2311 return NXT_ERROR;
2312 }
2313
2314 if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2315 s = nxt_conf_get_string_dup(ext_conf, mp, &exten);
2316 if (nxt_slow_path(s == NULL)) {
2317 return NXT_ERROR;
2318 }
2319
2320 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2321 &exten, type);
2322 if (nxt_slow_path(ret != NXT_OK)) {
2323 return NXT_ERROR;
2324 }
2325
2326 continue;
2327 }
2328
2329 exts = nxt_conf_array_elements_count(ext_conf);
2330
2331 for (i = 0; i < exts; i++) {
2332 value = nxt_conf_get_array_element(ext_conf, i);
2333
2334 s = nxt_conf_get_string_dup(value, mp, &exten);
2335 if (nxt_slow_path(s == NULL)) {
2336 return NXT_ERROR;
2337 }
2338
2339 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2340 &exten, type);
2341 if (nxt_slow_path(ret != NXT_OK)) {
2342 return NXT_ERROR;
2343 }
2344 }
2345 }
2346 }
2347
2348 return NXT_OK;
2349 }
2350
2351
2352 static nxt_http_forward_t *
nxt_router_conf_forward(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf)2353 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2354 {
2355 nxt_int_t ret;
2356 nxt_conf_value_t *header_conf, *client_ip_conf, *protocol_conf;
2357 nxt_conf_value_t *source_conf, *recursive_conf;
2358 nxt_http_forward_t *forward;
2359 nxt_http_route_addr_rule_t *source;
2360
2361 static nxt_str_t header_path = nxt_string("/header");
2362 static nxt_str_t client_ip_path = nxt_string("/client_ip");
2363 static nxt_str_t protocol_path = nxt_string("/protocol");
2364 static nxt_str_t source_path = nxt_string("/source");
2365 static nxt_str_t recursive_path = nxt_string("/recursive");
2366
2367 header_conf = nxt_conf_get_path(conf, &header_path);
2368
2369 if (header_conf != NULL) {
2370 client_ip_conf = nxt_conf_get_path(conf, &header_path);
2371 protocol_conf = NULL;
2372
2373 } else {
2374 client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2375 protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2376 }
2377
2378 source_conf = nxt_conf_get_path(conf, &source_path);
2379 recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2380
2381 if (source_conf == NULL
2382 || (protocol_conf == NULL && client_ip_conf == NULL))
2383 {
2384 return NULL;
2385 }
2386
2387 forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2388 if (nxt_slow_path(forward == NULL)) {
2389 return NULL;
2390 }
2391
2392 source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2393 if (nxt_slow_path(source == NULL)) {
2394 return NULL;
2395 }
2396
2397 forward->source = source;
2398
2399 if (recursive_conf != NULL) {
2400 forward->recursive = nxt_conf_get_boolean(recursive_conf);
2401 }
2402
2403 if (client_ip_conf != NULL) {
2404 ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2405 &forward->client_ip);
2406 if (nxt_slow_path(ret != NXT_OK)) {
2407 return NULL;
2408 }
2409 }
2410
2411 if (protocol_conf != NULL) {
2412 ret = nxt_router_conf_forward_header(mp, protocol_conf,
2413 &forward->protocol);
2414 if (nxt_slow_path(ret != NXT_OK)) {
2415 return NULL;
2416 }
2417 }
2418
2419 return forward;
2420 }
2421
2422
2423 static nxt_int_t
nxt_router_conf_forward_header(nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_http_forward_header_t * fh)2424 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2425 nxt_http_forward_header_t *fh)
2426 {
2427 char c;
2428 size_t i;
2429 uint32_t hash;
2430
2431 fh->header = nxt_conf_get_string_dup(conf, mp, NULL);
2432 if (nxt_slow_path(fh->header == NULL)) {
2433 return NXT_ERROR;
2434 }
2435
2436 hash = NXT_HTTP_FIELD_HASH_INIT;
2437
2438 for (i = 0; i < fh->header->length; i++) {
2439 c = fh->header->start[i];
2440 hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2441 }
2442
2443 hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2444
2445 fh->header_hash = hash;
2446
2447 return NXT_OK;
2448 }
2449
2450
2451 static nxt_app_t *
nxt_router_app_find(nxt_queue_t * queue,nxt_str_t * name)2452 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2453 {
2454 nxt_app_t *app;
2455
2456 nxt_queue_each(app, queue, nxt_app_t, link) {
2457
2458 if (nxt_strstr_eq(name, &app->name)) {
2459 return app;
2460 }
2461
2462 } nxt_queue_loop;
2463
2464 return NULL;
2465 }
2466
2467
2468 static nxt_int_t
nxt_router_app_queue_init(nxt_task_t * task,nxt_port_t * port)2469 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2470 {
2471 void *mem;
2472 nxt_int_t fd;
2473
2474 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2475 if (nxt_slow_path(fd == -1)) {
2476 return NXT_ERROR;
2477 }
2478
2479 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2480 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2481 if (nxt_slow_path(mem == MAP_FAILED)) {
2482 nxt_fd_close(fd);
2483
2484 return NXT_ERROR;
2485 }
2486
2487 nxt_app_queue_init(mem);
2488
2489 port->queue_fd = fd;
2490 port->queue = mem;
2491
2492 return NXT_OK;
2493 }
2494
2495
2496 static nxt_int_t
nxt_router_port_queue_init(nxt_task_t * task,nxt_port_t * port)2497 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2498 {
2499 void *mem;
2500 nxt_int_t fd;
2501
2502 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2503 if (nxt_slow_path(fd == -1)) {
2504 return NXT_ERROR;
2505 }
2506
2507 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2508 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2509 if (nxt_slow_path(mem == MAP_FAILED)) {
2510 nxt_fd_close(fd);
2511
2512 return NXT_ERROR;
2513 }
2514
2515 nxt_port_queue_init(mem);
2516
2517 port->queue_fd = fd;
2518 port->queue = mem;
2519
2520 return NXT_OK;
2521 }
2522
2523
2524 static nxt_int_t
nxt_router_port_queue_map(nxt_task_t * task,nxt_port_t * port,nxt_fd_t fd)2525 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2526 {
2527 void *mem;
2528
2529 nxt_assert(fd != -1);
2530
2531 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2532 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2533 if (nxt_slow_path(mem == MAP_FAILED)) {
2534
2535 return NXT_ERROR;
2536 }
2537
2538 port->queue = mem;
2539
2540 return NXT_OK;
2541 }
2542
2543
2544 static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
2545 NXT_LVLHSH_DEFAULT,
2546 nxt_router_apps_hash_test,
2547 nxt_mp_lvlhsh_alloc,
2548 nxt_mp_lvlhsh_free,
2549 };
2550
2551
2552 static nxt_int_t
nxt_router_apps_hash_test(nxt_lvlhsh_query_t * lhq,void * data)2553 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2554 {
2555 nxt_app_t *app;
2556
2557 app = data;
2558
2559 return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2560 }
2561
2562
2563 static nxt_int_t
nxt_router_apps_hash_add(nxt_router_conf_t * rtcf,nxt_app_t * app)2564 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2565 {
2566 nxt_lvlhsh_query_t lhq;
2567
2568 lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2569 lhq.replace = 0;
2570 lhq.key = app->name;
2571 lhq.value = app;
2572 lhq.proto = &nxt_router_apps_hash_proto;
2573 lhq.pool = rtcf->mem_pool;
2574
2575 switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2576
2577 case NXT_OK:
2578 return NXT_OK;
2579
2580 case NXT_DECLINED:
2581 nxt_thread_log_alert("router app hash adding failed: "
2582 "\"%V\" is already in hash", &lhq.key);
2583 /* Fall through. */
2584 default:
2585 return NXT_ERROR;
2586 }
2587 }
2588
2589
2590 static nxt_app_t *
nxt_router_apps_hash_get(nxt_router_conf_t * rtcf,nxt_str_t * name)2591 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2592 {
2593 nxt_lvlhsh_query_t lhq;
2594
2595 lhq.key_hash = nxt_djb_hash(name->start, name->length);
2596 lhq.key = *name;
2597 lhq.proto = &nxt_router_apps_hash_proto;
2598
2599 if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2600 return NULL;
2601 }
2602
2603 return lhq.value;
2604 }
2605
2606
2607 static void
nxt_router_apps_hash_use(nxt_task_t * task,nxt_router_conf_t * rtcf,int i)2608 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2609 {
2610 nxt_app_t *app;
2611 nxt_lvlhsh_each_t lhe;
2612
2613 nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2614
2615 for ( ;; ) {
2616 app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2617
2618 if (app == NULL) {
2619 break;
2620 }
2621
2622 nxt_router_app_use(task, app, i);
2623 }
2624 }
2625
2626
2627 typedef struct {
2628 nxt_app_t *app;
2629 nxt_int_t target;
2630 } nxt_http_app_conf_t;
2631
2632
2633 nxt_int_t
nxt_router_application_init(nxt_router_conf_t * rtcf,nxt_str_t * name,nxt_str_t * target,nxt_http_action_t * action)2634 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2635 nxt_str_t *target, nxt_http_action_t *action)
2636 {
2637 nxt_app_t *app;
2638 nxt_str_t *targets;
2639 nxt_uint_t i;
2640 nxt_http_app_conf_t *conf;
2641
2642 app = nxt_router_apps_hash_get(rtcf, name);
2643 if (app == NULL) {
2644 return NXT_DECLINED;
2645 }
2646
2647 conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2648 if (nxt_slow_path(conf == NULL)) {
2649 return NXT_ERROR;
2650 }
2651
2652 action->handler = nxt_http_application_handler;
2653 action->u.conf = conf;
2654
2655 conf->app = app;
2656
2657 if (target != NULL && target->length != 0) {
2658 targets = app->targets;
2659
2660 for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2661
2662 conf->target = i;
2663
2664 } else {
2665 conf->target = 0;
2666 }
2667
2668 return NXT_OK;
2669 }
2670
2671
2672 static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_str_t * name)2673 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2674 nxt_str_t *name)
2675 {
2676 size_t size;
2677 nxt_int_t ret;
2678 nxt_bool_t wildcard;
2679 nxt_sockaddr_t *sa;
2680 nxt_socket_conf_t *skcf;
2681 nxt_listen_socket_t *ls;
2682
2683 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2684 if (nxt_slow_path(sa == NULL)) {
2685 nxt_alert(task, "invalid listener \"%V\"", name);
2686 return NULL;
2687 }
2688
2689 sa->type = SOCK_STREAM;
2690
2691 nxt_debug(task, "router listener: \"%*s\"",
2692 (size_t) sa->length, nxt_sockaddr_start(sa));
2693
2694 skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2695 if (nxt_slow_path(skcf == NULL)) {
2696 return NULL;
2697 }
2698
2699 size = nxt_sockaddr_size(sa);
2700
2701 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2702
2703 if (ret != NXT_OK) {
2704
2705 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2706 if (nxt_slow_path(ls == NULL)) {
2707 return NULL;
2708 }
2709
2710 skcf->listen = ls;
2711
2712 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2713 nxt_memcpy(ls->sockaddr, sa, size);
2714
2715 nxt_listen_socket_remote_size(ls);
2716
2717 ls->socket = -1;
2718 ls->backlog = NXT_LISTEN_BACKLOG;
2719 ls->flags = NXT_NONBLOCK;
2720 ls->read_after_accept = 1;
2721 }
2722
2723 switch (sa->u.sockaddr.sa_family) {
2724 #if (NXT_HAVE_UNIX_DOMAIN)
2725 case AF_UNIX:
2726 wildcard = 0;
2727 break;
2728 #endif
2729 #if (NXT_INET6)
2730 case AF_INET6:
2731 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2732 break;
2733 #endif
2734 case AF_INET:
2735 default:
2736 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2737 break;
2738 }
2739
2740 if (!wildcard) {
2741 skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2742 if (nxt_slow_path(skcf->sockaddr == NULL)) {
2743 return NULL;
2744 }
2745
2746 nxt_memcpy(skcf->sockaddr, sa, size);
2747 }
2748
2749 return skcf;
2750 }
2751
2752
2753 static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * nskcf,nxt_sockaddr_t * sa)2754 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2755 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2756 {
2757 nxt_router_t *router;
2758 nxt_queue_link_t *qlk;
2759 nxt_socket_conf_t *skcf;
2760
2761 router = tmcf->router_conf->router;
2762
2763 for (qlk = nxt_queue_first(&router->sockets);
2764 qlk != nxt_queue_tail(&router->sockets);
2765 qlk = nxt_queue_next(qlk))
2766 {
2767 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2768
2769 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2770 nskcf->listen = skcf->listen;
2771
2772 nxt_queue_remove(qlk);
2773 nxt_queue_insert_tail(&keeping_sockets, qlk);
2774
2775 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2776
2777 return NXT_OK;
2778 }
2779 }
2780
2781 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2782
2783 return NXT_DECLINED;
2784 }
2785
2786
2787 static void
nxt_router_listen_socket_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_socket_conf_t * skcf)2788 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2789 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2790 {
2791 size_t size;
2792 uint32_t stream;
2793 nxt_int_t ret;
2794 nxt_buf_t *b;
2795 nxt_port_t *main_port, *router_port;
2796 nxt_runtime_t *rt;
2797 nxt_socket_rpc_t *rpc;
2798
2799 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2800 if (rpc == NULL) {
2801 goto fail;
2802 }
2803
2804 rpc->socket_conf = skcf;
2805 rpc->temp_conf = tmcf;
2806
2807 size = nxt_sockaddr_size(skcf->listen->sockaddr);
2808
2809 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2810 if (b == NULL) {
2811 goto fail;
2812 }
2813
2814 b->completion_handler = nxt_buf_dummy_completion;
2815
2816 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2817
2818 rt = task->thread->runtime;
2819 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2820 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2821
2822 stream = nxt_port_rpc_register_handler(task, router_port,
2823 nxt_router_listen_socket_ready,
2824 nxt_router_listen_socket_error,
2825 main_port->pid, rpc);
2826 if (nxt_slow_path(stream == 0)) {
2827 goto fail;
2828 }
2829
2830 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2831 stream, router_port->id, b);
2832
2833 if (nxt_slow_path(ret != NXT_OK)) {
2834 nxt_port_rpc_cancel(task, router_port, stream);
2835 goto fail;
2836 }
2837
2838 return;
2839
2840 fail:
2841
2842 nxt_router_conf_error(task, tmcf);
2843 }
2844
2845
2846 static void
nxt_router_listen_socket_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2847 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2848 void *data)
2849 {
2850 nxt_int_t ret;
2851 nxt_socket_t s;
2852 nxt_socket_rpc_t *rpc;
2853
2854 rpc = data;
2855
2856 s = msg->fd[0];
2857
2858 ret = nxt_socket_nonblocking(task, s);
2859 if (nxt_slow_path(ret != NXT_OK)) {
2860 goto fail;
2861 }
2862
2863 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2864
2865 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2866 if (nxt_slow_path(ret != NXT_OK)) {
2867 goto fail;
2868 }
2869
2870 rpc->socket_conf->listen->socket = s;
2871
2872 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2873 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2874
2875 return;
2876
2877 fail:
2878
2879 nxt_socket_close(task, s);
2880
2881 nxt_router_conf_error(task, rpc->temp_conf);
2882 }
2883
2884
2885 static void
nxt_router_listen_socket_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2886 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2887 void *data)
2888 {
2889 nxt_socket_rpc_t *rpc;
2890 nxt_router_temp_conf_t *tmcf;
2891
2892 rpc = data;
2893 tmcf = rpc->temp_conf;
2894
2895 #if 0
2896 u_char *p;
2897 size_t size;
2898 uint8_t error;
2899 nxt_buf_t *in, *out;
2900 nxt_sockaddr_t *sa;
2901
2902 static nxt_str_t socket_errors[] = {
2903 nxt_string("ListenerSystem"),
2904 nxt_string("ListenerNoIPv6"),
2905 nxt_string("ListenerPort"),
2906 nxt_string("ListenerInUse"),
2907 nxt_string("ListenerNoAddress"),
2908 nxt_string("ListenerNoAccess"),
2909 nxt_string("ListenerPath"),
2910 };
2911
2912 sa = rpc->socket_conf->listen->sockaddr;
2913
2914 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2915
2916 if (nxt_slow_path(in == NULL)) {
2917 return;
2918 }
2919
2920 p = in->mem.pos;
2921
2922 error = *p++;
2923
2924 size = nxt_length("listen socket error: ")
2925 + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2926 + sa->length + socket_errors[error].length + (in->mem.free - p);
2927
2928 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2929 if (nxt_slow_path(out == NULL)) {
2930 return;
2931 }
2932
2933 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2934 "listen socket error: "
2935 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2936 (size_t) sa->length, nxt_sockaddr_start(sa),
2937 &socket_errors[error], in->mem.free - p, p);
2938
2939 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2940 #endif
2941
2942 nxt_router_conf_error(task, tmcf);
2943 }
2944
2945
2946 #if (NXT_TLS)
2947
2948 static void
nxt_router_tls_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)2949 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2950 void *data)
2951 {
2952 nxt_mp_t *mp;
2953 nxt_int_t ret;
2954 nxt_tls_conf_t *tlscf;
2955 nxt_router_tlssock_t *tls;
2956 nxt_tls_bundle_conf_t *bundle;
2957 nxt_router_temp_conf_t *tmcf;
2958
2959 nxt_debug(task, "tls rpc handler");
2960
2961 tls = data;
2962 tmcf = tls->temp_conf;
2963
2964 if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2965 goto fail;
2966 }
2967
2968 mp = tmcf->router_conf->mem_pool;
2969
2970 if (tls->socket_conf->tls == NULL) {
2971 tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2972 if (nxt_slow_path(tlscf == NULL)) {
2973 goto fail;
2974 }
2975
2976 tlscf->no_wait_shutdown = 1;
2977 tls->socket_conf->tls = tlscf;
2978
2979 } else {
2980 tlscf = tls->socket_conf->tls;
2981 }
2982
2983 tls->tls_init->conf = tlscf;
2984
2985 bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2986 if (nxt_slow_path(bundle == NULL)) {
2987 goto fail;
2988 }
2989
2990 if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2991 goto fail;
2992 }
2993
2994 bundle->chain_file = msg->fd[0];
2995 bundle->next = tlscf->bundle;
2996 tlscf->bundle = bundle;
2997
2998 ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2999 tls->last);
3000 if (nxt_slow_path(ret != NXT_OK)) {
3001 goto fail;
3002 }
3003
3004 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3005 nxt_router_conf_apply, task, tmcf, NULL);
3006 return;
3007
3008 fail:
3009
3010 nxt_router_conf_error(task, tmcf);
3011 }
3012
3013 #endif
3014
3015
3016 static void
nxt_router_app_rpc_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_app_t * app)3017 nxt_router_app_rpc_create(nxt_task_t *task,
3018 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
3019 {
3020 size_t size;
3021 uint32_t stream;
3022 nxt_fd_t port_fd, queue_fd;
3023 nxt_int_t ret;
3024 nxt_buf_t *b;
3025 nxt_port_t *router_port, *dport;
3026 nxt_runtime_t *rt;
3027 nxt_app_rpc_t *rpc;
3028
3029 rt = task->thread->runtime;
3030
3031 dport = app->proto_port;
3032
3033 if (dport == NULL) {
3034 nxt_debug(task, "app '%V' prototype prefork", &app->name);
3035
3036 size = app->name.length + 1 + app->conf.length;
3037
3038 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
3039 if (nxt_slow_path(b == NULL)) {
3040 goto fail;
3041 }
3042
3043 b->completion_handler = nxt_buf_dummy_completion;
3044
3045 nxt_buf_cpystr(b, &app->name);
3046 *b->mem.free++ = '\0';
3047 nxt_buf_cpystr(b, &app->conf);
3048
3049 dport = rt->port_by_type[NXT_PROCESS_MAIN];
3050
3051 port_fd = app->shared_port->pair[0];
3052 queue_fd = app->shared_port->queue_fd;
3053
3054 } else {
3055 nxt_debug(task, "app '%V' prefork", &app->name);
3056
3057 b = NULL;
3058 port_fd = -1;
3059 queue_fd = -1;
3060 }
3061
3062 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3063
3064 rpc = nxt_port_rpc_register_handler_ex(task, router_port,
3065 nxt_router_app_prefork_ready,
3066 nxt_router_app_prefork_error,
3067 sizeof(nxt_app_rpc_t));
3068 if (nxt_slow_path(rpc == NULL)) {
3069 goto fail;
3070 }
3071
3072 rpc->app = app;
3073 rpc->temp_conf = tmcf;
3074 rpc->proto = (b != NULL);
3075
3076 stream = nxt_port_rpc_ex_stream(rpc);
3077
3078 ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
3079 port_fd, queue_fd, stream, router_port->id, b);
3080 if (nxt_slow_path(ret != NXT_OK)) {
3081 nxt_port_rpc_cancel(task, router_port, stream);
3082 goto fail;
3083 }
3084
3085 if (b == NULL) {
3086 nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
3087
3088 app->pending_processes++;
3089 }
3090
3091 return;
3092
3093 fail:
3094
3095 nxt_router_conf_error(task, tmcf);
3096 }
3097
3098
3099 static void
nxt_router_app_prefork_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3100 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3101 void *data)
3102 {
3103 nxt_app_t *app;
3104 nxt_port_t *port;
3105 nxt_app_rpc_t *rpc;
3106 nxt_event_engine_t *engine;
3107
3108 rpc = data;
3109 app = rpc->app;
3110
3111 port = msg->u.new_port;
3112
3113 nxt_assert(port != NULL);
3114 nxt_assert(port->id == 0);
3115
3116 if (rpc->proto) {
3117 nxt_assert(app->proto_port == NULL);
3118 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
3119
3120 nxt_port_inc_use(port);
3121
3122 app->proto_port = port;
3123 port->app = app;
3124
3125 nxt_router_app_rpc_create(task, rpc->temp_conf, app);
3126
3127 return;
3128 }
3129
3130 nxt_assert(port->type == NXT_PROCESS_APP);
3131
3132 port->app = app;
3133 port->main_app_port = port;
3134
3135 app->pending_processes--;
3136 app->processes++;
3137 app->idle_processes++;
3138
3139 engine = task->thread->engine;
3140
3141 nxt_queue_insert_tail(&app->ports, &port->app_link);
3142 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3143
3144 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
3145 &app->name, port->pid, port->id);
3146
3147 nxt_port_hash_add(&app->port_hash, port);
3148 app->port_hash_count++;
3149
3150 port->idle_start = 0;
3151
3152 nxt_port_inc_use(port);
3153
3154 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
3155
3156 nxt_work_queue_add(&engine->fast_work_queue,
3157 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
3158 }
3159
3160
3161 static void
nxt_router_app_prefork_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)3162 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3163 void *data)
3164 {
3165 nxt_app_t *app;
3166 nxt_app_rpc_t *rpc;
3167 nxt_router_temp_conf_t *tmcf;
3168
3169 rpc = data;
3170 app = rpc->app;
3171 tmcf = rpc->temp_conf;
3172
3173 if (rpc->proto) {
3174 nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3175 &app->name);
3176
3177 } else {
3178 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3179 &app->name);
3180
3181 app->pending_processes--;
3182 }
3183
3184 nxt_router_conf_error(task, tmcf);
3185 }
3186
3187
3188 static nxt_int_t
nxt_router_engines_create(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf,const nxt_event_interface_t * interface)3189 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3190 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3191 {
3192 nxt_int_t ret;
3193 nxt_uint_t n, threads;
3194 nxt_queue_link_t *qlk;
3195 nxt_router_engine_conf_t *recf;
3196
3197 threads = tmcf->router_conf->threads;
3198
3199 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3200 sizeof(nxt_router_engine_conf_t));
3201 if (nxt_slow_path(tmcf->engines == NULL)) {
3202 return NXT_ERROR;
3203 }
3204
3205 n = 0;
3206
3207 for (qlk = nxt_queue_first(&router->engines);
3208 qlk != nxt_queue_tail(&router->engines);
3209 qlk = nxt_queue_next(qlk))
3210 {
3211 recf = nxt_array_zero_add(tmcf->engines);
3212 if (nxt_slow_path(recf == NULL)) {
3213 return NXT_ERROR;
3214 }
3215
3216 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3217
3218 if (n < threads) {
3219 recf->action = NXT_ROUTER_ENGINE_KEEP;
3220 ret = nxt_router_engine_conf_update(tmcf, recf);
3221
3222 } else {
3223 recf->action = NXT_ROUTER_ENGINE_DELETE;
3224 ret = nxt_router_engine_conf_delete(tmcf, recf);
3225 }
3226
3227 if (nxt_slow_path(ret != NXT_OK)) {
3228 return ret;
3229 }
3230
3231 n++;
3232 }
3233
3234 tmcf->new_threads = n;
3235
3236 while (n < threads) {
3237 recf = nxt_array_zero_add(tmcf->engines);
3238 if (nxt_slow_path(recf == NULL)) {
3239 return NXT_ERROR;
3240 }
3241
3242 recf->action = NXT_ROUTER_ENGINE_ADD;
3243
3244 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3245 if (nxt_slow_path(recf->engine == NULL)) {
3246 return NXT_ERROR;
3247 }
3248
3249 ret = nxt_router_engine_conf_create(tmcf, recf);
3250 if (nxt_slow_path(ret != NXT_OK)) {
3251 return ret;
3252 }
3253
3254 n++;
3255 }
3256
3257 return NXT_OK;
3258 }
3259
3260
3261 static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3262 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3263 nxt_router_engine_conf_t *recf)
3264 {
3265 nxt_int_t ret;
3266
3267 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3268 nxt_router_listen_socket_create);
3269 if (nxt_slow_path(ret != NXT_OK)) {
3270 return ret;
3271 }
3272
3273 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3274 nxt_router_listen_socket_create);
3275 if (nxt_slow_path(ret != NXT_OK)) {
3276 return ret;
3277 }
3278
3279 return ret;
3280 }
3281
3282
3283 static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3284 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3285 nxt_router_engine_conf_t *recf)
3286 {
3287 nxt_int_t ret;
3288
3289 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3290 nxt_router_listen_socket_create);
3291 if (nxt_slow_path(ret != NXT_OK)) {
3292 return ret;
3293 }
3294
3295 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3296 nxt_router_listen_socket_update);
3297 if (nxt_slow_path(ret != NXT_OK)) {
3298 return ret;
3299 }
3300
3301 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3302 if (nxt_slow_path(ret != NXT_OK)) {
3303 return ret;
3304 }
3305
3306 return ret;
3307 }
3308
3309
3310 static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3311 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3312 nxt_router_engine_conf_t *recf)
3313 {
3314 nxt_int_t ret;
3315
3316 ret = nxt_router_engine_quit(tmcf, recf);
3317 if (nxt_slow_path(ret != NXT_OK)) {
3318 return ret;
3319 }
3320
3321 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3322 if (nxt_slow_path(ret != NXT_OK)) {
3323 return ret;
3324 }
3325
3326 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3327 }
3328
3329
3330 static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets,nxt_work_handler_t handler)3331 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3332 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3333 nxt_work_handler_t handler)
3334 {
3335 nxt_int_t ret;
3336 nxt_joint_job_t *job;
3337 nxt_queue_link_t *qlk;
3338 nxt_socket_conf_t *skcf;
3339 nxt_socket_conf_joint_t *joint;
3340
3341 for (qlk = nxt_queue_first(sockets);
3342 qlk != nxt_queue_tail(sockets);
3343 qlk = nxt_queue_next(qlk))
3344 {
3345 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3346 if (nxt_slow_path(job == NULL)) {
3347 return NXT_ERROR;
3348 }
3349
3350 job->work.next = recf->jobs;
3351 recf->jobs = &job->work;
3352
3353 job->task = tmcf->engine->task;
3354 job->work.handler = handler;
3355 job->work.task = &job->task;
3356 job->work.obj = job;
3357 job->tmcf = tmcf;
3358
3359 tmcf->count++;
3360
3361 joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3362 sizeof(nxt_socket_conf_joint_t));
3363 if (nxt_slow_path(joint == NULL)) {
3364 return NXT_ERROR;
3365 }
3366
3367 job->work.data = joint;
3368
3369 ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3370 if (nxt_slow_path(ret != NXT_OK)) {
3371 return ret;
3372 }
3373
3374 joint->count = 1;
3375
3376 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3377 skcf->count++;
3378 joint->socket_conf = skcf;
3379
3380 joint->engine = recf->engine;
3381 }
3382
3383 return NXT_OK;
3384 }
3385
3386
3387 static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf)3388 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3389 nxt_router_engine_conf_t *recf)
3390 {
3391 nxt_joint_job_t *job;
3392
3393 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3394 if (nxt_slow_path(job == NULL)) {
3395 return NXT_ERROR;
3396 }
3397
3398 job->work.next = recf->jobs;
3399 recf->jobs = &job->work;
3400
3401 job->task = tmcf->engine->task;
3402 job->work.handler = nxt_router_worker_thread_quit;
3403 job->work.task = &job->task;
3404 job->work.obj = NULL;
3405 job->work.data = NULL;
3406 job->tmcf = NULL;
3407
3408 return NXT_OK;
3409 }
3410
3411
3412 static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t * tmcf,nxt_router_engine_conf_t * recf,nxt_queue_t * sockets)3413 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3414 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3415 {
3416 nxt_joint_job_t *job;
3417 nxt_queue_link_t *qlk;
3418
3419 for (qlk = nxt_queue_first(sockets);
3420 qlk != nxt_queue_tail(sockets);
3421 qlk = nxt_queue_next(qlk))
3422 {
3423 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3424 if (nxt_slow_path(job == NULL)) {
3425 return NXT_ERROR;
3426 }
3427
3428 job->work.next = recf->jobs;
3429 recf->jobs = &job->work;
3430
3431 job->task = tmcf->engine->task;
3432 job->work.handler = nxt_router_listen_socket_delete;
3433 job->work.task = &job->task;
3434 job->work.obj = job;
3435 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3436 job->tmcf = tmcf;
3437
3438 tmcf->count++;
3439 }
3440
3441 return NXT_OK;
3442 }
3443
3444
3445 static nxt_int_t
nxt_router_threads_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_router_temp_conf_t * tmcf)3446 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3447 nxt_router_temp_conf_t *tmcf)
3448 {
3449 nxt_int_t ret;
3450 nxt_uint_t i, threads;
3451 nxt_router_engine_conf_t *recf;
3452
3453 recf = tmcf->engines->elts;
3454 threads = tmcf->router_conf->threads;
3455
3456 for (i = tmcf->new_threads; i < threads; i++) {
3457 ret = nxt_router_thread_create(task, rt, recf[i].engine);
3458 if (nxt_slow_path(ret != NXT_OK)) {
3459 return ret;
3460 }
3461 }
3462
3463 return NXT_OK;
3464 }
3465
3466
3467 static nxt_int_t
nxt_router_thread_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_event_engine_t * engine)3468 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3469 nxt_event_engine_t *engine)
3470 {
3471 nxt_int_t ret;
3472 nxt_thread_link_t *link;
3473 nxt_thread_handle_t handle;
3474
3475 link = nxt_zalloc(sizeof(nxt_thread_link_t));
3476
3477 if (nxt_slow_path(link == NULL)) {
3478 return NXT_ERROR;
3479 }
3480
3481 link->start = nxt_router_thread_start;
3482 link->engine = engine;
3483 link->work.handler = nxt_router_thread_exit_handler;
3484 link->work.task = task;
3485 link->work.data = link;
3486
3487 nxt_queue_insert_tail(&rt->engines, &engine->link);
3488
3489 ret = nxt_thread_create(&handle, link);
3490
3491 if (nxt_slow_path(ret != NXT_OK)) {
3492 nxt_queue_remove(&engine->link);
3493 }
3494
3495 return ret;
3496 }
3497
3498
3499 static void
nxt_router_apps_sort(nxt_task_t * task,nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3500 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3501 nxt_router_temp_conf_t *tmcf)
3502 {
3503 nxt_app_t *app;
3504
3505 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3506
3507 nxt_router_app_unlink(task, app);
3508
3509 } nxt_queue_loop;
3510
3511 nxt_queue_add(&router->apps, &tmcf->previous);
3512 nxt_queue_add(&router->apps, &tmcf->apps);
3513 }
3514
3515
3516 static void
nxt_router_engines_post(nxt_router_t * router,nxt_router_temp_conf_t * tmcf)3517 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3518 {
3519 nxt_uint_t n;
3520 nxt_event_engine_t *engine;
3521 nxt_router_engine_conf_t *recf;
3522
3523 recf = tmcf->engines->elts;
3524
3525 for (n = tmcf->engines->nelts; n != 0; n--) {
3526 engine = recf->engine;
3527
3528 switch (recf->action) {
3529
3530 case NXT_ROUTER_ENGINE_KEEP:
3531 break;
3532
3533 case NXT_ROUTER_ENGINE_ADD:
3534 nxt_queue_insert_tail(&router->engines, &engine->link0);
3535 break;
3536
3537 case NXT_ROUTER_ENGINE_DELETE:
3538 nxt_queue_remove(&engine->link0);
3539 break;
3540 }
3541
3542 nxt_router_engine_post(engine, recf->jobs);
3543
3544 recf++;
3545 }
3546 }
3547
3548
3549 static void
nxt_router_engine_post(nxt_event_engine_t * engine,nxt_work_t * jobs)3550 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3551 {
3552 nxt_work_t *work, *next;
3553
3554 for (work = jobs; work != NULL; work = next) {
3555 next = work->next;
3556 work->next = NULL;
3557
3558 nxt_event_engine_post(engine, work);
3559 }
3560 }
3561
3562
3563 static nxt_port_handlers_t nxt_router_app_port_handlers = {
3564 .rpc_error = nxt_port_rpc_handler,
3565 .mmap = nxt_port_mmap_handler,
3566 .data = nxt_port_rpc_handler,
3567 .oosm = nxt_router_oosm_handler,
3568 .req_headers_ack = nxt_port_rpc_handler,
3569 };
3570
3571
3572 static void
nxt_router_thread_start(void * data)3573 nxt_router_thread_start(void *data)
3574 {
3575 nxt_int_t ret;
3576 nxt_port_t *port;
3577 nxt_task_t *task;
3578 nxt_work_t *work;
3579 nxt_thread_t *thread;
3580 nxt_thread_link_t *link;
3581 nxt_event_engine_t *engine;
3582
3583 link = data;
3584 engine = link->engine;
3585 task = &engine->task;
3586
3587 thread = nxt_thread();
3588
3589 nxt_event_engine_thread_adopt(engine);
3590
3591 /* STUB */
3592 thread->runtime = engine->task.thread->runtime;
3593
3594 engine->task.thread = thread;
3595 engine->task.log = thread->log;
3596 thread->engine = engine;
3597 thread->task = &engine->task;
3598 #if 0
3599 thread->fiber = &engine->fibers->fiber;
3600 #endif
3601
3602 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3603 if (nxt_slow_path(engine->mem_pool == NULL)) {
3604 return;
3605 }
3606
3607 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3608 NXT_PROCESS_ROUTER);
3609 if (nxt_slow_path(port == NULL)) {
3610 return;
3611 }
3612
3613 ret = nxt_port_socket_init(task, port, 0);
3614 if (nxt_slow_path(ret != NXT_OK)) {
3615 nxt_port_use(task, port, -1);
3616 return;
3617 }
3618
3619 ret = nxt_router_port_queue_init(task, port);
3620 if (nxt_slow_path(ret != NXT_OK)) {
3621 nxt_port_use(task, port, -1);
3622 return;
3623 }
3624
3625 engine->port = port;
3626
3627 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3628
3629 work = nxt_zalloc(sizeof(nxt_work_t));
3630 if (nxt_slow_path(work == NULL)) {
3631 return;
3632 }
3633
3634 work->handler = nxt_router_rt_add_port;
3635 work->task = link->work.task;
3636 work->obj = work;
3637 work->data = port;
3638
3639 nxt_event_engine_post(link->work.task->thread->engine, work);
3640
3641 nxt_event_engine_start(engine);
3642 }
3643
3644
3645 static void
nxt_router_rt_add_port(nxt_task_t * task,void * obj,void * data)3646 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3647 {
3648 nxt_int_t res;
3649 nxt_port_t *port;
3650 nxt_runtime_t *rt;
3651
3652 rt = task->thread->runtime;
3653 port = data;
3654
3655 nxt_free(obj);
3656
3657 res = nxt_port_hash_add(&rt->ports, port);
3658
3659 if (nxt_fast_path(res == NXT_OK)) {
3660 nxt_port_use(task, port, 1);
3661 }
3662 }
3663
3664
3665 static void
nxt_router_listen_socket_create(nxt_task_t * task,void * obj,void * data)3666 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3667 {
3668 nxt_joint_job_t *job;
3669 nxt_socket_conf_t *skcf;
3670 nxt_listen_event_t *lev;
3671 nxt_listen_socket_t *ls;
3672 nxt_thread_spinlock_t *lock;
3673 nxt_socket_conf_joint_t *joint;
3674
3675 job = obj;
3676 joint = data;
3677
3678 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3679
3680 skcf = joint->socket_conf;
3681 ls = skcf->listen;
3682
3683 lev = nxt_listen_event(task, ls);
3684 if (nxt_slow_path(lev == NULL)) {
3685 nxt_router_listen_socket_release(task, skcf);
3686 return;
3687 }
3688
3689 lev->socket.data = joint;
3690
3691 lock = &skcf->router_conf->router->lock;
3692
3693 nxt_thread_spin_lock(lock);
3694 ls->count++;
3695 nxt_thread_spin_unlock(lock);
3696
3697 job->work.next = NULL;
3698 job->work.handler = nxt_router_conf_wait;
3699
3700 nxt_event_engine_post(job->tmcf->engine, &job->work);
3701 }
3702
3703
3704 nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t * listen_connections,nxt_socket_conf_t * skcf)3705 nxt_router_listen_event(nxt_queue_t *listen_connections,
3706 nxt_socket_conf_t *skcf)
3707 {
3708 nxt_socket_t fd;
3709 nxt_queue_link_t *qlk;
3710 nxt_listen_event_t *lev;
3711
3712 fd = skcf->listen->socket;
3713
3714 for (qlk = nxt_queue_first(listen_connections);
3715 qlk != nxt_queue_tail(listen_connections);
3716 qlk = nxt_queue_next(qlk))
3717 {
3718 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3719
3720 if (fd == lev->socket.fd) {
3721 return lev;
3722 }
3723 }
3724
3725 return NULL;
3726 }
3727
3728
3729 static void
nxt_router_listen_socket_update(nxt_task_t * task,void * obj,void * data)3730 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3731 {
3732 nxt_joint_job_t *job;
3733 nxt_event_engine_t *engine;
3734 nxt_listen_event_t *lev;
3735 nxt_socket_conf_joint_t *joint, *old;
3736
3737 job = obj;
3738 joint = data;
3739
3740 engine = task->thread->engine;
3741
3742 nxt_queue_insert_tail(&engine->joints, &joint->link);
3743
3744 lev = nxt_router_listen_event(&engine->listen_connections,
3745 joint->socket_conf);
3746
3747 old = lev->socket.data;
3748 lev->socket.data = joint;
3749 lev->listen = joint->socket_conf->listen;
3750
3751 job->work.next = NULL;
3752 job->work.handler = nxt_router_conf_wait;
3753
3754 nxt_event_engine_post(job->tmcf->engine, &job->work);
3755
3756 /*
3757 * The task is allocated from configuration temporary
3758 * memory pool so it can be freed after engine post operation.
3759 */
3760
3761 nxt_router_conf_release(&engine->task, old);
3762 }
3763
3764
3765 static void
nxt_router_listen_socket_delete(nxt_task_t * task,void * obj,void * data)3766 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3767 {
3768 nxt_socket_conf_t *skcf;
3769 nxt_listen_event_t *lev;
3770 nxt_event_engine_t *engine;
3771 nxt_socket_conf_joint_t *joint;
3772
3773 skcf = data;
3774
3775 engine = task->thread->engine;
3776
3777 lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3778
3779 nxt_fd_event_delete(engine, &lev->socket);
3780
3781 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3782 lev->socket.fd);
3783
3784 joint = lev->socket.data;
3785 joint->close_job = obj;
3786
3787 lev->timer.handler = nxt_router_listen_socket_close;
3788 lev->timer.work_queue = &engine->fast_work_queue;
3789
3790 nxt_timer_add(engine, &lev->timer, 0);
3791 }
3792
3793
3794 static void
nxt_router_worker_thread_quit(nxt_task_t * task,void * obj,void * data)3795 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3796 {
3797 nxt_event_engine_t *engine;
3798
3799 nxt_debug(task, "router worker thread quit");
3800
3801 engine = task->thread->engine;
3802
3803 engine->shutdown = 1;
3804
3805 if (nxt_queue_is_empty(&engine->joints)) {
3806 nxt_thread_exit(task->thread);
3807 }
3808 }
3809
3810
3811 static void
nxt_router_listen_socket_close(nxt_task_t * task,void * obj,void * data)3812 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3813 {
3814 nxt_timer_t *timer;
3815 nxt_joint_job_t *job;
3816 nxt_listen_event_t *lev;
3817 nxt_socket_conf_joint_t *joint;
3818
3819 timer = obj;
3820 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3821
3822 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3823 lev->socket.fd);
3824
3825 nxt_queue_remove(&lev->link);
3826
3827 joint = lev->socket.data;
3828 lev->socket.data = NULL;
3829
3830 /* 'task' refers to lev->task and we cannot use after nxt_free() */
3831 task = &task->thread->engine->task;
3832
3833 nxt_router_listen_socket_release(task, joint->socket_conf);
3834
3835 job = joint->close_job;
3836 job->work.next = NULL;
3837 job->work.handler = nxt_router_conf_wait;
3838
3839 nxt_event_engine_post(job->tmcf->engine, &job->work);
3840
3841 nxt_router_listen_event_release(task, lev, joint);
3842 }
3843
3844
3845 static void
nxt_router_listen_socket_release(nxt_task_t * task,nxt_socket_conf_t * skcf)3846 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3847 {
3848 #if (NXT_HAVE_UNIX_DOMAIN)
3849 size_t size;
3850 nxt_buf_t *b;
3851 nxt_port_t *main_port;
3852 nxt_runtime_t *rt;
3853 nxt_sockaddr_t *sa;
3854 #endif
3855 nxt_listen_socket_t *ls;
3856 nxt_thread_spinlock_t *lock;
3857
3858 ls = skcf->listen;
3859 lock = &skcf->router_conf->router->lock;
3860
3861 nxt_thread_spin_lock(lock);
3862
3863 nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3864 task->thread->engine, ls->count);
3865
3866 if (--ls->count != 0) {
3867 ls = NULL;
3868 }
3869
3870 nxt_thread_spin_unlock(lock);
3871
3872 if (ls == NULL) {
3873 return;
3874 }
3875
3876 nxt_socket_close(task, ls->socket);
3877
3878 #if (NXT_HAVE_UNIX_DOMAIN)
3879 sa = ls->sockaddr;
3880 if (sa->u.sockaddr.sa_family != AF_UNIX
3881 || sa->u.sockaddr_un.sun_path[0] == '\0')
3882 {
3883 goto out_free_ls;
3884 }
3885
3886 size = nxt_sockaddr_size(ls->sockaddr);
3887
3888 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
3889 if (b == NULL) {
3890 goto out_free_ls;
3891 }
3892
3893 b->mem.free = nxt_cpymem(b->mem.free, ls->sockaddr, size);
3894
3895 rt = task->thread->runtime;
3896 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3897
3898 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET_UNLINK,
3899 -1, 0, 0, b);
3900
3901 out_free_ls:
3902 #endif
3903 nxt_free(ls);
3904 }
3905
3906
3907 void
nxt_router_listen_event_release(nxt_task_t * task,nxt_listen_event_t * lev,nxt_socket_conf_joint_t * joint)3908 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3909 nxt_socket_conf_joint_t *joint)
3910 {
3911 nxt_event_engine_t *engine;
3912
3913 nxt_debug(task, "listen event count: %D", lev->count);
3914
3915 engine = task->thread->engine;
3916
3917 if (--lev->count == 0) {
3918 if (lev->next != NULL) {
3919 nxt_sockaddr_cache_free(engine, lev->next);
3920
3921 nxt_conn_free(task, lev->next);
3922 }
3923
3924 nxt_free(lev);
3925 }
3926
3927 if (joint != NULL) {
3928 nxt_router_conf_release(task, joint);
3929 }
3930
3931 if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3932 nxt_thread_exit(task->thread);
3933 }
3934 }
3935
3936
3937 void
nxt_router_conf_release(nxt_task_t * task,nxt_socket_conf_joint_t * joint)3938 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3939 {
3940 nxt_socket_conf_t *skcf;
3941 nxt_router_conf_t *rtcf;
3942 nxt_thread_spinlock_t *lock;
3943
3944 nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3945
3946 if (--joint->count != 0) {
3947 return;
3948 }
3949
3950 nxt_queue_remove(&joint->link);
3951
3952 /*
3953 * The joint content can not be safely used after the critical
3954 * section protected by the spinlock because its memory pool may
3955 * be already destroyed by another thread.
3956 */
3957 skcf = joint->socket_conf;
3958 rtcf = skcf->router_conf;
3959 lock = &rtcf->router->lock;
3960
3961 nxt_thread_spin_lock(lock);
3962
3963 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3964 rtcf, rtcf->count);
3965
3966 if (--skcf->count != 0) {
3967 skcf = NULL;
3968 rtcf = NULL;
3969
3970 } else {
3971 nxt_queue_remove(&skcf->link);
3972
3973 if (--rtcf->count != 0) {
3974 rtcf = NULL;
3975 }
3976 }
3977
3978 nxt_thread_spin_unlock(lock);
3979
3980 #if (NXT_TLS)
3981 if (skcf != NULL && skcf->tls != NULL) {
3982 task->thread->runtime->tls->server_free(task, skcf->tls);
3983 }
3984 #endif
3985
3986 /* TODO remove engine->port */
3987
3988 if (rtcf != NULL) {
3989 nxt_debug(task, "old router conf is destroyed");
3990
3991 nxt_router_apps_hash_use(task, rtcf, -1);
3992
3993 nxt_router_access_log_release(task, lock, rtcf->access_log);
3994
3995 nxt_tstr_state_release(rtcf->tstr_state);
3996
3997 nxt_mp_thread_adopt(rtcf->mem_pool);
3998
3999 nxt_mp_destroy(rtcf->mem_pool);
4000 }
4001 }
4002
4003
4004 static void
nxt_router_thread_exit_handler(nxt_task_t * task,void * obj,void * data)4005 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4006 {
4007 nxt_port_t *port;
4008 nxt_thread_link_t *link;
4009 nxt_event_engine_t *engine;
4010 nxt_thread_handle_t handle;
4011
4012 handle = (nxt_thread_handle_t) (uintptr_t) obj;
4013 link = data;
4014
4015 nxt_thread_wait(handle);
4016
4017 engine = link->engine;
4018
4019 nxt_queue_remove(&engine->link);
4020
4021 port = engine->port;
4022
4023 // TODO notify all apps
4024
4025 port->engine = task->thread->engine;
4026 nxt_mp_thread_adopt(port->mem_pool);
4027 nxt_port_use(task, port, -1);
4028
4029 nxt_mp_thread_adopt(engine->mem_pool);
4030 nxt_mp_destroy(engine->mem_pool);
4031
4032 nxt_event_engine_free(engine);
4033
4034 nxt_free(link);
4035 }
4036
4037
4038 static void
nxt_router_response_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4039 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4040 void *data)
4041 {
4042 size_t b_size, count;
4043 nxt_int_t ret;
4044 nxt_app_t *app;
4045 nxt_buf_t *b, *next;
4046 nxt_port_t *app_port;
4047 nxt_unit_field_t *f;
4048 nxt_http_field_t *field;
4049 nxt_http_request_t *r;
4050 nxt_unit_response_t *resp;
4051 nxt_request_rpc_data_t *req_rpc_data;
4052
4053 req_rpc_data = data;
4054
4055 r = req_rpc_data->request;
4056 if (nxt_slow_path(r == NULL)) {
4057 return;
4058 }
4059
4060 if (r->error) {
4061 nxt_request_rpc_data_unlink(task, req_rpc_data);
4062 return;
4063 }
4064
4065 app = req_rpc_data->app;
4066 nxt_assert(app != NULL);
4067
4068 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4069 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4070
4071 return;
4072 }
4073
4074 b = (msg->size == 0) ? NULL : msg->buf;
4075
4076 if (msg->port_msg.last != 0) {
4077 nxt_debug(task, "router data create last buf");
4078
4079 nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4080
4081 req_rpc_data->rpc_cancel = 0;
4082
4083 if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4084 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4085 }
4086
4087 nxt_request_rpc_data_unlink(task, req_rpc_data);
4088
4089 } else {
4090 if (app->timeout != 0) {
4091 r->timer.handler = nxt_router_app_timeout;
4092 r->timer_data = req_rpc_data;
4093 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4094 }
4095 }
4096
4097 if (b == NULL) {
4098 return;
4099 }
4100
4101 if (msg->buf == b) {
4102 /* Disable instant buffer completion/re-using by port. */
4103 msg->buf = NULL;
4104 }
4105
4106 if (r->header_sent) {
4107 nxt_buf_chain_add(&r->out, b);
4108 nxt_http_request_send_body(task, r, NULL);
4109
4110 } else {
4111 b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4112
4113 if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4114 nxt_alert(task, "response buffer too small: %z", b_size);
4115 goto fail;
4116 }
4117
4118 resp = (void *) b->mem.pos;
4119 count = (b_size - sizeof(nxt_unit_response_t))
4120 / sizeof(nxt_unit_field_t);
4121
4122 if (nxt_slow_path(count < resp->fields_count)) {
4123 nxt_alert(task, "response buffer too small for fields count: %D",
4124 resp->fields_count);
4125 goto fail;
4126 }
4127
4128 field = NULL;
4129
4130 for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4131 if (f->skip) {
4132 continue;
4133 }
4134
4135 field = nxt_list_add(r->resp.fields);
4136
4137 if (nxt_slow_path(field == NULL)) {
4138 goto fail;
4139 }
4140
4141 field->hash = f->hash;
4142 field->skip = 0;
4143 field->hopbyhop = 0;
4144
4145 field->name_length = f->name_length;
4146 field->value_length = f->value_length;
4147 field->name = nxt_unit_sptr_get(&f->name);
4148 field->value = nxt_unit_sptr_get(&f->value);
4149
4150 ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4151 if (nxt_slow_path(ret != NXT_OK)) {
4152 goto fail;
4153 }
4154
4155 nxt_debug(task, "header%s: %*s: %*s",
4156 (field->skip ? " skipped" : ""),
4157 (size_t) field->name_length, field->name,
4158 (size_t) field->value_length, field->value);
4159
4160 if (field->skip) {
4161 r->resp.fields->last->nelts--;
4162 }
4163 }
4164
4165 r->status = resp->status;
4166
4167 if (resp->piggyback_content_length != 0) {
4168 b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4169 b->mem.free = b->mem.pos + resp->piggyback_content_length;
4170
4171 } else {
4172 b->mem.pos = b->mem.free;
4173 }
4174
4175 if (nxt_buf_mem_used_size(&b->mem) == 0) {
4176 next = b->next;
4177 b->next = NULL;
4178
4179 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4180 b->completion_handler, task, b, b->parent);
4181
4182 b = next;
4183 }
4184
4185 if (b != NULL) {
4186 nxt_buf_chain_add(&r->out, b);
4187 }
4188
4189 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4190
4191 if (r->websocket_handshake
4192 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4193 {
4194 app_port = req_rpc_data->app_port;
4195 if (nxt_slow_path(app_port == NULL)) {
4196 goto fail;
4197 }
4198
4199 nxt_thread_mutex_lock(&app->mutex);
4200
4201 app_port->main_app_port->active_websockets++;
4202
4203 nxt_thread_mutex_unlock(&app->mutex);
4204
4205 nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4206 req_rpc_data->apr_action = NXT_APR_CLOSE;
4207
4208 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4209
4210 r->state = &nxt_http_websocket;
4211
4212 } else {
4213 r->state = &nxt_http_request_send_state;
4214 }
4215 }
4216
4217 return;
4218
4219 fail:
4220
4221 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4222
4223 nxt_request_rpc_data_unlink(task, req_rpc_data);
4224 }
4225
4226
4227 static void
nxt_router_req_headers_ack_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,nxt_request_rpc_data_t * req_rpc_data)4228 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4229 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4230 {
4231 int res;
4232 nxt_app_t *app;
4233 nxt_buf_t *b;
4234 nxt_bool_t start_process, unlinked;
4235 nxt_port_t *app_port, *main_app_port, *idle_port;
4236 nxt_queue_link_t *idle_lnk;
4237 nxt_http_request_t *r;
4238
4239 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4240 req_rpc_data->stream,
4241 msg->port_msg.pid, msg->port_msg.reply_port);
4242
4243 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4244 msg->port_msg.pid);
4245
4246 app = req_rpc_data->app;
4247 r = req_rpc_data->request;
4248
4249 start_process = 0;
4250 unlinked = 0;
4251
4252 nxt_thread_mutex_lock(&app->mutex);
4253
4254 if (r->app_link.next != NULL) {
4255 nxt_queue_remove(&r->app_link);
4256 r->app_link.next = NULL;
4257
4258 unlinked = 1;
4259 }
4260
4261 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4262 msg->port_msg.reply_port);
4263 if (nxt_slow_path(app_port == NULL)) {
4264 nxt_thread_mutex_unlock(&app->mutex);
4265
4266 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4267
4268 if (unlinked) {
4269 nxt_mp_release(r->mem_pool);
4270 }
4271
4272 return;
4273 }
4274
4275 main_app_port = app_port->main_app_port;
4276
4277 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4278 app->idle_processes--;
4279
4280 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4281 &app->name, main_app_port->pid, main_app_port->id,
4282 (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4283
4284 /* Check port was in 'spare_ports' using idle_start field. */
4285 if (main_app_port->idle_start == 0
4286 && app->idle_processes >= app->spare_processes)
4287 {
4288 /*
4289 * If there is a vacant space in spare ports,
4290 * move the last idle to spare_ports.
4291 */
4292 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4293
4294 idle_lnk = nxt_queue_last(&app->idle_ports);
4295 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4296 nxt_queue_remove(idle_lnk);
4297
4298 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4299
4300 idle_port->idle_start = 0;
4301
4302 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4303 "to spare_ports",
4304 &app->name, idle_port->pid, idle_port->id);
4305 }
4306
4307 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4308 app->pending_processes++;
4309 start_process = 1;
4310 }
4311 }
4312
4313 main_app_port->active_requests++;
4314
4315 nxt_port_inc_use(app_port);
4316
4317 nxt_thread_mutex_unlock(&app->mutex);
4318
4319 if (unlinked) {
4320 nxt_mp_release(r->mem_pool);
4321 }
4322
4323 if (start_process) {
4324 nxt_router_start_app_process(task, app);
4325 }
4326
4327 nxt_port_use(task, req_rpc_data->app_port, -1);
4328
4329 req_rpc_data->app_port = app_port;
4330
4331 b = req_rpc_data->msg_info.buf;
4332
4333 if (b != NULL) {
4334 /* First buffer is already sent. Start from second. */
4335 b = b->next;
4336
4337 req_rpc_data->msg_info.buf->next = NULL;
4338 }
4339
4340 if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4341 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4342 req_rpc_data->msg_info.body_fd);
4343
4344 if (req_rpc_data->msg_info.body_fd != -1) {
4345 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4346 }
4347
4348 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4349 req_rpc_data->msg_info.body_fd,
4350 req_rpc_data->stream,
4351 task->thread->engine->port->id, b);
4352
4353 if (nxt_slow_path(res != NXT_OK)) {
4354 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4355 }
4356 }
4357
4358 if (app->timeout != 0) {
4359 r->timer.handler = nxt_router_app_timeout;
4360 r->timer_data = req_rpc_data;
4361 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4362 }
4363 }
4364
4365
4366 static const nxt_http_request_state_t nxt_http_request_send_state
4367 nxt_aligned(64) =
4368 {
4369 .error_handler = nxt_http_request_error_handler,
4370 };
4371
4372
4373 static void
nxt_http_request_send_body(nxt_task_t * task,void * obj,void * data)4374 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4375 {
4376 nxt_buf_t *out;
4377 nxt_http_request_t *r;
4378
4379 r = obj;
4380
4381 out = r->out;
4382
4383 if (out != NULL) {
4384 r->out = NULL;
4385 nxt_http_request_send(task, r, out);
4386 }
4387 }
4388
4389
4390 static void
nxt_router_response_error_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4391 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4392 void *data)
4393 {
4394 nxt_request_rpc_data_t *req_rpc_data;
4395
4396 req_rpc_data = data;
4397
4398 req_rpc_data->rpc_cancel = 0;
4399
4400 /* TODO cancel message and return if cancelled. */
4401 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4402
4403 if (req_rpc_data->request != NULL) {
4404 nxt_http_request_error(task, req_rpc_data->request,
4405 NXT_HTTP_SERVICE_UNAVAILABLE);
4406 }
4407
4408 nxt_request_rpc_data_unlink(task, req_rpc_data);
4409 }
4410
4411
4412 static void
nxt_router_app_port_ready(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4413 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4414 void *data)
4415 {
4416 uint32_t n;
4417 nxt_app_t *app;
4418 nxt_bool_t start_process, restarted;
4419 nxt_port_t *port;
4420 nxt_app_joint_t *app_joint;
4421 nxt_app_joint_rpc_t *app_joint_rpc;
4422
4423 nxt_assert(data != NULL);
4424
4425 app_joint_rpc = data;
4426 app_joint = app_joint_rpc->app_joint;
4427 port = msg->u.new_port;
4428
4429 nxt_assert(app_joint != NULL);
4430 nxt_assert(port != NULL);
4431 nxt_assert(port->id == 0);
4432
4433 app = app_joint->app;
4434
4435 nxt_router_app_joint_use(task, app_joint, -1);
4436
4437 if (nxt_slow_path(app == NULL)) {
4438 nxt_debug(task, "new port ready for released app, send QUIT");
4439
4440 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4441
4442 return;
4443 }
4444
4445 nxt_thread_mutex_lock(&app->mutex);
4446
4447 restarted = (app->generation != app_joint_rpc->generation);
4448
4449 if (app_joint_rpc->proto) {
4450 nxt_assert(app->proto_port == NULL);
4451 nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4452
4453 n = app->proto_port_requests;
4454 app->proto_port_requests = 0;
4455
4456 if (nxt_slow_path(restarted)) {
4457 nxt_thread_mutex_unlock(&app->mutex);
4458
4459 nxt_debug(task, "proto port ready for restarted app, send QUIT");
4460
4461 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4462 NULL);
4463
4464 } else {
4465 port->app = app;
4466 app->proto_port = port;
4467
4468 nxt_thread_mutex_unlock(&app->mutex);
4469
4470 nxt_port_use(task, port, 1);
4471 }
4472
4473 port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4474
4475 while (n > 0) {
4476 nxt_router_app_use(task, app, 1);
4477
4478 nxt_router_start_app_process_handler(task, port, app);
4479
4480 n--;
4481 }
4482
4483 return;
4484 }
4485
4486 nxt_assert(port->type == NXT_PROCESS_APP);
4487 nxt_assert(app->pending_processes != 0);
4488
4489 app->pending_processes--;
4490
4491 if (nxt_slow_path(restarted)) {
4492 nxt_debug(task, "new port ready for restarted app, send QUIT");
4493
4494 start_process = !task->thread->engine->shutdown
4495 && nxt_router_app_can_start(app)
4496 && nxt_router_app_need_start(app);
4497
4498 if (start_process) {
4499 app->pending_processes++;
4500 }
4501
4502 nxt_thread_mutex_unlock(&app->mutex);
4503
4504 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4505
4506 if (start_process) {
4507 nxt_router_start_app_process(task, app);
4508 }
4509
4510 return;
4511 }
4512
4513 port->app = app;
4514 port->main_app_port = port;
4515
4516 app->processes++;
4517 nxt_port_hash_add(&app->port_hash, port);
4518 app->port_hash_count++;
4519
4520 nxt_thread_mutex_unlock(&app->mutex);
4521
4522 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4523 &app->name, port->pid, app->processes, app->pending_processes);
4524
4525 nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4526
4527 nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4528 }
4529
4530
4531 static void
nxt_router_app_port_error(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)4532 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4533 void *data)
4534 {
4535 nxt_app_t *app;
4536 nxt_app_joint_t *app_joint;
4537 nxt_queue_link_t *link;
4538 nxt_http_request_t *r;
4539 nxt_app_joint_rpc_t *app_joint_rpc;
4540
4541 nxt_assert(data != NULL);
4542
4543 app_joint_rpc = data;
4544 app_joint = app_joint_rpc->app_joint;
4545
4546 nxt_assert(app_joint != NULL);
4547
4548 app = app_joint->app;
4549
4550 nxt_router_app_joint_use(task, app_joint, -1);
4551
4552 if (nxt_slow_path(app == NULL)) {
4553 nxt_debug(task, "start error for released app");
4554
4555 return;
4556 }
4557
4558 nxt_debug(task, "app '%V' %p start error", &app->name, app);
4559
4560 link = NULL;
4561
4562 nxt_thread_mutex_lock(&app->mutex);
4563
4564 nxt_assert(app->pending_processes != 0);
4565
4566 app->pending_processes--;
4567
4568 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4569 link = nxt_queue_first(&app->ack_waiting_req);
4570
4571 nxt_queue_remove(link);
4572 link->next = NULL;
4573 }
4574
4575 nxt_thread_mutex_unlock(&app->mutex);
4576
4577 while (link != NULL) {
4578 r = nxt_container_of(link, nxt_http_request_t, app_link);
4579
4580 nxt_event_engine_post(r->engine, &r->err_work);
4581
4582 link = NULL;
4583
4584 nxt_thread_mutex_lock(&app->mutex);
4585
4586 if (app->processes == 0 && app->pending_processes == 0
4587 && !nxt_queue_is_empty(&app->ack_waiting_req))
4588 {
4589 link = nxt_queue_first(&app->ack_waiting_req);
4590
4591 nxt_queue_remove(link);
4592 link->next = NULL;
4593 }
4594
4595 nxt_thread_mutex_unlock(&app->mutex);
4596 }
4597 }
4598
4599
4600 nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t * task,nxt_app_t * app)4601 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4602 {
4603 nxt_port_t *port;
4604
4605 port = NULL;
4606
4607 nxt_thread_mutex_lock(&app->mutex);
4608
4609 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4610
4611 /* Caller is responsible to decrease port use count. */
4612 nxt_queue_chk_remove(&port->app_link);
4613
4614 if (nxt_queue_chk_remove(&port->idle_link)) {
4615 app->idle_processes--;
4616
4617 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4618 &app->name, port->pid, port->id,
4619 (port->idle_start ? "idle_ports" : "spare_ports"));
4620 }
4621
4622 nxt_port_hash_remove(&app->port_hash, port);
4623 app->port_hash_count--;
4624
4625 port->app = NULL;
4626 app->processes--;
4627
4628 break;
4629
4630 } nxt_queue_loop;
4631
4632 nxt_thread_mutex_unlock(&app->mutex);
4633
4634 return port;
4635 }
4636
4637
4638 static void
nxt_router_app_use(nxt_task_t * task,nxt_app_t * app,int i)4639 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4640 {
4641 int c;
4642
4643 c = nxt_atomic_fetch_add(&app->use_count, i);
4644
4645 if (i < 0 && c == -i) {
4646
4647 if (task->thread->engine != app->engine) {
4648 nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4649
4650 } else {
4651 nxt_router_free_app(task, app->joint, NULL);
4652 }
4653 }
4654 }
4655
4656
4657 static void
nxt_router_app_unlink(nxt_task_t * task,nxt_app_t * app)4658 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4659 {
4660 nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4661
4662 nxt_queue_remove(&app->link);
4663
4664 nxt_router_app_use(task, app, -1);
4665 }
4666
4667
4668 static void
nxt_router_app_port_release(nxt_task_t * task,nxt_app_t * app,nxt_port_t * port,nxt_apr_action_t action)4669 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4670 nxt_apr_action_t action)
4671 {
4672 int inc_use;
4673 uint32_t got_response, dec_requests;
4674 nxt_bool_t adjust_idle_timer;
4675 nxt_port_t *main_app_port;
4676
4677 nxt_assert(port != NULL);
4678
4679 inc_use = 0;
4680 got_response = 0;
4681 dec_requests = 0;
4682
4683 switch (action) {
4684 case NXT_APR_NEW_PORT:
4685 break;
4686 case NXT_APR_REQUEST_FAILED:
4687 dec_requests = 1;
4688 inc_use = -1;
4689 break;
4690 case NXT_APR_GOT_RESPONSE:
4691 got_response = 1;
4692 inc_use = -1;
4693 break;
4694 case NXT_APR_UPGRADE:
4695 got_response = 1;
4696 break;
4697 case NXT_APR_CLOSE:
4698 inc_use = -1;
4699 break;
4700 }
4701
4702 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4703 port->pid, port->id,
4704 (int) inc_use, (int) got_response);
4705
4706 if (port->id == NXT_SHARED_PORT_ID) {
4707 nxt_thread_mutex_lock(&app->mutex);
4708
4709 app->active_requests -= got_response + dec_requests;
4710
4711 nxt_thread_mutex_unlock(&app->mutex);
4712
4713 goto adjust_use;
4714 }
4715
4716 main_app_port = port->main_app_port;
4717
4718 nxt_thread_mutex_lock(&app->mutex);
4719
4720 main_app_port->active_requests -= got_response + dec_requests;
4721 app->active_requests -= got_response + dec_requests;
4722
4723 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4724 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4725
4726 nxt_port_inc_use(main_app_port);
4727 }
4728
4729 adjust_idle_timer = 0;
4730
4731 if (main_app_port->pair[1] != -1
4732 && main_app_port->active_requests == 0
4733 && main_app_port->active_websockets == 0
4734 && main_app_port->idle_link.next == NULL)
4735 {
4736 if (app->idle_processes == app->spare_processes
4737 && app->adjust_idle_work.data == NULL)
4738 {
4739 adjust_idle_timer = 1;
4740 app->adjust_idle_work.data = app;
4741 app->adjust_idle_work.next = NULL;
4742 }
4743
4744 if (app->idle_processes < app->spare_processes) {
4745 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4746
4747 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4748 &app->name, main_app_port->pid, main_app_port->id);
4749 } else {
4750 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4751
4752 main_app_port->idle_start = task->thread->engine->timers.now;
4753
4754 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4755 &app->name, main_app_port->pid, main_app_port->id);
4756 }
4757
4758 app->idle_processes++;
4759 }
4760
4761 nxt_thread_mutex_unlock(&app->mutex);
4762
4763 if (adjust_idle_timer) {
4764 nxt_router_app_use(task, app, 1);
4765 nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4766 }
4767
4768 /* ? */
4769 if (main_app_port->pair[1] == -1) {
4770 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4771 &app->name, app, main_app_port, main_app_port->pid);
4772
4773 goto adjust_use;
4774 }
4775
4776 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4777 &app->name, app);
4778
4779 adjust_use:
4780
4781 nxt_port_use(task, port, inc_use);
4782 }
4783
4784
4785 void
nxt_router_app_port_close(nxt_task_t * task,nxt_port_t * port)4786 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4787 {
4788 nxt_app_t *app;
4789 nxt_bool_t unchain, start_process;
4790 nxt_port_t *idle_port;
4791 nxt_queue_link_t *idle_lnk;
4792
4793 app = port->app;
4794
4795 nxt_assert(app != NULL);
4796
4797 nxt_thread_mutex_lock(&app->mutex);
4798
4799 if (port == app->proto_port) {
4800 app->proto_port = NULL;
4801 port->app = NULL;
4802
4803 nxt_thread_mutex_unlock(&app->mutex);
4804
4805 nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4806 port->pid);
4807
4808 nxt_port_use(task, port, -1);
4809
4810 return;
4811 }
4812
4813 nxt_port_hash_remove(&app->port_hash, port);
4814 app->port_hash_count--;
4815
4816 if (port->id != 0) {
4817 nxt_thread_mutex_unlock(&app->mutex);
4818
4819 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4820 port->pid, port->id);
4821
4822 return;
4823 }
4824
4825 unchain = nxt_queue_chk_remove(&port->app_link);
4826
4827 if (nxt_queue_chk_remove(&port->idle_link)) {
4828 app->idle_processes--;
4829
4830 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4831 &app->name, port->pid, port->id,
4832 (port->idle_start ? "idle_ports" : "spare_ports"));
4833
4834 if (port->idle_start == 0
4835 && app->idle_processes >= app->spare_processes)
4836 {
4837 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4838
4839 idle_lnk = nxt_queue_last(&app->idle_ports);
4840 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4841 nxt_queue_remove(idle_lnk);
4842
4843 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4844
4845 idle_port->idle_start = 0;
4846
4847 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4848 "to spare_ports",
4849 &app->name, idle_port->pid, idle_port->id);
4850 }
4851 }
4852
4853 app->processes--;
4854
4855 start_process = !task->thread->engine->shutdown
4856 && nxt_router_app_can_start(app)
4857 && nxt_router_app_need_start(app);
4858
4859 if (start_process) {
4860 app->pending_processes++;
4861 }
4862
4863 nxt_thread_mutex_unlock(&app->mutex);
4864
4865 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4866
4867 if (unchain) {
4868 nxt_port_use(task, port, -1);
4869 }
4870
4871 if (start_process) {
4872 nxt_router_start_app_process(task, app);
4873 }
4874 }
4875
4876
4877 static void
nxt_router_adjust_idle_timer(nxt_task_t * task,void * obj,void * data)4878 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4879 {
4880 nxt_app_t *app;
4881 nxt_bool_t queued;
4882 nxt_port_t *port;
4883 nxt_msec_t timeout, threshold;
4884 nxt_queue_link_t *lnk;
4885 nxt_event_engine_t *engine;
4886
4887 app = obj;
4888 queued = (data == app);
4889
4890 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4891 &app->name, queued);
4892
4893 engine = task->thread->engine;
4894
4895 nxt_assert(app->engine == engine);
4896
4897 threshold = engine->timers.now + app->joint->idle_timer.bias;
4898 timeout = 0;
4899
4900 nxt_thread_mutex_lock(&app->mutex);
4901
4902 if (queued) {
4903 app->adjust_idle_work.data = NULL;
4904 }
4905
4906 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4907 &app->name,
4908 (int) app->idle_processes, (int) app->spare_processes);
4909
4910 while (app->idle_processes > app->spare_processes) {
4911
4912 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4913
4914 lnk = nxt_queue_first(&app->idle_ports);
4915 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4916
4917 timeout = port->idle_start + app->idle_timeout;
4918
4919 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4920 &app->name, port->pid,
4921 port->idle_start, timeout, threshold);
4922
4923 if (timeout > threshold) {
4924 break;
4925 }
4926
4927 nxt_queue_remove(lnk);
4928 lnk->next = NULL;
4929
4930 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4931 &app->name, port->pid, port->id);
4932
4933 nxt_queue_chk_remove(&port->app_link);
4934
4935 nxt_port_hash_remove(&app->port_hash, port);
4936 app->port_hash_count--;
4937
4938 app->idle_processes--;
4939 app->processes--;
4940 port->app = NULL;
4941
4942 nxt_thread_mutex_unlock(&app->mutex);
4943
4944 nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4945 &app->name, port->pid);
4946
4947 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4948
4949 nxt_port_use(task, port, -1);
4950
4951 nxt_thread_mutex_lock(&app->mutex);
4952 }
4953
4954 nxt_thread_mutex_unlock(&app->mutex);
4955
4956 if (timeout > threshold) {
4957 nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4958
4959 } else {
4960 nxt_timer_disable(engine, &app->joint->idle_timer);
4961 }
4962
4963 if (queued) {
4964 nxt_router_app_use(task, app, -1);
4965 }
4966 }
4967
4968
4969 static void
nxt_router_app_idle_timeout(nxt_task_t * task,void * obj,void * data)4970 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4971 {
4972 nxt_timer_t *timer;
4973 nxt_app_joint_t *app_joint;
4974
4975 timer = obj;
4976 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4977
4978 if (nxt_fast_path(app_joint->app != NULL)) {
4979 nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4980 }
4981 }
4982
4983
4984 static void
nxt_router_app_joint_release_handler(nxt_task_t * task,void * obj,void * data)4985 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4986 {
4987 nxt_timer_t *timer;
4988 nxt_app_joint_t *app_joint;
4989
4990 timer = obj;
4991 app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4992
4993 nxt_router_app_joint_use(task, app_joint, -1);
4994 }
4995
4996
4997 static void
nxt_router_free_app(nxt_task_t * task,void * obj,void * data)4998 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4999 {
5000 nxt_app_t *app;
5001 nxt_port_t *port, *proto_port;
5002 nxt_app_joint_t *app_joint;
5003
5004 app_joint = obj;
5005 app = app_joint->app;
5006
5007 for ( ;; ) {
5008 port = nxt_router_app_get_port_for_quit(task, app);
5009 if (port == NULL) {
5010 break;
5011 }
5012
5013 nxt_port_use(task, port, -1);
5014 }
5015
5016 nxt_thread_mutex_lock(&app->mutex);
5017
5018 for ( ;; ) {
5019 port = nxt_port_hash_retrieve(&app->port_hash);
5020 if (port == NULL) {
5021 break;
5022 }
5023
5024 app->port_hash_count--;
5025
5026 port->app = NULL;
5027
5028 nxt_port_close(task, port);
5029
5030 nxt_port_use(task, port, -1);
5031 }
5032
5033 proto_port = app->proto_port;
5034
5035 if (proto_port != NULL) {
5036 nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5037 proto_port->pid);
5038
5039 app->proto_port = NULL;
5040 proto_port->app = NULL;
5041 }
5042
5043 nxt_thread_mutex_unlock(&app->mutex);
5044
5045 if (proto_port != NULL) {
5046 nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5047 -1, 0, 0, NULL);
5048
5049 nxt_port_close(task, proto_port);
5050
5051 nxt_port_use(task, proto_port, -1);
5052 }
5053
5054 nxt_assert(app->proto_port == NULL);
5055 nxt_assert(app->processes == 0);
5056 nxt_assert(app->active_requests == 0);
5057 nxt_assert(app->port_hash_count == 0);
5058 nxt_assert(app->idle_processes == 0);
5059 nxt_assert(nxt_queue_is_empty(&app->ports));
5060 nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5061 nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5062
5063 nxt_port_mmaps_destroy(&app->outgoing, 1);
5064
5065 nxt_thread_mutex_destroy(&app->outgoing.mutex);
5066
5067 if (app->shared_port != NULL) {
5068 app->shared_port->app = NULL;
5069 nxt_port_close(task, app->shared_port);
5070 nxt_port_use(task, app->shared_port, -1);
5071
5072 app->shared_port = NULL;
5073 }
5074
5075 nxt_thread_mutex_destroy(&app->mutex);
5076 nxt_mp_destroy(app->mem_pool);
5077
5078 app_joint->app = NULL;
5079
5080 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5081 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5082 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5083
5084 } else {
5085 nxt_router_app_joint_use(task, app_joint, -1);
5086 }
5087 }
5088
5089
5090 static void
nxt_router_app_port_get(nxt_task_t * task,nxt_app_t * app,nxt_request_rpc_data_t * req_rpc_data)5091 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5092 nxt_request_rpc_data_t *req_rpc_data)
5093 {
5094 nxt_bool_t start_process;
5095 nxt_port_t *port;
5096 nxt_http_request_t *r;
5097
5098 start_process = 0;
5099
5100 nxt_thread_mutex_lock(&app->mutex);
5101
5102 port = app->shared_port;
5103 nxt_port_inc_use(port);
5104
5105 app->active_requests++;
5106
5107 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5108 app->pending_processes++;
5109 start_process = 1;
5110 }
5111
5112 r = req_rpc_data->request;
5113
5114 /*
5115 * Put request into application-wide list to be able to cancel request
5116 * if something goes wrong with application processes.
5117 */
5118 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5119
5120 nxt_thread_mutex_unlock(&app->mutex);
5121
5122 /*
5123 * Retain request memory pool while request is linked in ack_waiting_req
5124 * to guarantee request structure memory is accessble.
5125 */
5126 nxt_mp_retain(r->mem_pool);
5127
5128 req_rpc_data->app_port = port;
5129 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5130
5131 if (start_process) {
5132 nxt_router_start_app_process(task, app);
5133 }
5134 }
5135
5136
5137 void
nxt_router_process_http_request(nxt_task_t * task,nxt_http_request_t * r,nxt_http_action_t * action)5138 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5139 nxt_http_action_t *action)
5140 {
5141 nxt_event_engine_t *engine;
5142 nxt_http_app_conf_t *conf;
5143 nxt_request_rpc_data_t *req_rpc_data;
5144
5145 conf = action->u.conf;
5146 engine = task->thread->engine;
5147
5148 r->app_target = conf->target;
5149
5150 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5151 nxt_router_response_ready_handler,
5152 nxt_router_response_error_handler,
5153 sizeof(nxt_request_rpc_data_t));
5154 if (nxt_slow_path(req_rpc_data == NULL)) {
5155 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5156 return;
5157 }
5158
5159 /*
5160 * At this point we have request req_rpc_data allocated and registered
5161 * in port handlers. Need to fixup request memory pool. Counterpart
5162 * release will be called via following call chain:
5163 * nxt_request_rpc_data_unlink() ->
5164 * nxt_router_http_request_release_post() ->
5165 * nxt_router_http_request_release()
5166 */
5167 nxt_mp_retain(r->mem_pool);
5168
5169 r->timer.task = &engine->task;
5170 r->timer.work_queue = &engine->fast_work_queue;
5171 r->timer.log = engine->task.log;
5172 r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5173
5174 r->engine = engine;
5175 r->err_work.handler = nxt_router_http_request_error;
5176 r->err_work.task = task;
5177 r->err_work.obj = r;
5178
5179 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5180 req_rpc_data->app = conf->app;
5181 req_rpc_data->msg_info.body_fd = -1;
5182 req_rpc_data->rpc_cancel = 1;
5183
5184 nxt_router_app_use(task, conf->app, 1);
5185
5186 req_rpc_data->request = r;
5187 r->req_rpc_data = req_rpc_data;
5188
5189 if (r->last != NULL) {
5190 r->last->completion_handler = nxt_router_http_request_done;
5191 }
5192
5193 nxt_router_app_port_get(task, conf->app, req_rpc_data);
5194 nxt_router_app_prepare_request(task, req_rpc_data);
5195 }
5196
5197
5198 static void
nxt_router_http_request_error(nxt_task_t * task,void * obj,void * data)5199 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5200 {
5201 nxt_http_request_t *r;
5202
5203 r = obj;
5204
5205 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5206
5207 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5208
5209 if (r->req_rpc_data != NULL) {
5210 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5211 }
5212
5213 nxt_mp_release(r->mem_pool);
5214 }
5215
5216
5217 static void
nxt_router_http_request_done(nxt_task_t * task,void * obj,void * data)5218 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5219 {
5220 nxt_http_request_t *r;
5221
5222 r = data;
5223
5224 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5225
5226 if (r->req_rpc_data != NULL) {
5227 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5228 }
5229
5230 nxt_http_request_close_handler(task, r, r->proto.any);
5231 }
5232
5233
5234 static void
nxt_router_app_prepare_request(nxt_task_t * task,nxt_request_rpc_data_t * req_rpc_data)5235 nxt_router_app_prepare_request(nxt_task_t *task,
5236 nxt_request_rpc_data_t *req_rpc_data)
5237 {
5238 nxt_app_t *app;
5239 nxt_buf_t *buf, *body;
5240 nxt_int_t res;
5241 nxt_port_t *port, *reply_port;
5242
5243 int notify;
5244 struct {
5245 nxt_port_msg_t pm;
5246 nxt_port_mmap_msg_t mm;
5247 } msg;
5248
5249
5250 app = req_rpc_data->app;
5251
5252 nxt_assert(app != NULL);
5253
5254 port = req_rpc_data->app_port;
5255
5256 nxt_assert(port != NULL);
5257 nxt_assert(port->queue != NULL);
5258
5259 reply_port = task->thread->engine->port;
5260
5261 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5262 nxt_app_msg_prefix[app->type]);
5263 if (nxt_slow_path(buf == NULL)) {
5264 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5265 req_rpc_data->stream, &app->name);
5266
5267 nxt_http_request_error(task, req_rpc_data->request,
5268 NXT_HTTP_INTERNAL_SERVER_ERROR);
5269
5270 return;
5271 }
5272
5273 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5274 nxt_buf_used_size(buf),
5275 port->socket.fd);
5276
5277 req_rpc_data->msg_info.buf = buf;
5278
5279 body = req_rpc_data->request->body;
5280
5281 if (body != NULL && nxt_buf_is_file(body)) {
5282 req_rpc_data->msg_info.body_fd = body->file->fd;
5283
5284 body->file->fd = -1;
5285
5286 } else {
5287 req_rpc_data->msg_info.body_fd = -1;
5288 }
5289
5290 msg.pm.stream = req_rpc_data->stream;
5291 msg.pm.pid = reply_port->pid;
5292 msg.pm.reply_port = reply_port->id;
5293 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5294 msg.pm.last = 0;
5295 msg.pm.mmap = 1;
5296 msg.pm.nf = 0;
5297 msg.pm.mf = 0;
5298
5299 nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5300 nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5301
5302 msg.mm.mmap_id = hdr->id;
5303 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5304 msg.mm.size = nxt_buf_used_size(buf);
5305
5306 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5307 req_rpc_data->stream, ¬ify,
5308 &req_rpc_data->msg_info.tracking_cookie);
5309 if (nxt_fast_path(res == NXT_OK)) {
5310 if (notify != 0) {
5311 (void) nxt_port_socket_write(task, port,
5312 NXT_PORT_MSG_READ_QUEUE,
5313 -1, req_rpc_data->stream,
5314 reply_port->id, NULL);
5315
5316 } else {
5317 nxt_debug(task, "queue is not empty");
5318 }
5319
5320 buf->is_port_mmap_sent = 1;
5321 buf->mem.pos = buf->mem.free;
5322
5323 } else {
5324 nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5325 req_rpc_data->stream, &app->name);
5326
5327 nxt_http_request_error(task, req_rpc_data->request,
5328 NXT_HTTP_INTERNAL_SERVER_ERROR);
5329 }
5330 }
5331
5332
5333 struct nxt_fields_iter_s {
5334 nxt_list_part_t *part;
5335 nxt_http_field_t *field;
5336 };
5337
5338 typedef struct nxt_fields_iter_s nxt_fields_iter_t;
5339
5340
5341 static nxt_http_field_t *
nxt_fields_part_first(nxt_list_part_t * part,nxt_fields_iter_t * i)5342 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5343 {
5344 if (part == NULL) {
5345 return NULL;
5346 }
5347
5348 while (part->nelts == 0) {
5349 part = part->next;
5350 if (part == NULL) {
5351 return NULL;
5352 }
5353 }
5354
5355 i->part = part;
5356 i->field = nxt_list_data(i->part);
5357
5358 return i->field;
5359 }
5360
5361
5362 static nxt_http_field_t *
nxt_fields_first(nxt_list_t * fields,nxt_fields_iter_t * i)5363 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5364 {
5365 return nxt_fields_part_first(nxt_list_part(fields), i);
5366 }
5367
5368
5369 static nxt_http_field_t *
nxt_fields_next(nxt_fields_iter_t * i)5370 nxt_fields_next(nxt_fields_iter_t *i)
5371 {
5372 nxt_http_field_t *end = nxt_list_data(i->part);
5373
5374 end += i->part->nelts;
5375 i->field++;
5376
5377 if (i->field < end) {
5378 return i->field;
5379 }
5380
5381 return nxt_fields_part_first(i->part->next, i);
5382 }
5383
5384
5385 static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t * task,nxt_http_request_t * r,nxt_app_t * app,const nxt_str_t * prefix)5386 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5387 nxt_app_t *app, const nxt_str_t *prefix)
5388 {
5389 void *target_pos, *query_pos;
5390 u_char *pos, *end, *p, c;
5391 size_t fields_count, req_size, size, free_size;
5392 size_t copy_size;
5393 nxt_off_t content_length;
5394 nxt_buf_t *b, *buf, *out, **tail;
5395 nxt_http_field_t *field, *dup;
5396 nxt_unit_field_t *dst_field;
5397 nxt_fields_iter_t iter, dup_iter;
5398 nxt_unit_request_t *req;
5399
5400 req_size = sizeof(nxt_unit_request_t)
5401 + r->method->length + 1
5402 + r->version.length + 1
5403 + r->remote->address_length + 1
5404 + r->local->address_length + 1
5405 + nxt_sockaddr_port_length(r->local) + 1
5406 + r->server_name.length + 1
5407 + r->target.length + 1
5408 + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5409
5410 content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5411 fields_count = 0;
5412
5413 nxt_list_each(field, r->fields) {
5414 fields_count++;
5415
5416 req_size += field->name_length + prefix->length + 1
5417 + field->value_length + 1;
5418 } nxt_list_loop;
5419
5420 req_size += fields_count * sizeof(nxt_unit_field_t);
5421
5422 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5423 nxt_alert(task, "headers to big to fit in shared memory (%d)",
5424 (int) req_size);
5425
5426 return NULL;
5427 }
5428
5429 out = nxt_port_mmap_get_buf(task, &app->outgoing,
5430 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5431 if (nxt_slow_path(out == NULL)) {
5432 return NULL;
5433 }
5434
5435 req = (nxt_unit_request_t *) out->mem.free;
5436 out->mem.free += req_size;
5437
5438 req->app_target = r->app_target;
5439
5440 req->content_length = content_length;
5441
5442 p = (u_char *) (req->fields + fields_count);
5443
5444 nxt_debug(task, "fields_count=%d", (int) fields_count);
5445
5446 req->method_length = r->method->length;
5447 nxt_unit_sptr_set(&req->method, p);
5448 p = nxt_cpymem(p, r->method->start, r->method->length);
5449 *p++ = '\0';
5450
5451 req->version_length = r->version.length;
5452 nxt_unit_sptr_set(&req->version, p);
5453 p = nxt_cpymem(p, r->version.start, r->version.length);
5454 *p++ = '\0';
5455
5456 req->remote_length = r->remote->address_length;
5457 nxt_unit_sptr_set(&req->remote, p);
5458 p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5459 r->remote->address_length);
5460 *p++ = '\0';
5461
5462 req->local_addr_length = r->local->address_length;
5463 nxt_unit_sptr_set(&req->local_addr, p);
5464 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5465 *p++ = '\0';
5466
5467 req->local_port_length = nxt_sockaddr_port_length(r->local);
5468 nxt_unit_sptr_set(&req->local_port, p);
5469 p = nxt_cpymem(p, nxt_sockaddr_port(r->local),
5470 nxt_sockaddr_port_length(r->local));
5471 *p++ = '\0';
5472
5473 req->tls = r->tls;
5474 req->websocket_handshake = r->websocket_handshake;
5475
5476 req->server_name_length = r->server_name.length;
5477 nxt_unit_sptr_set(&req->server_name, p);
5478 p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5479 *p++ = '\0';
5480
5481 target_pos = p;
5482 req->target_length = (uint32_t) r->target.length;
5483 nxt_unit_sptr_set(&req->target, p);
5484 p = nxt_cpymem(p, r->target.start, r->target.length);
5485 *p++ = '\0';
5486
5487 req->path_length = (uint32_t) r->path->length;
5488 if (r->path->start == r->target.start) {
5489 nxt_unit_sptr_set(&req->path, target_pos);
5490
5491 } else {
5492 nxt_unit_sptr_set(&req->path, p);
5493 p = nxt_cpymem(p, r->path->start, r->path->length);
5494 *p++ = '\0';
5495 }
5496
5497 req->query_length = (uint32_t) r->args->length;
5498 if (r->args->start != NULL) {
5499 query_pos = nxt_pointer_to(target_pos,
5500 r->args->start - r->target.start);
5501
5502 nxt_unit_sptr_set(&req->query, query_pos);
5503
5504 } else {
5505 req->query.offset = 0;
5506 }
5507
5508 req->content_length_field = NXT_UNIT_NONE_FIELD;
5509 req->content_type_field = NXT_UNIT_NONE_FIELD;
5510 req->cookie_field = NXT_UNIT_NONE_FIELD;
5511 req->authorization_field = NXT_UNIT_NONE_FIELD;
5512
5513 dst_field = req->fields;
5514
5515 for (field = nxt_fields_first(r->fields, &iter);
5516 field != NULL;
5517 field = nxt_fields_next(&iter))
5518 {
5519 if (field->skip) {
5520 continue;
5521 }
5522
5523 dst_field->hash = field->hash;
5524 dst_field->skip = 0;
5525 dst_field->name_length = field->name_length + prefix->length;
5526 dst_field->value_length = field->value_length;
5527
5528 if (field == r->content_length) {
5529 req->content_length_field = dst_field - req->fields;
5530
5531 } else if (field == r->content_type) {
5532 req->content_type_field = dst_field - req->fields;
5533
5534 } else if (field == r->cookie) {
5535 req->cookie_field = dst_field - req->fields;
5536
5537 } else if (field == r->authorization) {
5538 req->authorization_field = dst_field - req->fields;
5539 }
5540
5541 nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5542 (int) field->hash, (int) field->skip,
5543 (int) field->name_length, field->name,
5544 (int) field->value_length, field->value);
5545
5546 if (prefix->length != 0) {
5547 nxt_unit_sptr_set(&dst_field->name, p);
5548 p = nxt_cpymem(p, prefix->start, prefix->length);
5549
5550 end = field->name + field->name_length;
5551 for (pos = field->name; pos < end; pos++) {
5552 c = *pos;
5553
5554 if (c >= 'a' && c <= 'z') {
5555 *p++ = (c & ~0x20);
5556 continue;
5557 }
5558
5559 if (c == '-') {
5560 *p++ = '_';
5561 continue;
5562 }
5563
5564 *p++ = c;
5565 }
5566
5567 } else {
5568 nxt_unit_sptr_set(&dst_field->name, p);
5569 p = nxt_cpymem(p, field->name, field->name_length);
5570 }
5571
5572 *p++ = '\0';
5573
5574 nxt_unit_sptr_set(&dst_field->value, p);
5575 p = nxt_cpymem(p, field->value, field->value_length);
5576
5577 if (prefix->length != 0) {
5578 dup_iter = iter;
5579
5580 for (dup = nxt_fields_next(&dup_iter);
5581 dup != NULL;
5582 dup = nxt_fields_next(&dup_iter))
5583 {
5584 if (dup->name_length != field->name_length
5585 || dup->skip
5586 || dup->hash != field->hash
5587 || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5588 {
5589 continue;
5590 }
5591
5592 p = nxt_cpymem(p, ", ", 2);
5593 p = nxt_cpymem(p, dup->value, dup->value_length);
5594
5595 dst_field->value_length += 2 + dup->value_length;
5596
5597 dup->skip = 1;
5598 }
5599 }
5600
5601 *p++ = '\0';
5602
5603 dst_field++;
5604 }
5605
5606 req->fields_count = (uint32_t) (dst_field - req->fields);
5607
5608 nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5609
5610 buf = out;
5611 tail = &buf->next;
5612
5613 for (b = r->body; b != NULL; b = b->next) {
5614 size = nxt_buf_mem_used_size(&b->mem);
5615 pos = b->mem.pos;
5616
5617 while (size > 0) {
5618 if (buf == NULL) {
5619 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5620
5621 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5622 if (nxt_slow_path(buf == NULL)) {
5623 while (out != NULL) {
5624 buf = out->next;
5625 out->next = NULL;
5626 out->completion_handler(task, out, out->parent);
5627 out = buf;
5628 }
5629 return NULL;
5630 }
5631
5632 *tail = buf;
5633 tail = &buf->next;
5634
5635 } else {
5636 free_size = nxt_buf_mem_free_size(&buf->mem);
5637 if (free_size < size
5638 && nxt_port_mmap_increase_buf(task, buf, size, 1)
5639 == NXT_OK)
5640 {
5641 free_size = nxt_buf_mem_free_size(&buf->mem);
5642 }
5643 }
5644
5645 if (free_size > 0) {
5646 copy_size = nxt_min(free_size, size);
5647
5648 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5649
5650 size -= copy_size;
5651 pos += copy_size;
5652
5653 if (size == 0) {
5654 break;
5655 }
5656 }
5657
5658 buf = NULL;
5659 }
5660 }
5661
5662 return out;
5663 }
5664
5665
5666 static void
nxt_router_app_timeout(nxt_task_t * task,void * obj,void * data)5667 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5668 {
5669 nxt_timer_t *timer;
5670 nxt_http_request_t *r;
5671 nxt_request_rpc_data_t *req_rpc_data;
5672
5673 timer = obj;
5674
5675 nxt_debug(task, "router app timeout");
5676
5677 r = nxt_timer_data(timer, nxt_http_request_t, timer);
5678 req_rpc_data = r->timer_data;
5679
5680 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5681
5682 nxt_request_rpc_data_unlink(task, req_rpc_data);
5683 }
5684
5685
5686 static void
nxt_router_http_request_release_post(nxt_task_t * task,nxt_http_request_t * r)5687 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5688 {
5689 r->timer.handler = nxt_router_http_request_release;
5690 nxt_timer_add(task->thread->engine, &r->timer, 0);
5691 }
5692
5693
5694 static void
nxt_router_http_request_release(nxt_task_t * task,void * obj,void * data)5695 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5696 {
5697 nxt_http_request_t *r;
5698
5699 nxt_debug(task, "http request pool release");
5700
5701 r = nxt_timer_data(obj, nxt_http_request_t, timer);
5702
5703 nxt_mp_release(r->mem_pool);
5704 }
5705
5706
5707 static void
nxt_router_oosm_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5708 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5709 {
5710 size_t mi;
5711 uint32_t i;
5712 nxt_bool_t ack;
5713 nxt_process_t *process;
5714 nxt_free_map_t *m;
5715 nxt_port_mmap_handler_t *mmap_handler;
5716
5717 nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5718
5719 process = nxt_runtime_process_find(task->thread->runtime,
5720 msg->port_msg.pid);
5721 if (nxt_slow_path(process == NULL)) {
5722 return;
5723 }
5724
5725 ack = 0;
5726
5727 /*
5728 * To mitigate possible racing condition (when OOSM message received
5729 * after some of the memory was already freed), need to try to find
5730 * first free segment in shared memory and send ACK if found.
5731 */
5732
5733 nxt_thread_mutex_lock(&process->incoming.mutex);
5734
5735 for (i = 0; i < process->incoming.size; i++) {
5736 mmap_handler = process->incoming.elts[i].mmap_handler;
5737
5738 if (nxt_slow_path(mmap_handler == NULL)) {
5739 continue;
5740 }
5741
5742 m = mmap_handler->hdr->free_map;
5743
5744 for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5745 if (m[mi] != 0) {
5746 ack = 1;
5747
5748 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5749 i, mi, m[mi]);
5750
5751 break;
5752 }
5753 }
5754 }
5755
5756 nxt_thread_mutex_unlock(&process->incoming.mutex);
5757
5758 if (ack) {
5759 nxt_process_broadcast_shm_ack(task, process);
5760 }
5761 }
5762
5763
5764 static void
nxt_router_get_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5765 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5766 {
5767 nxt_fd_t fd;
5768 nxt_port_t *port;
5769 nxt_runtime_t *rt;
5770 nxt_port_mmaps_t *mmaps;
5771 nxt_port_msg_get_mmap_t *get_mmap_msg;
5772 nxt_port_mmap_handler_t *mmap_handler;
5773
5774 rt = task->thread->runtime;
5775
5776 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5777 msg->port_msg.reply_port);
5778 if (nxt_slow_path(port == NULL)) {
5779 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5780 msg->port_msg.pid, msg->port_msg.reply_port);
5781
5782 return;
5783 }
5784
5785 if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5786 < (int) sizeof(nxt_port_msg_get_mmap_t)))
5787 {
5788 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5789 (int) nxt_buf_used_size(msg->buf));
5790
5791 return;
5792 }
5793
5794 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5795
5796 nxt_assert(port->type == NXT_PROCESS_APP);
5797
5798 if (nxt_slow_path(port->app == NULL)) {
5799 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5800 port->pid, port->id);
5801
5802 // FIXME
5803 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5804 -1, msg->port_msg.stream, 0, NULL);
5805
5806 return;
5807 }
5808
5809 mmaps = &port->app->outgoing;
5810 nxt_thread_mutex_lock(&mmaps->mutex);
5811
5812 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5813 nxt_thread_mutex_unlock(&mmaps->mutex);
5814
5815 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5816 (int) get_mmap_msg->id);
5817
5818 // FIXME
5819 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5820 -1, msg->port_msg.stream, 0, NULL);
5821 return;
5822 }
5823
5824 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5825
5826 fd = mmap_handler->fd;
5827
5828 nxt_thread_mutex_unlock(&mmaps->mutex);
5829
5830 nxt_debug(task, "get mmap %PI:%d found",
5831 msg->port_msg.pid, (int) get_mmap_msg->id);
5832
5833 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5834 }
5835
5836
5837 static void
nxt_router_get_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)5838 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5839 {
5840 nxt_port_t *port, *reply_port;
5841 nxt_runtime_t *rt;
5842 nxt_port_msg_get_port_t *get_port_msg;
5843
5844 rt = task->thread->runtime;
5845
5846 reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5847 msg->port_msg.reply_port);
5848 if (nxt_slow_path(reply_port == NULL)) {
5849 nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5850 msg->port_msg.pid, msg->port_msg.reply_port);
5851
5852 return;
5853 }
5854
5855 if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5856 < (int) sizeof(nxt_port_msg_get_port_t)))
5857 {
5858 nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5859 (int) nxt_buf_used_size(msg->buf));
5860
5861 return;
5862 }
5863
5864 get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5865
5866 port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5867 if (nxt_slow_path(port == NULL)) {
5868 nxt_alert(task, "get_port_handler: port %PI:%d not found",
5869 get_port_msg->pid, get_port_msg->id);
5870
5871 return;
5872 }
5873
5874 nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5875 get_port_msg->id);
5876
5877 (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5878 }
5879