nxt_router.c (367:1a733ff177d7) nxt_router.c (386:d9e23ae1617d)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10
11
12typedef struct {
13 nxt_str_t type;
14 uint32_t workers;
15 nxt_msec_t timeout;
16 uint32_t requests;
17 nxt_conf_value_t *limits_value;
18} nxt_router_app_conf_t;
19
20
21typedef struct {
22 nxt_str_t application;
23} nxt_router_listener_conf_t;
24
25
26typedef struct nxt_req_app_link_s nxt_req_app_link_t;
27
28
29typedef struct {
30 uint32_t stream;
31 nxt_conn_t *conn;
32 nxt_app_t *app;
33 nxt_port_t *app_port;
34 nxt_app_parse_ctx_t *ap;
35 nxt_req_app_link_t *ra;
36
37 nxt_queue_link_t link; /* for nxt_conn_t.requests */
38} nxt_req_conn_link_t;
39
40
41struct nxt_req_app_link_s {
42 uint32_t stream;
43 nxt_port_t *app_port;
44 nxt_pid_t app_pid;
45 nxt_port_t *reply_port;
46 nxt_app_parse_ctx_t *ap;
47 nxt_req_conn_link_t *rc;
48
49 nxt_queue_link_t link; /* for nxt_app_t.requests */
50
51 nxt_mp_t *mem_pool;
52 nxt_work_t work;
53
54 int err_code;
55 const char *err_str;
56};
57
58
59typedef struct {
60 nxt_socket_conf_t *socket_conf;
61 nxt_router_temp_conf_t *temp_conf;
62} nxt_socket_rpc_t;
63
64
65static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
66
67static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
68 int code, const char* str);
69
70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conf_ready(nxt_task_t *task,
73 nxt_router_temp_conf_t *tmcf);
74static void nxt_router_conf_error(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_send(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
78
79static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
80 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
81static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
82static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
83 nxt_str_t *name);
84static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
85 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
86static void nxt_router_listen_socket_ready(nxt_task_t *task,
87 nxt_port_recv_msg_t *msg, void *data);
88static void nxt_router_listen_socket_error(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg, void *data);
90static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
91 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
92static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
93 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
94
95static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
97 const nxt_event_interface_t *interface);
98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
99 nxt_router_engine_conf_t *recf);
100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
101 nxt_router_engine_conf_t *recf);
102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
103 nxt_router_engine_conf_t *recf);
104static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
105 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
106 nxt_work_handler_t handler);
107static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
108 nxt_router_engine_conf_t *recf);
109static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
110 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
111
112static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
113 nxt_router_temp_conf_t *tmcf);
114static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
115 nxt_event_engine_t *engine);
116static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
117 nxt_router_temp_conf_t *tmcf);
118
119static void nxt_router_engines_post(nxt_router_t *router,
120 nxt_router_temp_conf_t *tmcf);
121static void nxt_router_engine_post(nxt_event_engine_t *engine,
122 nxt_work_t *jobs);
123
124static void nxt_router_thread_start(void *data);
125static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
126 void *data);
127static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
128 void *data);
129static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
130 void *data);
131static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
132 void *data);
133static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
134 void *data);
135static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
136 void *data);
137static void nxt_router_listen_socket_release(nxt_task_t *task,
138 nxt_socket_conf_t *skcf);
139static void nxt_router_conf_release(nxt_task_t *task,
140 nxt_socket_conf_joint_t *joint);
141
142static void nxt_router_app_port_ready(nxt_task_t *task,
143 nxt_port_recv_msg_t *msg, void *data);
144static void nxt_router_app_port_error(nxt_task_t *task,
145 nxt_port_recv_msg_t *msg, void *data);
146
147static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
148static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
149 uint32_t request_failed, uint32_t got_response);
150
151static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
152static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
153 void *data);
154static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
156 void *data);
157static void nxt_router_process_http_request(nxt_task_t *task,
158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
159static void nxt_router_process_http_request_mp(nxt_task_t *task,
160 nxt_req_app_link_t *ra);
161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
162 nxt_app_wmsg_t *wmsg);
163static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
164 nxt_app_wmsg_t *wmsg);
165static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
166 nxt_app_wmsg_t *wmsg);
167static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
168static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
169static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
170static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
171static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
172static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
173static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
174
175static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
176 const char* str);
177
178static nxt_router_t *nxt_router;
179
180
181static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
182 nxt_python_prepare_msg,
183 nxt_php_prepare_msg,
184 nxt_go_prepare_msg,
185};
186
187
188nxt_int_t
189nxt_router_start(nxt_task_t *task, void *data)
190{
191 nxt_int_t ret;
192 nxt_router_t *router;
193 nxt_runtime_t *rt;
194
195 rt = task->thread->runtime;
196
197 ret = nxt_app_http_init(task, rt);
198 if (nxt_slow_path(ret != NXT_OK)) {
199 return ret;
200 }
201
202 router = nxt_zalloc(sizeof(nxt_router_t));
203 if (nxt_slow_path(router == NULL)) {
204 return NXT_ERROR;
205 }
206
207 nxt_queue_init(&router->engines);
208 nxt_queue_init(&router->sockets);
209 nxt_queue_init(&router->apps);
210
211 nxt_router = router;
212
213 return NXT_OK;
214}
215
216
217static void
218nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
219{
220 size_t size;
221 uint32_t stream;
222 nxt_app_t *app;
223 nxt_buf_t *b;
224 nxt_port_t *main_port;
225 nxt_runtime_t *rt;
226
227 app = data;
228
229 rt = task->thread->runtime;
230 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
231
232 nxt_debug(task, "app '%V' %p start worker", &app->name, app);
233
234 size = app->name.length + 1 + app->conf.length;
235
236 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
237
238 if (nxt_slow_path(b == NULL)) {
239 goto failed;
240 }
241
242 nxt_buf_cpystr(b, &app->name);
243 *b->mem.free++ = '\0';
244 nxt_buf_cpystr(b, &app->conf);
245
246 stream = nxt_port_rpc_register_handler(task, port,
247 nxt_router_app_port_ready,
248 nxt_router_app_port_error,
249 -1, app);
250
251 if (nxt_slow_path(stream == 0)) {
252 nxt_mp_release(b->data, b);
253
254 goto failed;
255 }
256
257 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
258 stream, port->id, b);
259
260 return;
261
262failed:
263
264 nxt_thread_mutex_lock(&app->mutex);
265
266 app->pending_workers--;
267
268 nxt_thread_mutex_unlock(&app->mutex);
269
270 nxt_router_app_use(task, app, -1);
271}
272
273
274static nxt_int_t
275nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
276{
277 nxt_int_t res;
278 nxt_port_t *router_port;
279 nxt_runtime_t *rt;
280
281 rt = task->thread->runtime;
282 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
283
284 nxt_router_app_use(task, app, 1);
285
286 res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
287 app);
288
289 if (res == NXT_OK) {
290 return res;
291 }
292
293 nxt_thread_mutex_lock(&app->mutex);
294
295 app->pending_workers--;
296
297 nxt_thread_mutex_unlock(&app->mutex);
298
299 nxt_router_app_use(task, app, -1);
300
301 return NXT_ERROR;
302}
303
304
305nxt_inline void
306nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
307 nxt_req_conn_link_t *rc)
308{
309 nxt_event_engine_t *engine;
310
311 engine = task->thread->engine;
312
313 nxt_memzero(ra, sizeof(nxt_req_app_link_t));
314
315 ra->stream = rc->stream;
316 ra->app_pid = -1;
317 ra->rc = rc;
318 rc->ra = ra;
319 ra->reply_port = engine->port;
320 ra->ap = rc->ap;
321
322 ra->work.handler = NULL;
323 ra->work.task = &engine->task;
324 ra->work.obj = ra;
325 ra->work.data = engine;
326}
327
328
329nxt_inline nxt_req_app_link_t *
330nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
331{
332 nxt_mp_t *mp;
333 nxt_req_app_link_t *ra;
334
335 mp = ra_src->ap->mem_pool;
336
337 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
338
339 if (nxt_slow_path(ra == NULL)) {
340
341 ra_src->rc->ra = NULL;
342 ra_src->rc = NULL;
343
344 return NULL;
345 }
346
347 nxt_router_ra_init(task, ra, ra_src->rc);
348
349 ra->mem_pool = mp;
350
351 return ra;
352}
353
354
355static void
356nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
357{
358 nxt_req_app_link_t *ra;
359 nxt_event_engine_t *engine;
360 nxt_req_conn_link_t *rc;
361
362 ra = obj;
363 engine = data;
364
365 if (task->thread->engine != engine) {
366 if (ra->app_port != NULL) {
367 ra->app_pid = ra->app_port->pid;
368 }
369
370 ra->work.handler = nxt_router_ra_release;
371 ra->work.task = &engine->task;
372 ra->work.next = NULL;
373
374 nxt_debug(task, "ra stream #%uD post release to %p",
375 ra->stream, engine);
376
377 nxt_event_engine_post(engine, &ra->work);
378
379 return;
380 }
381
382 nxt_debug(task, "ra stream #%uD release", ra->stream);
383
384 rc = ra->rc;
385
386 if (rc != NULL) {
387 if (ra->app_pid != -1) {
388 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
389 }
390
391 rc->app_port = ra->app_port;
392
393 ra->app_port = NULL;
394 rc->ra = NULL;
395 ra->rc = NULL;
396 }
397
398 if (ra->app_port != NULL) {
399 nxt_router_app_port_release(task, ra->app_port, 0, 1);
400
401 ra->app_port = NULL;
402 }
403
404 if (ra->mem_pool != NULL) {
405 nxt_mp_release(ra->mem_pool, ra);
406 }
407}
408
409
410static void
411nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
412{
413 nxt_conn_t *c;
414 nxt_req_app_link_t *ra;
415 nxt_req_conn_link_t *rc;
416 nxt_event_engine_t *engine;
417
418 ra = obj;
419 engine = data;
420
421 if (task->thread->engine != engine) {
422 ra->work.handler = nxt_router_ra_abort;
423 ra->work.task = &engine->task;
424 ra->work.next = NULL;
425
426 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
427
428 nxt_event_engine_post(engine, &ra->work);
429
430 return;
431 }
432
433 nxt_debug(task, "ra stream #%uD abort", ra->stream);
434
435 rc = ra->rc;
436
437 if (rc != NULL) {
438 c = rc->conn;
439
440 nxt_router_gen_error(task, c, 500,
441 "Failed to start application worker");
442
443 rc->ra = NULL;
444 ra->rc = NULL;
445 }
446
447 if (ra->app_port != NULL) {
448 nxt_router_app_port_release(task, ra->app_port, 0, 1);
449
450 ra->app_port = NULL;
451 }
452
453 if (ra->mem_pool != NULL) {
454 nxt_mp_release(ra->mem_pool, ra);
455 }
456}
457
458
459static void
460nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data)
461{
462 nxt_req_app_link_t *ra;
463
464 ra = obj;
465
466 nxt_router_ra_error(task, ra, ra->err_code, ra->err_str);
467}
468
469
470static void
471nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code,
472 const char* str)
473{
474 nxt_conn_t *c;
475 nxt_req_conn_link_t *rc;
476 nxt_event_engine_t *engine;
477
478 engine = ra->work.data;
479
480 if (task->thread->engine != engine) {
481 ra->err_code = code;
482 ra->err_str = str;
483
484 ra->work.handler = nxt_router_ra_error_handler;
485 ra->work.task = &engine->task;
486 ra->work.next = NULL;
487
488 nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine);
489
490 nxt_event_engine_post(engine, &ra->work);
491
492 return;
493 }
494
495 nxt_debug(task, "ra stream #%uD error", ra->stream);
496
497 rc = ra->rc;
498
499 if (rc != NULL) {
500 c = rc->conn;
501
502 nxt_router_gen_error(task, c, code, str);
503
504 rc->ra = NULL;
505 ra->rc = NULL;
506 }
507
508 if (ra->app_port != NULL) {
509 nxt_router_app_port_release(task, ra->app_port, 0, 1);
510
511 ra->app_port = NULL;
512 }
513
514 if (ra->mem_pool != NULL) {
515 nxt_mp_release(ra->mem_pool, ra);
516 }
517}
518
519
520nxt_inline void
521nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
522{
523 nxt_req_app_link_t *ra;
524
525 if (rc->app_port != NULL) {
526 nxt_router_app_port_release(task, rc->app_port, 0, 1);
527
528 rc->app_port = NULL;
529 }
530
531 ra = rc->ra;
532
533 if (ra != NULL) {
534 rc->ra = NULL;
535 ra->rc = NULL;
536
537 nxt_thread_mutex_lock(&rc->app->mutex);
538
539 if (ra->link.next != NULL) {
540 nxt_queue_remove(&ra->link);
541
542 ra->link.next = NULL;
543
544 } else {
545 ra = NULL;
546 }
547
548 nxt_thread_mutex_unlock(&rc->app->mutex);
549 }
550
551 if (ra != NULL) {
552 nxt_router_ra_release(task, ra, ra->work.data);
553 }
554
555 if (rc->app != NULL) {
556 nxt_router_app_use(task, rc->app, -1);
557
558 rc->app = NULL;
559 }
560
561 if (rc->ap != NULL) {
562 nxt_app_http_req_done(task, rc->ap);
563
564 rc->ap = NULL;
565 }
566
567 nxt_queue_remove(&rc->link);
568
569 rc->conn = NULL;
570}
571
572
573void
574nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
575{
576 nxt_port_new_port_handler(task, msg);
577
578 if (msg->port_msg.stream == 0) {
579 return;
580 }
581
582 if (msg->u.new_port == NULL ||
583 msg->u.new_port->type != NXT_PROCESS_WORKER)
584 {
585 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
586 }
587
588 nxt_port_rpc_handler(task, msg);
589}
590
591
592void
593nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
594{
595 nxt_int_t ret;
596 nxt_buf_t *b;
597 nxt_router_temp_conf_t *tmcf;
598
599 tmcf = nxt_router_temp_conf(task);
600 if (nxt_slow_path(tmcf == NULL)) {
601 return;
602 }
603
604 b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
605
606 nxt_assert(b != NULL);
607
608 tmcf->conf->router = nxt_router;
609 tmcf->stream = msg->port_msg.stream;
610 tmcf->port = nxt_runtime_port_find(task->thread->runtime,
611 msg->port_msg.pid,
612 msg->port_msg.reply_port);
613
614 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
615
616 if (nxt_fast_path(ret == NXT_OK)) {
617 nxt_router_conf_apply(task, tmcf, NULL);
618
619 } else {
620 nxt_router_conf_error(task, tmcf);
621 }
622}
623
624
625static void
626nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
627{
628 union {
629 nxt_pid_t removed_pid;
630 void *data;
631 } u;
632
633 u.data = data;
634
635 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
636}
637
638
639void
640nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
641{
642 nxt_event_engine_t *engine;
643
644 nxt_port_remove_pid_handler(task, msg);
645
646 if (msg->port_msg.stream == 0) {
647 return;
648 }
649
650 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
651 {
652 nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
653 msg->u.data);
654 }
655 nxt_queue_loop;
656
657 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
658
659 nxt_port_rpc_handler(task, msg);
660}
661
662
663static nxt_router_temp_conf_t *
664nxt_router_temp_conf(nxt_task_t *task)
665{
666 nxt_mp_t *mp, *tmp;
667 nxt_router_conf_t *rtcf;
668 nxt_router_temp_conf_t *tmcf;
669
670 mp = nxt_mp_create(1024, 128, 256, 32);
671 if (nxt_slow_path(mp == NULL)) {
672 return NULL;
673 }
674
675 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
676 if (nxt_slow_path(rtcf == NULL)) {
677 goto fail;
678 }
679
680 rtcf->mem_pool = mp;
681
682 tmp = nxt_mp_create(1024, 128, 256, 32);
683 if (nxt_slow_path(tmp == NULL)) {
684 goto fail;
685 }
686
687 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
688 if (nxt_slow_path(tmcf == NULL)) {
689 goto temp_fail;
690 }
691
692 tmcf->mem_pool = tmp;
693 tmcf->conf = rtcf;
694 tmcf->count = 1;
695 tmcf->engine = task->thread->engine;
696
697 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
698 sizeof(nxt_router_engine_conf_t));
699 if (nxt_slow_path(tmcf->engines == NULL)) {
700 goto temp_fail;
701 }
702
703 nxt_queue_init(&tmcf->deleting);
704 nxt_queue_init(&tmcf->keeping);
705 nxt_queue_init(&tmcf->updating);
706 nxt_queue_init(&tmcf->pending);
707 nxt_queue_init(&tmcf->creating);
708 nxt_queue_init(&tmcf->apps);
709 nxt_queue_init(&tmcf->previous);
710
711 return tmcf;
712
713temp_fail:
714
715 nxt_mp_destroy(tmp);
716
717fail:
718
719 nxt_mp_destroy(mp);
720
721 return NULL;
722}
723
724
725static void
726nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
727{
728 nxt_int_t ret;
729 nxt_router_t *router;
730 nxt_runtime_t *rt;
731 nxt_queue_link_t *qlk;
732 nxt_socket_conf_t *skcf;
733 nxt_router_temp_conf_t *tmcf;
734 const nxt_event_interface_t *interface;
735
736 tmcf = obj;
737
738 qlk = nxt_queue_first(&tmcf->pending);
739
740 if (qlk != nxt_queue_tail(&tmcf->pending)) {
741 nxt_queue_remove(qlk);
742 nxt_queue_insert_tail(&tmcf->creating, qlk);
743
744 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
745
746 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
747
748 return;
749 }
750
751 rt = task->thread->runtime;
752
753 interface = nxt_service_get(rt->services, "engine", NULL);
754
755 router = tmcf->conf->router;
756
757 ret = nxt_router_engines_create(task, router, tmcf, interface);
758 if (nxt_slow_path(ret != NXT_OK)) {
759 goto fail;
760 }
761
762 ret = nxt_router_threads_create(task, rt, tmcf);
763 if (nxt_slow_path(ret != NXT_OK)) {
764 goto fail;
765 }
766
767 nxt_router_apps_sort(task, router, tmcf);
768
769 nxt_router_engines_post(router, tmcf);
770
771 nxt_queue_add(&router->sockets, &tmcf->updating);
772 nxt_queue_add(&router->sockets, &tmcf->creating);
773
774 nxt_router_conf_ready(task, tmcf);
775
776 return;
777
778fail:
779
780 nxt_router_conf_error(task, tmcf);
781
782 return;
783}
784
785
786static void
787nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
788{
789 nxt_joint_job_t *job;
790
791 job = obj;
792
793 nxt_router_conf_ready(task, job->tmcf);
794}
795
796
797static void
798nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
799{
800 nxt_debug(task, "temp conf count:%D", tmcf->count);
801
802 if (--tmcf->count == 0) {
803 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
804 }
805}
806
807
808static void
809nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
810{
811 nxt_socket_t s;
812 nxt_router_t *router;
813 nxt_queue_link_t *qlk;
814 nxt_socket_conf_t *skcf;
815
816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
817
818 for (qlk = nxt_queue_first(&tmcf->creating);
819 qlk != nxt_queue_tail(&tmcf->creating);
820 qlk = nxt_queue_next(qlk))
821 {
822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
823 s = skcf->listen->socket;
824
825 if (s != -1) {
826 nxt_socket_close(task, s);
827 }
828
829 nxt_free(skcf->listen);
830 }
831
832 router = tmcf->conf->router;
833
834 nxt_queue_add(&router->sockets, &tmcf->keeping);
835 nxt_queue_add(&router->sockets, &tmcf->deleting);
836
837 // TODO: new engines and threads
838
839 nxt_mp_destroy(tmcf->conf->mem_pool);
840
841 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
842}
843
844
845static void
846nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
847 nxt_port_msg_type_t type)
848{
849 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
850}
851
852
853static nxt_conf_map_t nxt_router_conf[] = {
854 {
855 nxt_string("listeners_threads"),
856 NXT_CONF_MAP_INT32,
857 offsetof(nxt_router_conf_t, threads),
858 },
859};
860
861
862static nxt_conf_map_t nxt_router_app_conf[] = {
863 {
864 nxt_string("type"),
865 NXT_CONF_MAP_STR,
866 offsetof(nxt_router_app_conf_t, type),
867 },
868
869 {
870 nxt_string("workers"),
871 NXT_CONF_MAP_INT32,
872 offsetof(nxt_router_app_conf_t, workers),
873 },
874
875 {
876 nxt_string("limits"),
877 NXT_CONF_MAP_PTR,
878 offsetof(nxt_router_app_conf_t, limits_value),
879 },
880};
881
882
883static nxt_conf_map_t nxt_router_app_limits_conf[] = {
884 {
885 nxt_string("timeout"),
886 NXT_CONF_MAP_MSEC,
887 offsetof(nxt_router_app_conf_t, timeout),
888 },
889
890 {
891 nxt_string("requests"),
892 NXT_CONF_MAP_INT32,
893 offsetof(nxt_router_app_conf_t, requests),
894 },
895};
896
897
898static nxt_conf_map_t nxt_router_listener_conf[] = {
899 {
900 nxt_string("application"),
901 NXT_CONF_MAP_STR,
902 offsetof(nxt_router_listener_conf_t, application),
903 },
904};
905
906
907static nxt_conf_map_t nxt_router_http_conf[] = {
908 {
909 nxt_string("header_buffer_size"),
910 NXT_CONF_MAP_SIZE,
911 offsetof(nxt_socket_conf_t, header_buffer_size),
912 },
913
914 {
915 nxt_string("large_header_buffer_size"),
916 NXT_CONF_MAP_SIZE,
917 offsetof(nxt_socket_conf_t, large_header_buffer_size),
918 },
919
920 {
921 nxt_string("large_header_buffers"),
922 NXT_CONF_MAP_SIZE,
923 offsetof(nxt_socket_conf_t, large_header_buffers),
924 },
925
926 {
927 nxt_string("body_buffer_size"),
928 NXT_CONF_MAP_SIZE,
929 offsetof(nxt_socket_conf_t, body_buffer_size),
930 },
931
932 {
933 nxt_string("max_body_size"),
934 NXT_CONF_MAP_SIZE,
935 offsetof(nxt_socket_conf_t, max_body_size),
936 },
937
938 {
939 nxt_string("header_read_timeout"),
940 NXT_CONF_MAP_MSEC,
941 offsetof(nxt_socket_conf_t, header_read_timeout),
942 },
943
944 {
945 nxt_string("body_read_timeout"),
946 NXT_CONF_MAP_MSEC,
947 offsetof(nxt_socket_conf_t, body_read_timeout),
948 },
949};
950
951
952static nxt_int_t
953nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
954 u_char *start, u_char *end)
955{
956 u_char *p;
957 size_t size;
958 nxt_mp_t *mp;
959 uint32_t next;
960 nxt_int_t ret;
961 nxt_str_t name;
962 nxt_app_t *app, *prev;
963 nxt_router_t *router;
964 nxt_conf_value_t *conf, *http;
965 nxt_conf_value_t *applications, *application;
966 nxt_conf_value_t *listeners, *listener;
967 nxt_socket_conf_t *skcf;
968 nxt_app_lang_module_t *lang;
969 nxt_router_app_conf_t apcf;
970 nxt_router_listener_conf_t lscf;
971
972 static nxt_str_t http_path = nxt_string("/http");
973 static nxt_str_t applications_path = nxt_string("/applications");
974 static nxt_str_t listeners_path = nxt_string("/listeners");
975
976 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
977 if (conf == NULL) {
978 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
979 return NXT_ERROR;
980 }
981
982 mp = tmcf->conf->mem_pool;
983
984 ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
985 nxt_nitems(nxt_router_conf), tmcf->conf);
986 if (ret != NXT_OK) {
987 nxt_log(task, NXT_LOG_CRIT, "root map error");
988 return NXT_ERROR;
989 }
990
991 if (tmcf->conf->threads == 0) {
992 tmcf->conf->threads = nxt_ncpu;
993 }
994
995 applications = nxt_conf_get_path(conf, &applications_path);
996 if (applications == NULL) {
997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
998 return NXT_ERROR;
999 }
1000
1001 router = tmcf->conf->router;
1002
1003 next = 0;
1004
1005 for ( ;; ) {
1006 application = nxt_conf_next_object_member(applications, &name, &next);
1007 if (application == NULL) {
1008 break;
1009 }
1010
1011 nxt_debug(task, "application \"%V\"", &name);
1012
1013 size = nxt_conf_json_length(application, NULL);
1014
1015 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1016 if (app == NULL) {
1017 goto fail;
1018 }
1019
1020 nxt_memzero(app, sizeof(nxt_app_t));
1021
1022 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1023 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1024
1025 p = nxt_conf_json_print(app->conf.start, application, NULL);
1026 app->conf.length = p - app->conf.start;
1027
1028 nxt_assert(app->conf.length <= size);
1029
1030 nxt_debug(task, "application conf \"%V\"", &app->conf);
1031
1032 prev = nxt_router_app_find(&router->apps, &name);
1033
1034 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1035 nxt_free(app);
1036
1037 nxt_queue_remove(&prev->link);
1038 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1039 continue;
1040 }
1041
1042 apcf.workers = 1;
1043 apcf.timeout = 0;
1044 apcf.requests = 0;
1045 apcf.limits_value = NULL;
1046
1047 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1048 nxt_nitems(nxt_router_app_conf), &apcf);
1049 if (ret != NXT_OK) {
1050 nxt_log(task, NXT_LOG_CRIT, "application map error");
1051 goto app_fail;
1052 }
1053
1054 if (apcf.limits_value != NULL) {
1055
1056 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1057 nxt_log(task, NXT_LOG_CRIT, "application limits is not object");
1058 goto app_fail;
1059 }
1060
1061 ret = nxt_conf_map_object(mp, apcf.limits_value,
1062 nxt_router_app_limits_conf,
1063 nxt_nitems(nxt_router_app_limits_conf),
1064 &apcf);
1065 if (ret != NXT_OK) {
1066 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
1067 goto app_fail;
1068 }
1069 }
1070
1071 nxt_debug(task, "application type: %V", &apcf.type);
1072 nxt_debug(task, "application workers: %D", apcf.workers);
1073 nxt_debug(task, "application timeout: %D", apcf.timeout);
1074 nxt_debug(task, "application requests: %D", apcf.requests);
1075
1076 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1077
1078 if (lang == NULL) {
1079 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
1080 &apcf.type);
1081 goto app_fail;
1082 }
1083
1084 nxt_debug(task, "application language module: \"%s\"", lang->file);
1085
1086 ret = nxt_thread_mutex_create(&app->mutex);
1087 if (ret != NXT_OK) {
1088 goto app_fail;
1089 }
1090
1091 nxt_queue_init(&app->ports);
1092 nxt_queue_init(&app->requests);
1093
1094 app->name.length = name.length;
1095 nxt_memcpy(app->name.start, name.start, name.length);
1096
1097 app->type = lang->type;
1098 app->max_workers = apcf.workers;
1099 app->timeout = apcf.timeout;
1100 app->live = 1;
1101 app->max_pending_responses = 2;
1102 app->prepare_msg = nxt_app_prepare_msg[lang->type];
1103
1104 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1105
1106 nxt_router_app_use(task, app, 1);
1107 }
1108
1109 http = nxt_conf_get_path(conf, &http_path);
1110#if 0
1111 if (http == NULL) {
1112 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
1113 return NXT_ERROR;
1114 }
1115#endif
1116
1117 listeners = nxt_conf_get_path(conf, &listeners_path);
1118 if (listeners == NULL) {
1119 nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
1120 return NXT_ERROR;
1121 }
1122
1123 next = 0;
1124
1125 for ( ;; ) {
1126 listener = nxt_conf_next_object_member(listeners, &name, &next);
1127 if (listener == NULL) {
1128 break;
1129 }
1130
1131 skcf = nxt_router_socket_conf(task, tmcf, &name);
1132 if (skcf == NULL) {
1133 goto fail;
1134 }
1135
1136 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1137 nxt_nitems(nxt_router_listener_conf), &lscf);
1138 if (ret != NXT_OK) {
1139 nxt_log(task, NXT_LOG_CRIT, "listener map error");
1140 goto fail;
1141 }
1142
1143 nxt_debug(task, "application: %V", &lscf.application);
1144
1145 // STUB, default values if http block is not defined.
1146 skcf->header_buffer_size = 2048;
1147 skcf->large_header_buffer_size = 8192;
1148 skcf->large_header_buffers = 4;
1149 skcf->body_buffer_size = 16 * 1024;
1150 skcf->max_body_size = 2 * 1024 * 1024;
1151 skcf->header_read_timeout = 5000;
1152 skcf->body_read_timeout = 5000;
1153
1154 if (http != NULL) {
1155 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1156 nxt_nitems(nxt_router_http_conf), skcf);
1157 if (ret != NXT_OK) {
1158 nxt_log(task, NXT_LOG_CRIT, "http map error");
1159 goto fail;
1160 }
1161 }
1162
1163 skcf->listen->handler = nxt_router_conn_init;
1164 skcf->router_conf = tmcf->conf;
1165 skcf->router_conf->count++;
1166 skcf->application = nxt_router_listener_application(tmcf,
1167 &lscf.application);
1168 }
1169
1170 nxt_queue_add(&tmcf->deleting, &router->sockets);
1171 nxt_queue_init(&router->sockets);
1172
1173 return NXT_OK;
1174
1175app_fail:
1176
1177 nxt_free(app);
1178
1179fail:
1180
1181 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1182
1183 nxt_queue_remove(&app->link);
1184 nxt_thread_mutex_destroy(&app->mutex);
1185 nxt_free(app);
1186
1187 } nxt_queue_loop;
1188
1189 return NXT_ERROR;
1190}
1191
1192
1193static nxt_app_t *
1194nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1195{
1196 nxt_app_t *app;
1197
1198 nxt_queue_each(app, queue, nxt_app_t, link) {
1199
1200 if (nxt_strstr_eq(name, &app->name)) {
1201 return app;
1202 }
1203
1204 } nxt_queue_loop;
1205
1206 return NULL;
1207}
1208
1209
1210static nxt_app_t *
1211nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1212{
1213 nxt_app_t *app;
1214
1215 app = nxt_router_app_find(&tmcf->apps, name);
1216
1217 if (app == NULL) {
1218 app = nxt_router_app_find(&tmcf->previous, name);
1219 }
1220
1221 return app;
1222}
1223
1224
1225static nxt_socket_conf_t *
1226nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1227 nxt_str_t *name)
1228{
1229 size_t size;
1230 nxt_int_t ret;
1231 nxt_bool_t wildcard;
1232 nxt_sockaddr_t *sa;
1233 nxt_socket_conf_t *skcf;
1234 nxt_listen_socket_t *ls;
1235
1236 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1237 if (nxt_slow_path(sa == NULL)) {
1238 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name);
1239 return NULL;
1240 }
1241
1242 sa->type = SOCK_STREAM;
1243
1244 nxt_debug(task, "router listener: \"%*s\"",
1245 sa->length, nxt_sockaddr_start(sa));
1246
1247 skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
1248 if (nxt_slow_path(skcf == NULL)) {
1249 return NULL;
1250 }
1251
1252 size = nxt_sockaddr_size(sa);
1253
1254 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1255
1256 if (ret != NXT_OK) {
1257
1258 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1259 if (nxt_slow_path(ls == NULL)) {
1260 return NULL;
1261 }
1262
1263 skcf->listen = ls;
1264
1265 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1266 nxt_memcpy(ls->sockaddr, sa, size);
1267
1268 nxt_listen_socket_remote_size(ls);
1269
1270 ls->socket = -1;
1271 ls->backlog = NXT_LISTEN_BACKLOG;
1272 ls->flags = NXT_NONBLOCK;
1273 ls->read_after_accept = 1;
1274 }
1275
1276 switch (sa->u.sockaddr.sa_family) {
1277#if (NXT_HAVE_UNIX_DOMAIN)
1278 case AF_UNIX:
1279 wildcard = 0;
1280 break;
1281#endif
1282#if (NXT_INET6)
1283 case AF_INET6:
1284 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1285 break;
1286#endif
1287 case AF_INET:
1288 default:
1289 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1290 break;
1291 }
1292
1293 if (!wildcard) {
1294 skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
1295 if (nxt_slow_path(skcf->sockaddr == NULL)) {
1296 return NULL;
1297 }
1298
1299 nxt_memcpy(skcf->sockaddr, sa, size);
1300 }
1301
1302 return skcf;
1303}
1304
1305
1306static nxt_int_t
1307nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1308 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1309{
1310 nxt_router_t *router;
1311 nxt_queue_link_t *qlk;
1312 nxt_socket_conf_t *skcf;
1313
1314 router = tmcf->conf->router;
1315
1316 for (qlk = nxt_queue_first(&router->sockets);
1317 qlk != nxt_queue_tail(&router->sockets);
1318 qlk = nxt_queue_next(qlk))
1319 {
1320 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1321
1322 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1323 nskcf->listen = skcf->listen;
1324
1325 nxt_queue_remove(qlk);
1326 nxt_queue_insert_tail(&tmcf->keeping, qlk);
1327
1328 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1329
1330 return NXT_OK;
1331 }
1332 }
1333
1334 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1335
1336 return NXT_DECLINED;
1337}
1338
1339
1340static void
1341nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1342 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1343{
1344 size_t size;
1345 uint32_t stream;
1346 nxt_buf_t *b;
1347 nxt_port_t *main_port, *router_port;
1348 nxt_runtime_t *rt;
1349 nxt_socket_rpc_t *rpc;
1350
1351 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1352 if (rpc == NULL) {
1353 goto fail;
1354 }
1355
1356 rpc->socket_conf = skcf;
1357 rpc->temp_conf = tmcf;
1358
1359 size = nxt_sockaddr_size(skcf->listen->sockaddr);
1360
1361 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1362 if (b == NULL) {
1363 goto fail;
1364 }
1365
1366 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1367
1368 rt = task->thread->runtime;
1369 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1370 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1371
1372 stream = nxt_port_rpc_register_handler(task, router_port,
1373 nxt_router_listen_socket_ready,
1374 nxt_router_listen_socket_error,
1375 main_port->pid, rpc);
1376 if (stream == 0) {
1377 goto fail;
1378 }
1379
1380 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1381 stream, router_port->id, b);
1382
1383 return;
1384
1385fail:
1386
1387 nxt_router_conf_error(task, tmcf);
1388}
1389
1390
1391static void
1392nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1393 void *data)
1394{
1395 nxt_int_t ret;
1396 nxt_socket_t s;
1397 nxt_socket_rpc_t *rpc;
1398
1399 rpc = data;
1400
1401 s = msg->fd;
1402
1403 ret = nxt_socket_nonblocking(task, s);
1404 if (nxt_slow_path(ret != NXT_OK)) {
1405 goto fail;
1406 }
1407
1408 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1409
1410 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1411 if (nxt_slow_path(ret != NXT_OK)) {
1412 goto fail;
1413 }
1414
1415 rpc->socket_conf->listen->socket = s;
1416
1417 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1418 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1419
1420 return;
1421
1422fail:
1423
1424 nxt_socket_close(task, s);
1425
1426 nxt_router_conf_error(task, rpc->temp_conf);
1427}
1428
1429
1430static void
1431nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1432 void *data)
1433{
1434 u_char *p;
1435 size_t size;
1436 uint8_t error;
1437 nxt_buf_t *in, *out;
1438 nxt_sockaddr_t *sa;
1439 nxt_socket_rpc_t *rpc;
1440 nxt_router_temp_conf_t *tmcf;
1441
1442 static nxt_str_t socket_errors[] = {
1443 nxt_string("ListenerSystem"),
1444 nxt_string("ListenerNoIPv6"),
1445 nxt_string("ListenerPort"),
1446 nxt_string("ListenerInUse"),
1447 nxt_string("ListenerNoAddress"),
1448 nxt_string("ListenerNoAccess"),
1449 nxt_string("ListenerPath"),
1450 };
1451
1452 rpc = data;
1453 sa = rpc->socket_conf->listen->sockaddr;
1454 tmcf = rpc->temp_conf;
1455
1456 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1457
1458 nxt_assert(in != NULL);
1459
1460 p = in->mem.pos;
1461
1462 error = *p++;
1463
1464 size = sizeof("listen socket error: ") - 1
1465 + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
1466 + sa->length + socket_errors[error].length + (in->mem.free - p);
1467
1468 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1469 if (nxt_slow_path(out == NULL)) {
1470 return;
1471 }
1472
1473 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1474 "listen socket error: "
1475 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1476 sa->length, nxt_sockaddr_start(sa),
1477 &socket_errors[error], in->mem.free - p, p);
1478
1479 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1480
1481 nxt_router_conf_error(task, tmcf);
1482}
1483
1484
1485static nxt_int_t
1486nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1487 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1488{
1489 nxt_int_t ret;
1490 nxt_uint_t n, threads;
1491 nxt_queue_link_t *qlk;
1492 nxt_router_engine_conf_t *recf;
1493
1494 threads = tmcf->conf->threads;
1495
1496 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
1497 sizeof(nxt_router_engine_conf_t));
1498 if (nxt_slow_path(tmcf->engines == NULL)) {
1499 return NXT_ERROR;
1500 }
1501
1502 n = 0;
1503
1504 for (qlk = nxt_queue_first(&router->engines);
1505 qlk != nxt_queue_tail(&router->engines);
1506 qlk = nxt_queue_next(qlk))
1507 {
1508 recf = nxt_array_zero_add(tmcf->engines);
1509 if (nxt_slow_path(recf == NULL)) {
1510 return NXT_ERROR;
1511 }
1512
1513 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
1514
1515 if (n < threads) {
1516 recf->action = NXT_ROUTER_ENGINE_KEEP;
1517 ret = nxt_router_engine_conf_update(tmcf, recf);
1518
1519 } else {
1520 recf->action = NXT_ROUTER_ENGINE_DELETE;
1521 ret = nxt_router_engine_conf_delete(tmcf, recf);
1522 }
1523
1524 if (nxt_slow_path(ret != NXT_OK)) {
1525 return ret;
1526 }
1527
1528 n++;
1529 }
1530
1531 tmcf->new_threads = n;
1532
1533 while (n < threads) {
1534 recf = nxt_array_zero_add(tmcf->engines);
1535 if (nxt_slow_path(recf == NULL)) {
1536 return NXT_ERROR;
1537 }
1538
1539 recf->action = NXT_ROUTER_ENGINE_ADD;
1540
1541 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
1542 if (nxt_slow_path(recf->engine == NULL)) {
1543 return NXT_ERROR;
1544 }
1545
1546 ret = nxt_router_engine_conf_create(tmcf, recf);
1547 if (nxt_slow_path(ret != NXT_OK)) {
1548 return ret;
1549 }
1550
1551 n++;
1552 }
1553
1554 return NXT_OK;
1555}
1556
1557
1558static nxt_int_t
1559nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1560 nxt_router_engine_conf_t *recf)
1561{
1562 nxt_int_t ret;
1563
1564 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1565 nxt_router_listen_socket_create);
1566 if (nxt_slow_path(ret != NXT_OK)) {
1567 return ret;
1568 }
1569
1570 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1571 nxt_router_listen_socket_create);
1572 if (nxt_slow_path(ret != NXT_OK)) {
1573 return ret;
1574 }
1575
1576 return ret;
1577}
1578
1579
1580static nxt_int_t
1581nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1582 nxt_router_engine_conf_t *recf)
1583{
1584 nxt_int_t ret;
1585
1586 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1587 nxt_router_listen_socket_create);
1588 if (nxt_slow_path(ret != NXT_OK)) {
1589 return ret;
1590 }
1591
1592 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1593 nxt_router_listen_socket_update);
1594 if (nxt_slow_path(ret != NXT_OK)) {
1595 return ret;
1596 }
1597
1598 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1599 if (nxt_slow_path(ret != NXT_OK)) {
1600 return ret;
1601 }
1602
1603 return ret;
1604}
1605
1606
1607static nxt_int_t
1608nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1609 nxt_router_engine_conf_t *recf)
1610{
1611 nxt_int_t ret;
1612
1613 ret = nxt_router_engine_quit(tmcf, recf);
1614 if (nxt_slow_path(ret != NXT_OK)) {
1615 return ret;
1616 }
1617
1618 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
1619 if (nxt_slow_path(ret != NXT_OK)) {
1620 return ret;
1621 }
1622
1623 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1624}
1625
1626
1627static nxt_int_t
1628nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
1629 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
1630 nxt_work_handler_t handler)
1631{
1632 nxt_joint_job_t *job;
1633 nxt_queue_link_t *qlk;
1634 nxt_socket_conf_t *skcf;
1635 nxt_socket_conf_joint_t *joint;
1636
1637 for (qlk = nxt_queue_first(sockets);
1638 qlk != nxt_queue_tail(sockets);
1639 qlk = nxt_queue_next(qlk))
1640 {
1641 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1642 if (nxt_slow_path(job == NULL)) {
1643 return NXT_ERROR;
1644 }
1645
1646 job->work.next = recf->jobs;
1647 recf->jobs = &job->work;
1648
1649 job->task = tmcf->engine->task;
1650 job->work.handler = handler;
1651 job->work.task = &job->task;
1652 job->work.obj = job;
1653 job->tmcf = tmcf;
1654
1655 tmcf->count++;
1656
1657 joint = nxt_mp_alloc(tmcf->conf->mem_pool,
1658 sizeof(nxt_socket_conf_joint_t));
1659 if (nxt_slow_path(joint == NULL)) {
1660 return NXT_ERROR;
1661 }
1662
1663 job->work.data = joint;
1664
1665 joint->count = 1;
1666
1667 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1668 skcf->count++;
1669 joint->socket_conf = skcf;
1670
1671 joint->engine = recf->engine;
1672 }
1673
1674 return NXT_OK;
1675}
1676
1677
1678static nxt_int_t
1679nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
1680 nxt_router_engine_conf_t *recf)
1681{
1682 nxt_joint_job_t *job;
1683
1684 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1685 if (nxt_slow_path(job == NULL)) {
1686 return NXT_ERROR;
1687 }
1688
1689 job->work.next = recf->jobs;
1690 recf->jobs = &job->work;
1691
1692 job->task = tmcf->engine->task;
1693 job->work.handler = nxt_router_worker_thread_quit;
1694 job->work.task = &job->task;
1695 job->work.obj = NULL;
1696 job->work.data = NULL;
1697 job->tmcf = NULL;
1698
1699 return NXT_OK;
1700}
1701
1702
1703static nxt_int_t
1704nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
1705 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
1706{
1707 nxt_joint_job_t *job;
1708 nxt_queue_link_t *qlk;
1709
1710 for (qlk = nxt_queue_first(sockets);
1711 qlk != nxt_queue_tail(sockets);
1712 qlk = nxt_queue_next(qlk))
1713 {
1714 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1715 if (nxt_slow_path(job == NULL)) {
1716 return NXT_ERROR;
1717 }
1718
1719 job->work.next = recf->jobs;
1720 recf->jobs = &job->work;
1721
1722 job->task = tmcf->engine->task;
1723 job->work.handler = nxt_router_listen_socket_delete;
1724 job->work.task = &job->task;
1725 job->work.obj = job;
1726 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1727 job->tmcf = tmcf;
1728
1729 tmcf->count++;
1730 }
1731
1732 return NXT_OK;
1733}
1734
1735
1736static nxt_int_t
1737nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
1738 nxt_router_temp_conf_t *tmcf)
1739{
1740 nxt_int_t ret;
1741 nxt_uint_t i, threads;
1742 nxt_router_engine_conf_t *recf;
1743
1744 recf = tmcf->engines->elts;
1745 threads = tmcf->conf->threads;
1746
1747 for (i = tmcf->new_threads; i < threads; i++) {
1748 ret = nxt_router_thread_create(task, rt, recf[i].engine);
1749 if (nxt_slow_path(ret != NXT_OK)) {
1750 return ret;
1751 }
1752 }
1753
1754 return NXT_OK;
1755}
1756
1757
1758static nxt_int_t
1759nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
1760 nxt_event_engine_t *engine)
1761{
1762 nxt_int_t ret;
1763 nxt_thread_link_t *link;
1764 nxt_thread_handle_t handle;
1765
1766 link = nxt_zalloc(sizeof(nxt_thread_link_t));
1767
1768 if (nxt_slow_path(link == NULL)) {
1769 return NXT_ERROR;
1770 }
1771
1772 link->start = nxt_router_thread_start;
1773 link->engine = engine;
1774 link->work.handler = nxt_router_thread_exit_handler;
1775 link->work.task = task;
1776 link->work.data = link;
1777
1778 nxt_queue_insert_tail(&rt->engines, &engine->link);
1779
1780 ret = nxt_thread_create(&handle, link);
1781
1782 if (nxt_slow_path(ret != NXT_OK)) {
1783 nxt_queue_remove(&engine->link);
1784 }
1785
1786 return ret;
1787}
1788
1789
1790static void
1791nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
1792 nxt_router_temp_conf_t *tmcf)
1793{
1794 nxt_app_t *app;
1795 nxt_port_t *port;
1796
1797 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1798
1799 nxt_queue_remove(&app->link);
1800
1801 nxt_debug(task, "about to free app '%V' %p", &app->name, app);
1802
1803 app->live = 0;
1804
1805 do {
1806 port = nxt_router_app_get_idle_port(app);
1807 if (port == NULL) {
1808 break;
1809 }
1810
1811 nxt_debug(task, "port %p send quit", port);
1812
1813 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
1814 NULL);
1815
1816 nxt_port_use(task, port, -1);
1817 } while (1);
1818
1819 nxt_router_app_use(task, app, -1);
1820
1821 } nxt_queue_loop;
1822
1823 nxt_queue_add(&router->apps, &tmcf->previous);
1824 nxt_queue_add(&router->apps, &tmcf->apps);
1825}
1826
1827
1828static void
1829nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1830{
1831 nxt_uint_t n;
1832 nxt_event_engine_t *engine;
1833 nxt_router_engine_conf_t *recf;
1834
1835 recf = tmcf->engines->elts;
1836
1837 for (n = tmcf->engines->nelts; n != 0; n--) {
1838 engine = recf->engine;
1839
1840 switch (recf->action) {
1841
1842 case NXT_ROUTER_ENGINE_KEEP:
1843 break;
1844
1845 case NXT_ROUTER_ENGINE_ADD:
1846 nxt_queue_insert_tail(&router->engines, &engine->link0);
1847 break;
1848
1849 case NXT_ROUTER_ENGINE_DELETE:
1850 nxt_queue_remove(&engine->link0);
1851 break;
1852 }
1853
1854 nxt_router_engine_post(engine, recf->jobs);
1855
1856 recf++;
1857 }
1858}
1859
1860
1861static void
1862nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
1863{
1864 nxt_work_t *work, *next;
1865
1866 for (work = jobs; work != NULL; work = next) {
1867 next = work->next;
1868 work->next = NULL;
1869
1870 nxt_event_engine_post(engine, work);
1871 }
1872}
1873
1874
1875static nxt_port_handlers_t nxt_router_app_port_handlers = {
1876 .mmap = nxt_port_mmap_handler,
1877 .data = nxt_port_rpc_handler,
1878};
1879
1880
1881static void
1882nxt_router_thread_start(void *data)
1883{
1884 nxt_int_t ret;
1885 nxt_port_t *port;
1886 nxt_task_t *task;
1887 nxt_thread_t *thread;
1888 nxt_thread_link_t *link;
1889 nxt_event_engine_t *engine;
1890
1891 link = data;
1892 engine = link->engine;
1893 task = &engine->task;
1894
1895 thread = nxt_thread();
1896
1897 nxt_event_engine_thread_adopt(engine);
1898
1899 /* STUB */
1900 thread->runtime = engine->task.thread->runtime;
1901
1902 engine->task.thread = thread;
1903 engine->task.log = thread->log;
1904 thread->engine = engine;
1905 thread->task = &engine->task;
1906#if 0
1907 thread->fiber = &engine->fibers->fiber;
1908#endif
1909
1910 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
1911 if (nxt_slow_path(engine->mem_pool == NULL)) {
1912 return;
1913 }
1914
1915 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
1916 NXT_PROCESS_ROUTER);
1917 if (nxt_slow_path(port == NULL)) {
1918 return;
1919 }
1920
1921 ret = nxt_port_socket_init(task, port, 0);
1922 if (nxt_slow_path(ret != NXT_OK)) {
1923 nxt_port_use(task, port, -1);
1924 return;
1925 }
1926
1927 engine->port = port;
1928
1929 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
1930
1931 nxt_event_engine_start(engine);
1932}
1933
1934
1935static void
1936nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1937{
1938 nxt_joint_job_t *job;
1939 nxt_socket_conf_t *skcf;
1940 nxt_listen_event_t *lev;
1941 nxt_listen_socket_t *ls;
1942 nxt_thread_spinlock_t *lock;
1943 nxt_socket_conf_joint_t *joint;
1944
1945 job = obj;
1946 joint = data;
1947
1948 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1949
1950 skcf = joint->socket_conf;
1951 ls = skcf->listen;
1952
1953 lev = nxt_listen_event(task, ls);
1954 if (nxt_slow_path(lev == NULL)) {
1955 nxt_router_listen_socket_release(task, skcf);
1956 return;
1957 }
1958
1959 lev->socket.data = joint;
1960
1961 lock = &skcf->router_conf->router->lock;
1962
1963 nxt_thread_spin_lock(lock);
1964 ls->count++;
1965 nxt_thread_spin_unlock(lock);
1966
1967 job->work.next = NULL;
1968 job->work.handler = nxt_router_conf_wait;
1969
1970 nxt_event_engine_post(job->tmcf->engine, &job->work);
1971}
1972
1973
1974nxt_inline nxt_listen_event_t *
1975nxt_router_listen_event(nxt_queue_t *listen_connections,
1976 nxt_socket_conf_t *skcf)
1977{
1978 nxt_socket_t fd;
1979 nxt_queue_link_t *qlk;
1980 nxt_listen_event_t *lev;
1981
1982 fd = skcf->listen->socket;
1983
1984 for (qlk = nxt_queue_first(listen_connections);
1985 qlk != nxt_queue_tail(listen_connections);
1986 qlk = nxt_queue_next(qlk))
1987 {
1988 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1989
1990 if (fd == lev->socket.fd) {
1991 return lev;
1992 }
1993 }
1994
1995 return NULL;
1996}
1997
1998
1999static void
2000nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2001{
2002 nxt_joint_job_t *job;
2003 nxt_event_engine_t *engine;
2004 nxt_listen_event_t *lev;
2005 nxt_socket_conf_joint_t *joint, *old;
2006
2007 job = obj;
2008 joint = data;
2009
2010 engine = task->thread->engine;
2011
2012 nxt_queue_insert_tail(&engine->joints, &joint->link);
2013
2014 lev = nxt_router_listen_event(&engine->listen_connections,
2015 joint->socket_conf);
2016
2017 old = lev->socket.data;
2018 lev->socket.data = joint;
2019 lev->listen = joint->socket_conf->listen;
2020
2021 job->work.next = NULL;
2022 job->work.handler = nxt_router_conf_wait;
2023
2024 nxt_event_engine_post(job->tmcf->engine, &job->work);
2025
2026 /*
2027 * The task is allocated from configuration temporary
2028 * memory pool so it can be freed after engine post operation.
2029 */
2030
2031 nxt_router_conf_release(&engine->task, old);
2032}
2033
2034
2035static void
2036nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2037{
2038 nxt_joint_job_t *job;
2039 nxt_socket_conf_t *skcf;
2040 nxt_listen_event_t *lev;
2041 nxt_event_engine_t *engine;
2042
2043 job = obj;
2044 skcf = data;
2045
2046 engine = task->thread->engine;
2047
2048 lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2049
2050 nxt_fd_event_delete(engine, &lev->socket);
2051
2052 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2053 lev->socket.fd);
2054
2055 lev->timer.handler = nxt_router_listen_socket_close;
2056 lev->timer.work_queue = &engine->fast_work_queue;
2057
2058 nxt_timer_add(engine, &lev->timer, 0);
2059
2060 job->work.next = NULL;
2061 job->work.handler = nxt_router_conf_wait;
2062
2063 nxt_event_engine_post(job->tmcf->engine, &job->work);
2064}
2065
2066
2067static void
2068nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2069{
2070 nxt_event_engine_t *engine;
2071
2072 nxt_debug(task, "router worker thread quit");
2073
2074 engine = task->thread->engine;
2075
2076 engine->shutdown = 1;
2077
2078 if (nxt_queue_is_empty(&engine->joints)) {
2079 nxt_thread_exit(task->thread);
2080 }
2081}
2082
2083
2084static void
2085nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2086{
2087 nxt_timer_t *timer;
2088 nxt_listen_event_t *lev;
2089 nxt_socket_conf_joint_t *joint;
2090
2091 timer = obj;
2092 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2093 joint = lev->socket.data;
2094
2095 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2096 lev->socket.fd);
2097
2098 nxt_queue_remove(&lev->link);
2099
2100 /* 'task' refers to lev->task and we cannot use after nxt_free() */
2101 task = &task->thread->engine->task;
2102
2103 nxt_router_listen_socket_release(task, joint->socket_conf);
2104
2105 nxt_free(lev);
2106
2107 nxt_router_conf_release(task, joint);
2108}
2109
2110
2111static void
2112nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2113{
2114 nxt_listen_socket_t *ls;
2115 nxt_thread_spinlock_t *lock;
2116
2117 ls = skcf->listen;
2118 lock = &skcf->router_conf->router->lock;
2119
2120 nxt_thread_spin_lock(lock);
2121
2122 nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2123 task->thread->engine, ls->count);
2124
2125 if (--ls->count != 0) {
2126 ls = NULL;
2127 }
2128
2129 nxt_thread_spin_unlock(lock);
2130
2131 if (ls != NULL) {
2132 nxt_socket_close(task, ls->socket);
2133 nxt_free(ls);
2134 }
2135}
2136
2137
2138static void
2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2140{
2141 nxt_bool_t exit;
2142 nxt_socket_conf_t *skcf;
2143 nxt_router_conf_t *rtcf;
2144 nxt_event_engine_t *engine;
2145 nxt_thread_spinlock_t *lock;
2146
2147 nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2148
2149 if (--joint->count != 0) {
2150 return;
2151 }
2152
2153 nxt_queue_remove(&joint->link);
2154
2155 skcf = joint->socket_conf;
2156 rtcf = skcf->router_conf;
2157 lock = &rtcf->router->lock;
2158
2159 nxt_thread_spin_lock(lock);
2160
2161 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2162 rtcf, rtcf->count);
2163
2164 if (--skcf->count != 0) {
2165 rtcf = NULL;
2166
2167 } else {
2168 nxt_queue_remove(&skcf->link);
2169
2170 if (--rtcf->count != 0) {
2171 rtcf = NULL;
2172 }
2173 }
2174
2175 nxt_thread_spin_unlock(lock);
2176
2177 /* TODO remove engine->port */
2178 /* TODO excude from connected ports */
2179
2180 /* The joint content can be used before memory pool destruction. */
2181 engine = joint->engine;
2182 exit = (engine->shutdown && nxt_queue_is_empty(&engine->joints));
2183
2184 if (rtcf != NULL) {
2185 nxt_debug(task, "old router conf is destroyed");
2186
2187 nxt_mp_thread_adopt(rtcf->mem_pool);
2188
2189 nxt_mp_destroy(rtcf->mem_pool);
2190 }
2191
2192 if (exit) {
2193 nxt_thread_exit(task->thread);
2194 }
2195}
2196
2197
2198static void
2199nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
2200{
2201 nxt_port_t *port;
2202 nxt_thread_link_t *link;
2203 nxt_event_engine_t *engine;
2204 nxt_thread_handle_t handle;
2205
2206 handle = (nxt_thread_handle_t) obj;
2207 link = data;
2208
2209 nxt_thread_wait(handle);
2210
2211 engine = link->engine;
2212
2213 nxt_queue_remove(&engine->link);
2214
2215 port = engine->port;
2216
2217 // TODO notify all apps
2218
2219 port->engine = task->thread->engine;
2220 nxt_mp_thread_adopt(port->mem_pool);
2221 nxt_port_use(task, port, -1);
2222
2223 nxt_mp_thread_adopt(engine->mem_pool);
2224 nxt_mp_destroy(engine->mem_pool);
2225
2226 nxt_event_engine_free(engine);
2227
2228 nxt_free(link);
2229}
2230
2231
2232static const nxt_conn_state_t nxt_router_conn_read_header_state
2233 nxt_aligned(64) =
2234{
2235 .ready_handler = nxt_router_conn_http_header_parse,
2236 .close_handler = nxt_router_conn_close,
2237 .error_handler = nxt_router_conn_error,
2238
2239 .timer_handler = nxt_router_conn_timeout,
2240 .timer_value = nxt_router_conn_timeout_value,
2241 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
2242};
2243
2244
2245static const nxt_conn_state_t nxt_router_conn_read_body_state
2246 nxt_aligned(64) =
2247{
2248 .ready_handler = nxt_router_conn_http_body_read,
2249 .close_handler = nxt_router_conn_close,
2250 .error_handler = nxt_router_conn_error,
2251
2252 .timer_handler = nxt_router_conn_timeout,
2253 .timer_value = nxt_router_conn_timeout_value,
2254 .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
2255 .timer_autoreset = 1,
2256};
2257
2258
2259static void
2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
2261{
2262 size_t size;
2263 nxt_conn_t *c;
2264 nxt_socket_conf_t *skcf;
2265 nxt_event_engine_t *engine;
2266 nxt_socket_conf_joint_t *joint;
2267
2268 c = obj;
2269 joint = data;
2270
2271 nxt_debug(task, "router conn init");
2272
2273 c->joint = joint;
2274 joint->count++;
2275
2276 skcf = joint->socket_conf;
2277 c->local = skcf->sockaddr;
2278
2279 size = skcf->header_buffer_size;
2280 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2281
2282 c->socket.data = NULL;
2283
2284 engine = task->thread->engine;
2285 c->read_work_queue = &engine->fast_work_queue;
2286 c->write_work_queue = &engine->fast_work_queue;
2287
2288 c->read_state = &nxt_router_conn_read_header_state;
2289
2290 nxt_conn_read(engine, c);
2291}
2292
2293
2294static const nxt_conn_state_t nxt_router_conn_write_state
2295 nxt_aligned(64) =
2296{
2297 .ready_handler = nxt_router_conn_ready,
2298 .close_handler = nxt_router_conn_close,
2299 .error_handler = nxt_router_conn_error,
2300};
2301
2302
2303static void
2304nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2305 void *data)
2306{
2307 size_t dump_size;
2308 nxt_buf_t *b, *last;
2309 nxt_conn_t *c;
2310 nxt_req_conn_link_t *rc;
2311
2312 b = msg->buf;
2313 rc = data;
2314
2315 c = rc->conn;
2316
2317 dump_size = nxt_buf_used_size(b);
2318
2319 if (dump_size > 300) {
2320 dump_size = 300;
2321 }
2322
2323 nxt_debug(task, "%srouter app data (%z): %*s",
2324 msg->port_msg.last ? "last " : "", msg->size, dump_size,
2325 b->mem.pos);
2326
2327 if (msg->size == 0) {
2328 b = NULL;
2329 }
2330
2331 if (msg->port_msg.last != 0) {
2332 nxt_debug(task, "router data create last buf");
2333
2334 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2335 if (nxt_slow_path(last == NULL)) {
2336 /* TODO pogorevaTb */
2337 }
2338
2339 nxt_buf_chain_add(&b, last);
2340
2341 nxt_router_rc_unlink(task, rc);
2342 }
2343
2344 if (b == NULL) {
2345 return;
2346 }
2347
2348 if (msg->buf == b) {
2349 /* Disable instant buffer completion/re-using by port. */
2350 msg->buf = NULL;
2351 }
2352
2353 if (c->write == NULL) {
2354 c->write = b;
2355 c->write_state = &nxt_router_conn_write_state;
2356
2357 nxt_conn_write(task->thread->engine, c);
2358
2359 } else {
2360 nxt_debug(task, "router data attach out bufs to existing chain");
2361
2362 nxt_buf_chain_add(&c->write, b);
2363 }
2364}
2365
2366
2367static void
2368nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2369 void *data)
2370{
2371 nxt_req_conn_link_t *rc;
2372
2373 rc = data;
2374
2375 nxt_router_gen_error(task, rc->conn, 500,
2376 "Application terminated unexpectedly");
2377
2378 nxt_router_rc_unlink(task, rc);
2379}
2380
2381
2382nxt_inline const char *
2383nxt_router_text_by_code(int code)
2384{
2385 switch (code) {
2386 case 400: return "Bad request";
2387 case 404: return "Not found";
2388 case 403: return "Forbidden";
2389 case 408: return "Request Timeout";
2390 case 411: return "Length Required";
2391 case 413: return "Request Entity Too Large";
2392 case 500:
2393 default: return "Internal server error";
2394 }
2395}
2396
2397
2398static nxt_buf_t *
2399nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
2400 const char* str)
2401{
2402 nxt_buf_t *b, *last;
2403
2404 b = nxt_buf_mem_alloc(mp, 16384, 0);
2405 if (nxt_slow_path(b == NULL)) {
2406 return NULL;
2407 }
2408
2409 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
2410 "HTTP/1.0 %d %s\r\n"
2411 "Content-Type: text/plain\r\n"
2412 "Connection: close\r\n\r\n",
2413 code, nxt_router_text_by_code(code));
2414
2415 b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str));
2416
2417 nxt_log_alert(task->log, "error %d: %s", code, str);
2418
2419 last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST);
2420
2421 if (nxt_slow_path(last == NULL)) {
2422 nxt_mp_free(mp, b);
2423 return NULL;
2424 }
2425
2426 nxt_buf_chain_add(&b, last);
2427
2428 return b;
2429}
2430
2431
2432
2433static void
2434nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2435 const char* str)
2436{
2437 nxt_mp_t *mp;
2438 nxt_buf_t *b;
2439
2440 /* TODO: fix when called in the middle of response */
2441
2442 mp = c->mem_pool;
2443
2444 b = nxt_router_get_error_buf(task, mp, code, str);
2445
2446 if (c->socket.fd == -1) {
2447 nxt_mp_free(mp, b->next);
2448 nxt_mp_free(mp, b);
2449 return;
2450 }
2451
2452 if (c->write == NULL) {
2453 c->write = b;
2454 c->write_state = &nxt_router_conn_write_state;
2455
2456 nxt_conn_write(task->thread->engine, c);
2457
2458 } else {
2459 nxt_debug(task, "router data attach out bufs to existing chain");
2460
2461 nxt_buf_chain_add(&c->write, b);
2462 }
2463}
2464
2465
2466static void
2467nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2468 void *data)
2469{
2470 nxt_app_t *app;
2471 nxt_port_t *port;
2472
2473 app = data;
2474 port = msg->u.new_port;
2475
2476 nxt_assert(app != NULL);
2477 nxt_assert(port != NULL);
2478
2479 port->app = app;
2480
2481 nxt_thread_mutex_lock(&app->mutex);
2482
2483 nxt_assert(app->pending_workers != 0);
2484
2485 app->pending_workers--;
2486 app->workers++;
2487
2488 nxt_thread_mutex_unlock(&app->mutex);
2489
2490 nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
2491
2492 nxt_router_app_port_release(task, port, 0, 0);
2493}
2494
2495
2496static void
2497nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2498 void *data)
2499{
2500 nxt_app_t *app;
2501 nxt_queue_link_t *lnk;
2502 nxt_req_app_link_t *ra;
2503
2504 app = data;
2505
2506 nxt_assert(app != NULL);
2507
2508 nxt_debug(task, "app '%V' %p start error", &app->name, app);
2509
2510 nxt_thread_mutex_lock(&app->mutex);
2511
2512 nxt_assert(app->pending_workers != 0);
2513
2514 app->pending_workers--;
2515
2516 if (!nxt_queue_is_empty(&app->requests)) {
2517 lnk = nxt_queue_last(&app->requests);
2518 nxt_queue_remove(lnk);
2519 lnk->next = NULL;
2520
2521 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2522
2523 } else {
2524 ra = NULL;
2525 }
2526
2527 nxt_thread_mutex_unlock(&app->mutex);
2528
2529 if (ra != NULL) {
2530 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2531 &app->name, app, ra->stream);
2532
2533 nxt_router_ra_abort(task, ra, ra->work.data);
2534 }
2535
2536 nxt_router_app_use(task, app, -1);
2537}
2538
2539
2540void
2541nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2542{
2543 int c;
2544
2545 c = nxt_atomic_fetch_add(&app->use_count, i);
2546
2547 if (i < 0 && c == -i) {
2548
2549 nxt_assert(app->live == 0);
2550 nxt_assert(app->workers == 0);
2551 nxt_assert(app->pending_workers == 0);
2552 nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
2553 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2554
2555 nxt_thread_mutex_destroy(&app->mutex);
2556 nxt_free(app);
2557 }
2558}
2559
2560
2561nxt_inline nxt_port_t *
2562nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
2563{
2564 nxt_port_t *port;
2565 nxt_queue_link_t *lnk;
2566
2567 lnk = nxt_queue_first(&app->ports);
2568 nxt_queue_remove(lnk);
2569
2570 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2571
2572 port->app_requests++;
2573
2574 if (app->live &&
2575 (app->max_pending_responses == 0 ||
2576 (port->app_requests - port->app_responses) <
2577 app->max_pending_responses) )
2578 {
2579 nxt_queue_insert_tail(&app->ports, lnk);
2580
2581 } else {
2582 lnk->next = NULL;
2583
2584 (*use_delta)--;
2585 }
2586
2587 return port;
2588}
2589
2590
2591static nxt_port_t *
2592nxt_router_app_get_idle_port(nxt_app_t *app)
2593{
2594 nxt_port_t *port;
2595
2596 port = NULL;
2597
2598 nxt_thread_mutex_lock(&app->mutex);
2599
2600 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2601
2602 if (port->app_requests > port->app_responses) {
2603 port = NULL;
2604
2605 continue;
2606 }
2607
2608 nxt_queue_remove(&port->app_link);
2609 port->app_link.next = NULL;
2610
2611 break;
2612
2613 } nxt_queue_loop;
2614
2615 nxt_thread_mutex_unlock(&app->mutex);
2616
2617 return port;
2618}
2619
2620
2621static void
2622nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
2623{
2624 nxt_app_t *app;
2625 nxt_req_app_link_t *ra;
2626
2627 app = obj;
2628 ra = data;
2629
2630 nxt_assert(app != NULL);
2631 nxt_assert(ra != NULL);
2632 nxt_assert(ra->app_port != NULL);
2633
2634 nxt_debug(task, "app '%V' %p process next stream #%uD",
2635 &app->name, app, ra->stream);
2636
2637 nxt_router_process_http_request_mp(task, ra);
2638}
2639
2640
2641static void
2642nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2643 uint32_t request_failed, uint32_t got_response)
2644{
2645 int use_delta, ra_use_delta;
2646 nxt_app_t *app;
2647 nxt_bool_t send_quit;
2648 nxt_queue_link_t *lnk;
2649 nxt_req_app_link_t *ra;
2650
2651 nxt_assert(port != NULL);
2652 nxt_assert(port->app != NULL);
2653
2654 app = port->app;
2655
2656 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
2657
2658 nxt_thread_mutex_lock(&app->mutex);
2659
2660 port->app_requests -= request_failed;
2661 port->app_responses += got_response;
2662
2663 if (app->live != 0 &&
2664 port->pair[1] != -1 &&
2665 port->app_link.next == NULL &&
2666 (app->max_pending_responses == 0 ||
2667 (port->app_requests - port->app_responses) <
2668 app->max_pending_responses) )
2669 {
2670 nxt_queue_insert_tail(&app->ports, &port->app_link);
2671 use_delta++;
2672 }
2673
2674 if (app->live != 0 &&
2675 !nxt_queue_is_empty(&app->ports) &&
2676 !nxt_queue_is_empty(&app->requests))
2677 {
2678 lnk = nxt_queue_first(&app->requests);
2679 nxt_queue_remove(lnk);
2680 lnk->next = NULL;
2681
2682 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2683
2684 ra_use_delta = 1;
2685 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
2686
2687 } else {
2688 ra = NULL;
2689 ra_use_delta = 0;
2690 }
2691
2692 send_quit = app->live == 0 && port->app_requests == port->app_responses;
2693
2694 nxt_thread_mutex_unlock(&app->mutex);
2695
2696 if (ra != NULL) {
2697 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2698 nxt_router_app_process_request,
2699 &task->thread->engine->task, app, ra);
2700
2701 goto adjust_use;
2702 }
2703
2704 /* ? */
2705 if (port->pair[1] == -1) {
2706 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
2707 &app->name, app, port, port->pid);
2708
2709 goto adjust_use;
2710 }
2711
2712 if (send_quit) {
2713 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
2714 &app->name, app);
2715
2716 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2717 -1, 0, 0, NULL);
2718
2719 goto adjust_use;
2720 }
2721
2722 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2723 &app->name, app);
2724
2725adjust_use:
2726
2727 if (use_delta != 0) {
2728 nxt_port_use(task, port, use_delta);
2729 }
2730
2731 if (ra_use_delta != 0) {
2732 nxt_port_use(task, ra->app_port, ra_use_delta);
2733 }
2734}
2735
2736
2737void
2738nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
2739{
2740 nxt_app_t *app;
2741 nxt_bool_t unchain, start_worker;
2742
2743 app = port->app;
2744
2745 nxt_assert(app != NULL);
2746
2747 nxt_thread_mutex_lock(&app->mutex);
2748
2749 unchain = port->app_link.next != NULL;
2750
2751 if (unchain) {
2752 nxt_queue_remove(&port->app_link);
2753 port->app_link.next = NULL;
2754 }
2755
2756 app->workers--;
2757
2758 start_worker = app->live != 0 &&
2759 nxt_queue_is_empty(&app->requests) == 0 &&
2760 app->workers + app->pending_workers < app->max_workers;
2761
2762 if (start_worker) {
2763 app->pending_workers++;
2764 }
2765
2766 nxt_thread_mutex_unlock(&app->mutex);
2767
2768 nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
2769
2770 if (unchain) {
2771 nxt_port_use(task, port, -1);
2772 }
2773
2774 if (start_worker) {
2775 nxt_router_start_worker(task, app);
2776 }
2777}
2778
2779
2780static nxt_int_t
2781nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2782{
2783 int use_delta;
2784 nxt_int_t res;
2785 nxt_app_t *app;
2786 nxt_bool_t can_start_worker;
2787 nxt_conn_t *c;
2788 nxt_port_t *port;
2789 nxt_event_engine_t *engine;
2790 nxt_socket_conf_joint_t *joint;
2791
2792 port = NULL;
2793 use_delta = 1;
2794 c = ra->rc->conn;
2795
2796 joint = c->joint;
2797 app = joint->socket_conf->application;
2798
2799 if (app == NULL) {
2800 nxt_router_gen_error(task, c, 500,
2801 "Application is NULL in socket_conf");
2802 return NXT_ERROR;
2803 }
2804
2805 ra->rc->app = app;
2806
2807 nxt_router_app_use(task, app, 1);
2808
2809 engine = task->thread->engine;
2810
2811 nxt_timer_disable(engine, &c->read_timer);
2812
2813 if (app->timeout != 0) {
2814 c->read_timer.handler = nxt_router_app_timeout;
2815 nxt_timer_add(engine, &c->read_timer, app->timeout);
2816 }
2817
2818 can_start_worker = 0;
2819
2820 nxt_thread_mutex_lock(&app->mutex);
2821
2822 if (!nxt_queue_is_empty(&app->ports)) {
2823 port = nxt_router_app_get_port_unsafe(app, &use_delta);
2824
2825 } else {
2826 ra = nxt_router_ra_create(task, ra);
2827
2828 if (nxt_fast_path(ra != NULL)) {
2829 nxt_queue_insert_tail(&app->requests, &ra->link);
2830
2831 can_start_worker = (app->workers + app->pending_workers) <
2832 app->max_workers;
2833 if (can_start_worker) {
2834 app->pending_workers++;
2835 }
2836 }
2837
2838 port = NULL;
2839 }
2840
2841 nxt_thread_mutex_unlock(&app->mutex);
2842
2843 if (nxt_slow_path(ra == NULL)) {
2844 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2845 "req<->app link");
2846 return NXT_ERROR;
2847 }
2848
2849 if (port != NULL) {
2850 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
2851
2852 ra->app_port = port;
2853
2854 if (use_delta != 0) {
2855 nxt_port_use(task, port, use_delta);
2856 }
2857 return NXT_OK;
2858 }
2859
2860 nxt_debug(task, "ra stream #%uD allocated", ra->stream);
2861
2862 if (!can_start_worker) {
2863 nxt_debug(task, "app '%V' %p too many running or pending workers",
2864 &app->name, app);
2865
2866 return NXT_AGAIN;
2867 }
2868
2869 res = nxt_router_start_worker(task, app);
2870
2871 if (nxt_slow_path(res != NXT_OK)) {
2872 nxt_router_gen_error(task, c, 500, "Failed to start worker");
2873
2874 return NXT_ERROR;
2875 }
2876
2877 return NXT_AGAIN;
2878}
2879
2880
2881static void
2882nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
2883{
2884 size_t size;
2885 nxt_int_t ret;
2886 nxt_buf_t *buf;
2887 nxt_conn_t *c;
2888 nxt_sockaddr_t *local;
2889 nxt_app_parse_ctx_t *ap;
2890 nxt_app_request_body_t *b;
2891 nxt_socket_conf_joint_t *joint;
2892 nxt_app_request_header_t *h;
2893
2894 c = obj;
2895 ap = data;
2896 buf = c->read;
2897 joint = c->joint;
2898
2899 nxt_debug(task, "router conn http header parse");
2900
2901 if (ap == NULL) {
2902 ap = nxt_app_http_req_init(task);
2903 if (nxt_slow_path(ap == NULL)) {
2904 nxt_router_gen_error(task, c, 500,
2905 "Failed to allocate parse context");
2906 return;
2907 }
2908
2909 c->socket.data = ap;
2910
2911 ap->r.remote.start = nxt_sockaddr_address(c->remote);
2912 ap->r.remote.length = c->remote->address_length;
2913
2914 /*
2915 * TODO: need an application flag to get local address
2916 * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
2917 */
2918 local = nxt_router_local_addr(task, c);
2919
2920 if (nxt_fast_path(local != NULL)) {
2921 ap->r.local.start = nxt_sockaddr_address(local);
2922 ap->r.local.length = local->address_length;
2923 }
2924
2925 ap->r.header.buf = buf;
2926 }
2927
2928 h = &ap->r.header;
2929 b = &ap->r.body;
2930
2931 ret = nxt_app_http_req_header_parse(task, ap, buf);
2932
2933 nxt_debug(task, "http parse request header: %d", ret);
2934
2935 switch (nxt_expect(NXT_DONE, ret)) {
2936
2937 case NXT_DONE:
2938 nxt_debug(task, "router request header parsing complete, "
2939 "content length: %O, preread: %uz",
2940 h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
2941
2942 if (b->done) {
2943 nxt_router_process_http_request(task, c, ap);
2944
2945 return;
2946 }
2947
2948 if (joint->socket_conf->max_body_size > 0
2949 && (size_t) h->parsed_content_length
2950 > joint->socket_conf->max_body_size)
2951 {
2952 nxt_router_gen_error(task, c, 413, "Content-Length too big");
2953 return;
2954 }
2955
2956 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
2957 size = nxt_min(joint->socket_conf->body_buffer_size,
2958 (size_t) h->parsed_content_length);
2959
2960 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2961 if (nxt_slow_path(buf->next == NULL)) {
2962 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2963 "buffer for request body");
2964 return;
2965 }
2966
2967 c->read = buf->next;
2968
2969 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
2970 }
2971
2972 if (b->buf == NULL) {
2973 b->buf = c->read;
2974 }
2975
2976 c->read_state = &nxt_router_conn_read_body_state;
2977 break;
2978
2979 case NXT_ERROR:
2980 nxt_router_gen_error(task, c, 400, "Request header parse error");
2981 return;
2982
2983 default: /* NXT_AGAIN */
2984
2985 if (c->read->mem.free == c->read->mem.end) {
2986 size = joint->socket_conf->large_header_buffer_size;
2987
2988 if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem)
2989 || ap->r.header.bufs
2990 >= joint->socket_conf->large_header_buffers)
2991 {
2992 nxt_router_gen_error(task, c, 413,
2993 "Too long request headers");
2994 return;
2995 }
2996
2997 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2998 if (nxt_slow_path(buf->next == NULL)) {
2999 nxt_router_gen_error(task, c, 500,
3000 "Failed to allocate large header "
3001 "buffer");
3002 return;
3003 }
3004
3005 ap->r.header.bufs++;
3006
3007 size = c->read->mem.free - c->read->mem.pos;
3008
3009 c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
3010 }
3011
3012 }
3013
3014 nxt_conn_read(task->thread->engine, c);
3015}
3016
3017
3018static nxt_sockaddr_t *
3019nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c)
3020{
3021 int ret;
3022 size_t size, length;
3023 socklen_t socklen;
3024 nxt_sockaddr_t *sa;
3025
3026 if (c->local != NULL) {
3027 return c->local;
3028 }
3029
3030 /* AF_UNIX should not get in here. */
3031
3032 switch (c->remote->u.sockaddr.sa_family) {
3033#if (NXT_INET6)
3034 case AF_INET6:
3035 socklen = sizeof(struct sockaddr_in6);
3036 length = NXT_INET6_ADDR_STR_LEN;
3037 size = offsetof(nxt_sockaddr_t, u) + socklen + length;
3038 break;
3039#endif
3040 case AF_INET:
3041 default:
3042 socklen = sizeof(struct sockaddr_in);
3043 length = NXT_INET_ADDR_STR_LEN;
3044 size = offsetof(nxt_sockaddr_t, u) + socklen + length;
3045 break;
3046 }
3047
3048 sa = nxt_mp_get(c->mem_pool, size);
3049 if (nxt_slow_path(sa == NULL)) {
3050 return NULL;
3051 }
3052
3053 sa->socklen = socklen;
3054 sa->length = length;
3055
3056 ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen);
3057 if (nxt_slow_path(ret != 0)) {
3058 nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd);
3059 return NULL;
3060 }
3061
3062 c->local = sa;
3063
3064 nxt_sockaddr_text(sa);
3065
3066 /*
3067 * TODO: here we can adjust the end of non-freeable block
3068 * in c->mem_pool to the end of actual sockaddr length.
3069 */
3070
3071 return sa;
3072}
3073
3074
3075static void
3076nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
3077{
3078 size_t size;
3079 nxt_int_t ret;
3080 nxt_buf_t *buf;
3081 nxt_conn_t *c;
3082 nxt_app_parse_ctx_t *ap;
3083 nxt_app_request_body_t *b;
3084 nxt_socket_conf_joint_t *joint;
3085 nxt_app_request_header_t *h;
3086
3087 c = obj;
3088 ap = data;
3089 buf = c->read;
3090
3091 nxt_debug(task, "router conn http body read");
3092
3093 nxt_assert(ap != NULL);
3094
3095 b = &ap->r.body;
3096 h = &ap->r.header;
3097
3098 ret = nxt_app_http_req_body_read(task, ap, buf);
3099
3100 nxt_debug(task, "http read request body: %d", ret);
3101
3102 switch (nxt_expect(NXT_DONE, ret)) {
3103
3104 case NXT_DONE:
3105 nxt_router_process_http_request(task, c, ap);
3106 return;
3107
3108 case NXT_ERROR:
3109 nxt_router_gen_error(task, c, 500, "Read body error");
3110 return;
3111
3112 default: /* NXT_AGAIN */
3113
3114 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3115 joint = c->joint;
3116
3117 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3118
3119 size = nxt_min(joint->socket_conf->body_buffer_size,
3120 (size_t) h->parsed_content_length - b->preread_size);
3121
3122 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3123 if (nxt_slow_path(buf->next == NULL)) {
3124 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3125 "buffer for request body");
3126 return;
3127 }
3128
3129 c->read = buf->next;
3130 }
3131
3132 nxt_debug(task, "router request body read again, rest: %uz",
3133 h->parsed_content_length - b->preread_size);
3134 }
3135
3136 nxt_conn_read(task->thread->engine, c);
3137}
3138
3139
3140static void
3141nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
3142 nxt_app_parse_ctx_t *ap)
3143{
3144 nxt_int_t res;
3145 nxt_port_t *port;
3146 nxt_event_engine_t *engine;
3147 nxt_req_app_link_t ra_local, *ra;
3148 nxt_req_conn_link_t *rc;
3149
3150 engine = task->thread->engine;
3151
3152 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3153 nxt_router_response_ready_handler,
3154 nxt_router_response_error_handler,
3155 sizeof(nxt_req_conn_link_t));
3156
3157 if (nxt_slow_path(rc == NULL)) {
3158 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3159 "req<->conn link");
3160
3161 return;
3162 }
3163
3164 rc->stream = nxt_port_rpc_ex_stream(rc);
3165 rc->conn = c;
3166
3167 nxt_queue_insert_tail(&c->requests, &rc->link);
3168
3169 nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
3170 rc->stream, c, engine);
3171
3172 rc->ap = ap;
3173 c->socket.data = NULL;
3174
3175 ra = &ra_local;
3176 nxt_router_ra_init(task, ra, rc);
3177
3178 res = nxt_router_app_port(task, ra);
3179
3180 if (res != NXT_OK) {
3181 return;
3182 }
3183
3184 port = ra->app_port;
3185
3186 if (nxt_slow_path(port == NULL)) {
3187 nxt_router_gen_error(task, c, 500, "Application port not found");
3188 return;
3189 }
3190
3191 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3192
3193 nxt_router_process_http_request_mp(task, ra);
3194}
3195
3196
3197static void
3198nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
3199{
3200 uint32_t request_failed;
3201 nxt_int_t res;
3202 nxt_port_t *port, *c_port, *reply_port;
3203 nxt_app_wmsg_t wmsg;
3204 nxt_app_parse_ctx_t *ap;
3205
3206 nxt_assert(ra->app_port != NULL);
3207
3208 port = ra->app_port;
3209 reply_port = ra->reply_port;
3210 ap = ra->ap;
3211
3212 request_failed = 1;
3213
3214 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
3215 reply_port->id);
3216 if (nxt_slow_path(c_port != reply_port)) {
3217 res = nxt_port_send_port(task, port, reply_port, 0);
3218
3219 if (nxt_slow_path(res != NXT_OK)) {
3220 nxt_router_ra_error(task, ra, 500,
3221 "Failed to send reply port to application");
3222 ra = NULL;
3223 goto release_port;
3224 }
3225
3226 nxt_process_connected_port_add(port->process, reply_port);
3227 }
3228
3229 wmsg.port = port;
3230 wmsg.write = NULL;
3231 wmsg.buf = &wmsg.write;
3232 wmsg.stream = ra->stream;
3233
3234 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3235
3236 if (nxt_slow_path(res != NXT_OK)) {
3237 nxt_router_ra_error(task, ra, 500,
3238 "Failed to prepare message for application");
3239 ra = NULL;
3240 goto release_port;
3241 }
3242
3243 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
3244 nxt_buf_used_size(wmsg.write),
3245 wmsg.port->socket.fd);
3246
3247 request_failed = 0;
3248
3249 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
3250 -1, ra->stream, reply_port->id, wmsg.write);
3251
3252 if (nxt_slow_path(res != NXT_OK)) {
3253 nxt_router_ra_error(task, ra, 500,
3254 "Failed to send message to application");
3255 ra = NULL;
3256 goto release_port;
3257 }
3258
3259release_port:
3260
3261 nxt_router_app_port_release(task, port, request_failed, 0);
3262
3263 if (ra != NULL) {
3264 if (request_failed != 0) {
3265 ra->app_port = 0;
3266 }
3267
3268 nxt_router_ra_release(task, ra, ra->work.data);
3269 }
3270}
3271
3272
3273static nxt_int_t
3274nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3275 nxt_app_wmsg_t *wmsg)
3276{
3277 nxt_int_t rc;
3278 nxt_buf_t *b;
3279 nxt_http_field_t *field;
3280 nxt_app_request_header_t *h;
3281
3282 static const nxt_str_t prefix = nxt_string("HTTP_");
3283 static const nxt_str_t eof = nxt_null_string;
3284
3285 h = &r->header;
3286
3287#define RC(S) \
3288 do { \
3289 rc = (S); \
3290 if (nxt_slow_path(rc != NXT_OK)) { \
3291 goto fail; \
3292 } \
3293 } while(0)
3294
3295#define NXT_WRITE(N) \
3296 RC(nxt_app_msg_write_str(task, wmsg, N))
3297
3298 /* TODO error handle, async mmap buffer assignment */
3299
3300 NXT_WRITE(&h->method);
3301 NXT_WRITE(&h->target);
3302
3303 if (h->path.start == h->target.start) {
3304 NXT_WRITE(&eof);
3305
3306 } else {
3307 NXT_WRITE(&h->path);
3308 }
3309
3310 if (h->query.start != NULL) {
3311 RC(nxt_app_msg_write_size(task, wmsg,
3312 h->query.start - h->target.start + 1));
3313 } else {
3314 RC(nxt_app_msg_write_size(task, wmsg, 0));
3315 }
3316
3317 NXT_WRITE(&h->version);
3318
3319 NXT_WRITE(&r->remote);
3320 NXT_WRITE(&r->local);
3321
3322 NXT_WRITE(&h->host);
3323 NXT_WRITE(&h->content_type);
3324 NXT_WRITE(&h->content_length);
3325
3326 nxt_list_each(field, h->fields) {
3327 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
3328 &prefix, &field->name));
3329 NXT_WRITE(&field->value);
3330
3331 } nxt_list_loop;
3332
3333 /* end-of-headers mark */
3334 NXT_WRITE(&eof);
3335
3336 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3337
3338 for(b = r->body.buf; b != NULL; b = b->next) {
3339 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3340 nxt_buf_mem_used_size(&b->mem)));
3341 }
3342
3343#undef NXT_WRITE
3344#undef RC
3345
3346 return NXT_OK;
3347
3348fail:
3349
3350 return NXT_ERROR;
3351}
3352
3353
3354static nxt_int_t
3355nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3356 nxt_app_wmsg_t *wmsg)
3357{
3358 nxt_int_t rc;
3359 nxt_buf_t *b;
3360 nxt_bool_t method_is_post;
3361 nxt_http_field_t *field;
3362 nxt_app_request_header_t *h;
3363
3364 static const nxt_str_t prefix = nxt_string("HTTP_");
3365 static const nxt_str_t eof = nxt_null_string;
3366
3367 h = &r->header;
3368
3369#define RC(S) \
3370 do { \
3371 rc = (S); \
3372 if (nxt_slow_path(rc != NXT_OK)) { \
3373 goto fail; \
3374 } \
3375 } while(0)
3376
3377#define NXT_WRITE(N) \
3378 RC(nxt_app_msg_write_str(task, wmsg, N))
3379
3380 /* TODO error handle, async mmap buffer assignment */
3381
3382 NXT_WRITE(&h->method);
3383 NXT_WRITE(&h->target);
3384
3385 if (h->path.start == h->target.start) {
3386 NXT_WRITE(&eof);
3387
3388 } else {
3389 NXT_WRITE(&h->path);
3390 }
3391
3392 if (h->query.start != NULL) {
3393 RC(nxt_app_msg_write_size(task, wmsg,
3394 h->query.start - h->target.start + 1));
3395 } else {
3396 RC(nxt_app_msg_write_size(task, wmsg, 0));
3397 }
3398
3399 NXT_WRITE(&h->version);
3400
3401 // PHP_SELF
3402 // SCRIPT_NAME
3403 // SCRIPT_FILENAME
3404 // DOCUMENT_ROOT
3405
3406 NXT_WRITE(&r->remote);
3407 NXT_WRITE(&r->local);
3408
3409 NXT_WRITE(&h->host);
3410 NXT_WRITE(&h->cookie);
3411 NXT_WRITE(&h->content_type);
3412 NXT_WRITE(&h->content_length);
3413
3414 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
3415 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3416
3417 method_is_post = h->method.length == 4 &&
3418 h->method.start[0] == 'P' &&
3419 h->method.start[1] == 'O' &&
3420 h->method.start[2] == 'S' &&
3421 h->method.start[3] == 'T';
3422
3423 if (method_is_post) {
3424 for(b = r->body.buf; b != NULL; b = b->next) {
3425 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3426 nxt_buf_mem_used_size(&b->mem)));
3427 }
3428 }
3429
3430 nxt_list_each(field, h->fields) {
3431 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
3432 &prefix, &field->name));
3433 NXT_WRITE(&field->value);
3434
3435 } nxt_list_loop;
3436
3437 /* end-of-headers mark */
3438 NXT_WRITE(&eof);
3439
3440 if (!method_is_post) {
3441 for(b = r->body.buf; b != NULL; b = b->next) {
3442 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3443 nxt_buf_mem_used_size(&b->mem)));
3444 }
3445 }
3446
3447#undef NXT_WRITE
3448#undef RC
3449
3450 return NXT_OK;
3451
3452fail:
3453
3454 return NXT_ERROR;
3455}
3456
3457
3458static nxt_int_t
3459nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
3460{
3461 nxt_int_t rc;
3462 nxt_buf_t *b;
3463 nxt_http_field_t *field;
3464 nxt_app_request_header_t *h;
3465
3466 static const nxt_str_t eof = nxt_null_string;
3467
3468 h = &r->header;
3469
3470#define RC(S) \
3471 do { \
3472 rc = (S); \
3473 if (nxt_slow_path(rc != NXT_OK)) { \
3474 goto fail; \
3475 } \
3476 } while(0)
3477
3478#define NXT_WRITE(N) \
3479 RC(nxt_app_msg_write_str(task, wmsg, N))
3480
3481 /* TODO error handle, async mmap buffer assignment */
3482
3483 NXT_WRITE(&h->method);
3484 NXT_WRITE(&h->target);
3485
3486 if (h->path.start == h->target.start) {
3487 NXT_WRITE(&eof);
3488
3489 } else {
3490 NXT_WRITE(&h->path);
3491 }
3492
3493 if (h->query.start != NULL) {
3494 RC(nxt_app_msg_write_size(task, wmsg,
3495 h->query.start - h->target.start + 1));
3496 } else {
3497 RC(nxt_app_msg_write_size(task, wmsg, 0));
3498 }
3499
3500 NXT_WRITE(&h->version);
3501 NXT_WRITE(&r->remote);
3502
3503 NXT_WRITE(&h->host);
3504 NXT_WRITE(&h->cookie);
3505 NXT_WRITE(&h->content_type);
3506 NXT_WRITE(&h->content_length);
3507
3508 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
3509
3510 nxt_list_each(field, h->fields) {
3511 NXT_WRITE(&field->name);
3512 NXT_WRITE(&field->value);
3513
3514 } nxt_list_loop;
3515
3516 /* end-of-headers mark */
3517 NXT_WRITE(&eof);
3518
3519 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3520
3521 for(b = r->body.buf; b != NULL; b = b->next) {
3522 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3523 nxt_buf_mem_used_size(&b->mem)));
3524 }
3525
3526#undef NXT_WRITE
3527#undef RC
3528
3529 return NXT_OK;
3530
3531fail:
3532
3533 return NXT_ERROR;
3534}
3535
3536
3537static const nxt_conn_state_t nxt_router_conn_close_state
3538 nxt_aligned(64) =
3539{
3540 .ready_handler = nxt_router_conn_free,
3541};
3542
3543
3544static void
3545nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
3546{
3547 nxt_buf_t *b;
3548 nxt_bool_t last;
3549 nxt_conn_t *c;
3550 nxt_work_queue_t *wq;
3551
3552 nxt_debug(task, "router conn ready %p", obj);
3553
3554 c = obj;
3555 b = c->write;
3556
3557 wq = &task->thread->engine->fast_work_queue;
3558
3559 last = 0;
3560
3561 while (b != NULL) {
3562 if (!nxt_buf_is_sync(b)) {
3563 if (nxt_buf_used_size(b) > 0) {
3564 break;
3565 }
3566 }
3567
3568 if (nxt_buf_is_last(b)) {
3569 last = 1;
3570 }
3571
3572 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
3573
3574 b = b->next;
3575 }
3576
3577 c->write = b;
3578
3579 if (b != NULL) {
3580 nxt_debug(task, "router conn %p has more data to write", obj);
3581
3582 nxt_conn_write(task->thread->engine, c);
3583
3584 } else {
3585 nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
3586 last);
3587
3588 if (last != 0) {
3589 nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
3590
3591 nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
3592 c->socket.data);
3593 }
3594 }
3595}
3596
3597
3598static void
3599nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
3600{
3601 nxt_conn_t *c;
3602
3603 c = obj;
3604
3605 nxt_debug(task, "router conn close");
3606
3607 c->write_state = &nxt_router_conn_close_state;
3608
3609 nxt_conn_close(task->thread->engine, c);
3610}
3611
3612
3613static void
3614nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
3615{
3616 nxt_socket_conf_joint_t *joint;
3617
3618 joint = obj;
3619
3620 nxt_router_conf_release(task, joint);
3621}
3622
3623
3624static void
3625nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
3626{
3627 nxt_conn_t *c;
3628 nxt_event_engine_t *engine;
3629 nxt_req_conn_link_t *rc;
3630 nxt_app_parse_ctx_t *ap;
3631 nxt_socket_conf_joint_t *joint;
3632
3633 c = obj;
3634 ap = data;
3635
3636 nxt_debug(task, "router conn close done");
3637
3638 if (ap != NULL) {
3639 nxt_app_http_req_done(task, ap);
3640
3641 c->socket.data = NULL;
3642 }
3643
3644 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3645
3646 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
3647
3648 nxt_router_rc_unlink(task, rc);
3649
3650 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
3651
3652 } nxt_queue_loop;
3653
3654 nxt_queue_remove(&c->link);
3655
3656 engine = task->thread->engine;
3657
3658 nxt_sockaddr_cache_free(engine, c);
3659
3660 joint = c->joint;
3661
3662 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
3663 &engine->task, joint, NULL);
3664
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10
11
12typedef struct {
13 nxt_str_t type;
14 uint32_t workers;
15 nxt_msec_t timeout;
16 uint32_t requests;
17 nxt_conf_value_t *limits_value;
18} nxt_router_app_conf_t;
19
20
21typedef struct {
22 nxt_str_t application;
23} nxt_router_listener_conf_t;
24
25
26typedef struct nxt_req_app_link_s nxt_req_app_link_t;
27
28
29typedef struct {
30 uint32_t stream;
31 nxt_conn_t *conn;
32 nxt_app_t *app;
33 nxt_port_t *app_port;
34 nxt_app_parse_ctx_t *ap;
35 nxt_req_app_link_t *ra;
36
37 nxt_queue_link_t link; /* for nxt_conn_t.requests */
38} nxt_req_conn_link_t;
39
40
41struct nxt_req_app_link_s {
42 uint32_t stream;
43 nxt_port_t *app_port;
44 nxt_pid_t app_pid;
45 nxt_port_t *reply_port;
46 nxt_app_parse_ctx_t *ap;
47 nxt_req_conn_link_t *rc;
48
49 nxt_queue_link_t link; /* for nxt_app_t.requests */
50
51 nxt_mp_t *mem_pool;
52 nxt_work_t work;
53
54 int err_code;
55 const char *err_str;
56};
57
58
59typedef struct {
60 nxt_socket_conf_t *socket_conf;
61 nxt_router_temp_conf_t *temp_conf;
62} nxt_socket_rpc_t;
63
64
65static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
66
67static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
68 int code, const char* str);
69
70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conf_ready(nxt_task_t *task,
73 nxt_router_temp_conf_t *tmcf);
74static void nxt_router_conf_error(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_send(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
78
79static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
80 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
81static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
82static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
83 nxt_str_t *name);
84static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
85 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
86static void nxt_router_listen_socket_ready(nxt_task_t *task,
87 nxt_port_recv_msg_t *msg, void *data);
88static void nxt_router_listen_socket_error(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg, void *data);
90static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
91 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
92static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
93 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
94
95static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
97 const nxt_event_interface_t *interface);
98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
99 nxt_router_engine_conf_t *recf);
100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
101 nxt_router_engine_conf_t *recf);
102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
103 nxt_router_engine_conf_t *recf);
104static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
105 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
106 nxt_work_handler_t handler);
107static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
108 nxt_router_engine_conf_t *recf);
109static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
110 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
111
112static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
113 nxt_router_temp_conf_t *tmcf);
114static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
115 nxt_event_engine_t *engine);
116static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
117 nxt_router_temp_conf_t *tmcf);
118
119static void nxt_router_engines_post(nxt_router_t *router,
120 nxt_router_temp_conf_t *tmcf);
121static void nxt_router_engine_post(nxt_event_engine_t *engine,
122 nxt_work_t *jobs);
123
124static void nxt_router_thread_start(void *data);
125static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
126 void *data);
127static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
128 void *data);
129static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
130 void *data);
131static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
132 void *data);
133static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
134 void *data);
135static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
136 void *data);
137static void nxt_router_listen_socket_release(nxt_task_t *task,
138 nxt_socket_conf_t *skcf);
139static void nxt_router_conf_release(nxt_task_t *task,
140 nxt_socket_conf_joint_t *joint);
141
142static void nxt_router_app_port_ready(nxt_task_t *task,
143 nxt_port_recv_msg_t *msg, void *data);
144static void nxt_router_app_port_error(nxt_task_t *task,
145 nxt_port_recv_msg_t *msg, void *data);
146
147static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
148static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
149 uint32_t request_failed, uint32_t got_response);
150
151static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
152static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
153 void *data);
154static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
156 void *data);
157static void nxt_router_process_http_request(nxt_task_t *task,
158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
159static void nxt_router_process_http_request_mp(nxt_task_t *task,
160 nxt_req_app_link_t *ra);
161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
162 nxt_app_wmsg_t *wmsg);
163static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
164 nxt_app_wmsg_t *wmsg);
165static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
166 nxt_app_wmsg_t *wmsg);
167static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
168static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
169static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
170static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
171static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
172static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
173static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
174
175static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
176 const char* str);
177
178static nxt_router_t *nxt_router;
179
180
181static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
182 nxt_python_prepare_msg,
183 nxt_php_prepare_msg,
184 nxt_go_prepare_msg,
185};
186
187
188nxt_int_t
189nxt_router_start(nxt_task_t *task, void *data)
190{
191 nxt_int_t ret;
192 nxt_router_t *router;
193 nxt_runtime_t *rt;
194
195 rt = task->thread->runtime;
196
197 ret = nxt_app_http_init(task, rt);
198 if (nxt_slow_path(ret != NXT_OK)) {
199 return ret;
200 }
201
202 router = nxt_zalloc(sizeof(nxt_router_t));
203 if (nxt_slow_path(router == NULL)) {
204 return NXT_ERROR;
205 }
206
207 nxt_queue_init(&router->engines);
208 nxt_queue_init(&router->sockets);
209 nxt_queue_init(&router->apps);
210
211 nxt_router = router;
212
213 return NXT_OK;
214}
215
216
217static void
218nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
219{
220 size_t size;
221 uint32_t stream;
222 nxt_app_t *app;
223 nxt_buf_t *b;
224 nxt_port_t *main_port;
225 nxt_runtime_t *rt;
226
227 app = data;
228
229 rt = task->thread->runtime;
230 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
231
232 nxt_debug(task, "app '%V' %p start worker", &app->name, app);
233
234 size = app->name.length + 1 + app->conf.length;
235
236 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
237
238 if (nxt_slow_path(b == NULL)) {
239 goto failed;
240 }
241
242 nxt_buf_cpystr(b, &app->name);
243 *b->mem.free++ = '\0';
244 nxt_buf_cpystr(b, &app->conf);
245
246 stream = nxt_port_rpc_register_handler(task, port,
247 nxt_router_app_port_ready,
248 nxt_router_app_port_error,
249 -1, app);
250
251 if (nxt_slow_path(stream == 0)) {
252 nxt_mp_release(b->data, b);
253
254 goto failed;
255 }
256
257 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
258 stream, port->id, b);
259
260 return;
261
262failed:
263
264 nxt_thread_mutex_lock(&app->mutex);
265
266 app->pending_workers--;
267
268 nxt_thread_mutex_unlock(&app->mutex);
269
270 nxt_router_app_use(task, app, -1);
271}
272
273
274static nxt_int_t
275nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
276{
277 nxt_int_t res;
278 nxt_port_t *router_port;
279 nxt_runtime_t *rt;
280
281 rt = task->thread->runtime;
282 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
283
284 nxt_router_app_use(task, app, 1);
285
286 res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
287 app);
288
289 if (res == NXT_OK) {
290 return res;
291 }
292
293 nxt_thread_mutex_lock(&app->mutex);
294
295 app->pending_workers--;
296
297 nxt_thread_mutex_unlock(&app->mutex);
298
299 nxt_router_app_use(task, app, -1);
300
301 return NXT_ERROR;
302}
303
304
305nxt_inline void
306nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
307 nxt_req_conn_link_t *rc)
308{
309 nxt_event_engine_t *engine;
310
311 engine = task->thread->engine;
312
313 nxt_memzero(ra, sizeof(nxt_req_app_link_t));
314
315 ra->stream = rc->stream;
316 ra->app_pid = -1;
317 ra->rc = rc;
318 rc->ra = ra;
319 ra->reply_port = engine->port;
320 ra->ap = rc->ap;
321
322 ra->work.handler = NULL;
323 ra->work.task = &engine->task;
324 ra->work.obj = ra;
325 ra->work.data = engine;
326}
327
328
329nxt_inline nxt_req_app_link_t *
330nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
331{
332 nxt_mp_t *mp;
333 nxt_req_app_link_t *ra;
334
335 mp = ra_src->ap->mem_pool;
336
337 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
338
339 if (nxt_slow_path(ra == NULL)) {
340
341 ra_src->rc->ra = NULL;
342 ra_src->rc = NULL;
343
344 return NULL;
345 }
346
347 nxt_router_ra_init(task, ra, ra_src->rc);
348
349 ra->mem_pool = mp;
350
351 return ra;
352}
353
354
355static void
356nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
357{
358 nxt_req_app_link_t *ra;
359 nxt_event_engine_t *engine;
360 nxt_req_conn_link_t *rc;
361
362 ra = obj;
363 engine = data;
364
365 if (task->thread->engine != engine) {
366 if (ra->app_port != NULL) {
367 ra->app_pid = ra->app_port->pid;
368 }
369
370 ra->work.handler = nxt_router_ra_release;
371 ra->work.task = &engine->task;
372 ra->work.next = NULL;
373
374 nxt_debug(task, "ra stream #%uD post release to %p",
375 ra->stream, engine);
376
377 nxt_event_engine_post(engine, &ra->work);
378
379 return;
380 }
381
382 nxt_debug(task, "ra stream #%uD release", ra->stream);
383
384 rc = ra->rc;
385
386 if (rc != NULL) {
387 if (ra->app_pid != -1) {
388 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
389 }
390
391 rc->app_port = ra->app_port;
392
393 ra->app_port = NULL;
394 rc->ra = NULL;
395 ra->rc = NULL;
396 }
397
398 if (ra->app_port != NULL) {
399 nxt_router_app_port_release(task, ra->app_port, 0, 1);
400
401 ra->app_port = NULL;
402 }
403
404 if (ra->mem_pool != NULL) {
405 nxt_mp_release(ra->mem_pool, ra);
406 }
407}
408
409
410static void
411nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
412{
413 nxt_conn_t *c;
414 nxt_req_app_link_t *ra;
415 nxt_req_conn_link_t *rc;
416 nxt_event_engine_t *engine;
417
418 ra = obj;
419 engine = data;
420
421 if (task->thread->engine != engine) {
422 ra->work.handler = nxt_router_ra_abort;
423 ra->work.task = &engine->task;
424 ra->work.next = NULL;
425
426 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
427
428 nxt_event_engine_post(engine, &ra->work);
429
430 return;
431 }
432
433 nxt_debug(task, "ra stream #%uD abort", ra->stream);
434
435 rc = ra->rc;
436
437 if (rc != NULL) {
438 c = rc->conn;
439
440 nxt_router_gen_error(task, c, 500,
441 "Failed to start application worker");
442
443 rc->ra = NULL;
444 ra->rc = NULL;
445 }
446
447 if (ra->app_port != NULL) {
448 nxt_router_app_port_release(task, ra->app_port, 0, 1);
449
450 ra->app_port = NULL;
451 }
452
453 if (ra->mem_pool != NULL) {
454 nxt_mp_release(ra->mem_pool, ra);
455 }
456}
457
458
459static void
460nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data)
461{
462 nxt_req_app_link_t *ra;
463
464 ra = obj;
465
466 nxt_router_ra_error(task, ra, ra->err_code, ra->err_str);
467}
468
469
470static void
471nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code,
472 const char* str)
473{
474 nxt_conn_t *c;
475 nxt_req_conn_link_t *rc;
476 nxt_event_engine_t *engine;
477
478 engine = ra->work.data;
479
480 if (task->thread->engine != engine) {
481 ra->err_code = code;
482 ra->err_str = str;
483
484 ra->work.handler = nxt_router_ra_error_handler;
485 ra->work.task = &engine->task;
486 ra->work.next = NULL;
487
488 nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine);
489
490 nxt_event_engine_post(engine, &ra->work);
491
492 return;
493 }
494
495 nxt_debug(task, "ra stream #%uD error", ra->stream);
496
497 rc = ra->rc;
498
499 if (rc != NULL) {
500 c = rc->conn;
501
502 nxt_router_gen_error(task, c, code, str);
503
504 rc->ra = NULL;
505 ra->rc = NULL;
506 }
507
508 if (ra->app_port != NULL) {
509 nxt_router_app_port_release(task, ra->app_port, 0, 1);
510
511 ra->app_port = NULL;
512 }
513
514 if (ra->mem_pool != NULL) {
515 nxt_mp_release(ra->mem_pool, ra);
516 }
517}
518
519
520nxt_inline void
521nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
522{
523 nxt_req_app_link_t *ra;
524
525 if (rc->app_port != NULL) {
526 nxt_router_app_port_release(task, rc->app_port, 0, 1);
527
528 rc->app_port = NULL;
529 }
530
531 ra = rc->ra;
532
533 if (ra != NULL) {
534 rc->ra = NULL;
535 ra->rc = NULL;
536
537 nxt_thread_mutex_lock(&rc->app->mutex);
538
539 if (ra->link.next != NULL) {
540 nxt_queue_remove(&ra->link);
541
542 ra->link.next = NULL;
543
544 } else {
545 ra = NULL;
546 }
547
548 nxt_thread_mutex_unlock(&rc->app->mutex);
549 }
550
551 if (ra != NULL) {
552 nxt_router_ra_release(task, ra, ra->work.data);
553 }
554
555 if (rc->app != NULL) {
556 nxt_router_app_use(task, rc->app, -1);
557
558 rc->app = NULL;
559 }
560
561 if (rc->ap != NULL) {
562 nxt_app_http_req_done(task, rc->ap);
563
564 rc->ap = NULL;
565 }
566
567 nxt_queue_remove(&rc->link);
568
569 rc->conn = NULL;
570}
571
572
573void
574nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
575{
576 nxt_port_new_port_handler(task, msg);
577
578 if (msg->port_msg.stream == 0) {
579 return;
580 }
581
582 if (msg->u.new_port == NULL ||
583 msg->u.new_port->type != NXT_PROCESS_WORKER)
584 {
585 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
586 }
587
588 nxt_port_rpc_handler(task, msg);
589}
590
591
592void
593nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
594{
595 nxt_int_t ret;
596 nxt_buf_t *b;
597 nxt_router_temp_conf_t *tmcf;
598
599 tmcf = nxt_router_temp_conf(task);
600 if (nxt_slow_path(tmcf == NULL)) {
601 return;
602 }
603
604 b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
605
606 nxt_assert(b != NULL);
607
608 tmcf->conf->router = nxt_router;
609 tmcf->stream = msg->port_msg.stream;
610 tmcf->port = nxt_runtime_port_find(task->thread->runtime,
611 msg->port_msg.pid,
612 msg->port_msg.reply_port);
613
614 ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
615
616 if (nxt_fast_path(ret == NXT_OK)) {
617 nxt_router_conf_apply(task, tmcf, NULL);
618
619 } else {
620 nxt_router_conf_error(task, tmcf);
621 }
622}
623
624
625static void
626nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
627{
628 union {
629 nxt_pid_t removed_pid;
630 void *data;
631 } u;
632
633 u.data = data;
634
635 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
636}
637
638
639void
640nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
641{
642 nxt_event_engine_t *engine;
643
644 nxt_port_remove_pid_handler(task, msg);
645
646 if (msg->port_msg.stream == 0) {
647 return;
648 }
649
650 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
651 {
652 nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
653 msg->u.data);
654 }
655 nxt_queue_loop;
656
657 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
658
659 nxt_port_rpc_handler(task, msg);
660}
661
662
663static nxt_router_temp_conf_t *
664nxt_router_temp_conf(nxt_task_t *task)
665{
666 nxt_mp_t *mp, *tmp;
667 nxt_router_conf_t *rtcf;
668 nxt_router_temp_conf_t *tmcf;
669
670 mp = nxt_mp_create(1024, 128, 256, 32);
671 if (nxt_slow_path(mp == NULL)) {
672 return NULL;
673 }
674
675 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
676 if (nxt_slow_path(rtcf == NULL)) {
677 goto fail;
678 }
679
680 rtcf->mem_pool = mp;
681
682 tmp = nxt_mp_create(1024, 128, 256, 32);
683 if (nxt_slow_path(tmp == NULL)) {
684 goto fail;
685 }
686
687 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
688 if (nxt_slow_path(tmcf == NULL)) {
689 goto temp_fail;
690 }
691
692 tmcf->mem_pool = tmp;
693 tmcf->conf = rtcf;
694 tmcf->count = 1;
695 tmcf->engine = task->thread->engine;
696
697 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
698 sizeof(nxt_router_engine_conf_t));
699 if (nxt_slow_path(tmcf->engines == NULL)) {
700 goto temp_fail;
701 }
702
703 nxt_queue_init(&tmcf->deleting);
704 nxt_queue_init(&tmcf->keeping);
705 nxt_queue_init(&tmcf->updating);
706 nxt_queue_init(&tmcf->pending);
707 nxt_queue_init(&tmcf->creating);
708 nxt_queue_init(&tmcf->apps);
709 nxt_queue_init(&tmcf->previous);
710
711 return tmcf;
712
713temp_fail:
714
715 nxt_mp_destroy(tmp);
716
717fail:
718
719 nxt_mp_destroy(mp);
720
721 return NULL;
722}
723
724
725static void
726nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
727{
728 nxt_int_t ret;
729 nxt_router_t *router;
730 nxt_runtime_t *rt;
731 nxt_queue_link_t *qlk;
732 nxt_socket_conf_t *skcf;
733 nxt_router_temp_conf_t *tmcf;
734 const nxt_event_interface_t *interface;
735
736 tmcf = obj;
737
738 qlk = nxt_queue_first(&tmcf->pending);
739
740 if (qlk != nxt_queue_tail(&tmcf->pending)) {
741 nxt_queue_remove(qlk);
742 nxt_queue_insert_tail(&tmcf->creating, qlk);
743
744 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
745
746 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
747
748 return;
749 }
750
751 rt = task->thread->runtime;
752
753 interface = nxt_service_get(rt->services, "engine", NULL);
754
755 router = tmcf->conf->router;
756
757 ret = nxt_router_engines_create(task, router, tmcf, interface);
758 if (nxt_slow_path(ret != NXT_OK)) {
759 goto fail;
760 }
761
762 ret = nxt_router_threads_create(task, rt, tmcf);
763 if (nxt_slow_path(ret != NXT_OK)) {
764 goto fail;
765 }
766
767 nxt_router_apps_sort(task, router, tmcf);
768
769 nxt_router_engines_post(router, tmcf);
770
771 nxt_queue_add(&router->sockets, &tmcf->updating);
772 nxt_queue_add(&router->sockets, &tmcf->creating);
773
774 nxt_router_conf_ready(task, tmcf);
775
776 return;
777
778fail:
779
780 nxt_router_conf_error(task, tmcf);
781
782 return;
783}
784
785
786static void
787nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
788{
789 nxt_joint_job_t *job;
790
791 job = obj;
792
793 nxt_router_conf_ready(task, job->tmcf);
794}
795
796
797static void
798nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
799{
800 nxt_debug(task, "temp conf count:%D", tmcf->count);
801
802 if (--tmcf->count == 0) {
803 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
804 }
805}
806
807
808static void
809nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
810{
811 nxt_socket_t s;
812 nxt_router_t *router;
813 nxt_queue_link_t *qlk;
814 nxt_socket_conf_t *skcf;
815
816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
817
818 for (qlk = nxt_queue_first(&tmcf->creating);
819 qlk != nxt_queue_tail(&tmcf->creating);
820 qlk = nxt_queue_next(qlk))
821 {
822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
823 s = skcf->listen->socket;
824
825 if (s != -1) {
826 nxt_socket_close(task, s);
827 }
828
829 nxt_free(skcf->listen);
830 }
831
832 router = tmcf->conf->router;
833
834 nxt_queue_add(&router->sockets, &tmcf->keeping);
835 nxt_queue_add(&router->sockets, &tmcf->deleting);
836
837 // TODO: new engines and threads
838
839 nxt_mp_destroy(tmcf->conf->mem_pool);
840
841 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
842}
843
844
845static void
846nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
847 nxt_port_msg_type_t type)
848{
849 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
850}
851
852
853static nxt_conf_map_t nxt_router_conf[] = {
854 {
855 nxt_string("listeners_threads"),
856 NXT_CONF_MAP_INT32,
857 offsetof(nxt_router_conf_t, threads),
858 },
859};
860
861
862static nxt_conf_map_t nxt_router_app_conf[] = {
863 {
864 nxt_string("type"),
865 NXT_CONF_MAP_STR,
866 offsetof(nxt_router_app_conf_t, type),
867 },
868
869 {
870 nxt_string("workers"),
871 NXT_CONF_MAP_INT32,
872 offsetof(nxt_router_app_conf_t, workers),
873 },
874
875 {
876 nxt_string("limits"),
877 NXT_CONF_MAP_PTR,
878 offsetof(nxt_router_app_conf_t, limits_value),
879 },
880};
881
882
883static nxt_conf_map_t nxt_router_app_limits_conf[] = {
884 {
885 nxt_string("timeout"),
886 NXT_CONF_MAP_MSEC,
887 offsetof(nxt_router_app_conf_t, timeout),
888 },
889
890 {
891 nxt_string("requests"),
892 NXT_CONF_MAP_INT32,
893 offsetof(nxt_router_app_conf_t, requests),
894 },
895};
896
897
898static nxt_conf_map_t nxt_router_listener_conf[] = {
899 {
900 nxt_string("application"),
901 NXT_CONF_MAP_STR,
902 offsetof(nxt_router_listener_conf_t, application),
903 },
904};
905
906
907static nxt_conf_map_t nxt_router_http_conf[] = {
908 {
909 nxt_string("header_buffer_size"),
910 NXT_CONF_MAP_SIZE,
911 offsetof(nxt_socket_conf_t, header_buffer_size),
912 },
913
914 {
915 nxt_string("large_header_buffer_size"),
916 NXT_CONF_MAP_SIZE,
917 offsetof(nxt_socket_conf_t, large_header_buffer_size),
918 },
919
920 {
921 nxt_string("large_header_buffers"),
922 NXT_CONF_MAP_SIZE,
923 offsetof(nxt_socket_conf_t, large_header_buffers),
924 },
925
926 {
927 nxt_string("body_buffer_size"),
928 NXT_CONF_MAP_SIZE,
929 offsetof(nxt_socket_conf_t, body_buffer_size),
930 },
931
932 {
933 nxt_string("max_body_size"),
934 NXT_CONF_MAP_SIZE,
935 offsetof(nxt_socket_conf_t, max_body_size),
936 },
937
938 {
939 nxt_string("header_read_timeout"),
940 NXT_CONF_MAP_MSEC,
941 offsetof(nxt_socket_conf_t, header_read_timeout),
942 },
943
944 {
945 nxt_string("body_read_timeout"),
946 NXT_CONF_MAP_MSEC,
947 offsetof(nxt_socket_conf_t, body_read_timeout),
948 },
949};
950
951
952static nxt_int_t
953nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
954 u_char *start, u_char *end)
955{
956 u_char *p;
957 size_t size;
958 nxt_mp_t *mp;
959 uint32_t next;
960 nxt_int_t ret;
961 nxt_str_t name;
962 nxt_app_t *app, *prev;
963 nxt_router_t *router;
964 nxt_conf_value_t *conf, *http;
965 nxt_conf_value_t *applications, *application;
966 nxt_conf_value_t *listeners, *listener;
967 nxt_socket_conf_t *skcf;
968 nxt_app_lang_module_t *lang;
969 nxt_router_app_conf_t apcf;
970 nxt_router_listener_conf_t lscf;
971
972 static nxt_str_t http_path = nxt_string("/http");
973 static nxt_str_t applications_path = nxt_string("/applications");
974 static nxt_str_t listeners_path = nxt_string("/listeners");
975
976 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
977 if (conf == NULL) {
978 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
979 return NXT_ERROR;
980 }
981
982 mp = tmcf->conf->mem_pool;
983
984 ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
985 nxt_nitems(nxt_router_conf), tmcf->conf);
986 if (ret != NXT_OK) {
987 nxt_log(task, NXT_LOG_CRIT, "root map error");
988 return NXT_ERROR;
989 }
990
991 if (tmcf->conf->threads == 0) {
992 tmcf->conf->threads = nxt_ncpu;
993 }
994
995 applications = nxt_conf_get_path(conf, &applications_path);
996 if (applications == NULL) {
997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
998 return NXT_ERROR;
999 }
1000
1001 router = tmcf->conf->router;
1002
1003 next = 0;
1004
1005 for ( ;; ) {
1006 application = nxt_conf_next_object_member(applications, &name, &next);
1007 if (application == NULL) {
1008 break;
1009 }
1010
1011 nxt_debug(task, "application \"%V\"", &name);
1012
1013 size = nxt_conf_json_length(application, NULL);
1014
1015 app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1016 if (app == NULL) {
1017 goto fail;
1018 }
1019
1020 nxt_memzero(app, sizeof(nxt_app_t));
1021
1022 app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1023 app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1024
1025 p = nxt_conf_json_print(app->conf.start, application, NULL);
1026 app->conf.length = p - app->conf.start;
1027
1028 nxt_assert(app->conf.length <= size);
1029
1030 nxt_debug(task, "application conf \"%V\"", &app->conf);
1031
1032 prev = nxt_router_app_find(&router->apps, &name);
1033
1034 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1035 nxt_free(app);
1036
1037 nxt_queue_remove(&prev->link);
1038 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1039 continue;
1040 }
1041
1042 apcf.workers = 1;
1043 apcf.timeout = 0;
1044 apcf.requests = 0;
1045 apcf.limits_value = NULL;
1046
1047 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1048 nxt_nitems(nxt_router_app_conf), &apcf);
1049 if (ret != NXT_OK) {
1050 nxt_log(task, NXT_LOG_CRIT, "application map error");
1051 goto app_fail;
1052 }
1053
1054 if (apcf.limits_value != NULL) {
1055
1056 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1057 nxt_log(task, NXT_LOG_CRIT, "application limits is not object");
1058 goto app_fail;
1059 }
1060
1061 ret = nxt_conf_map_object(mp, apcf.limits_value,
1062 nxt_router_app_limits_conf,
1063 nxt_nitems(nxt_router_app_limits_conf),
1064 &apcf);
1065 if (ret != NXT_OK) {
1066 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
1067 goto app_fail;
1068 }
1069 }
1070
1071 nxt_debug(task, "application type: %V", &apcf.type);
1072 nxt_debug(task, "application workers: %D", apcf.workers);
1073 nxt_debug(task, "application timeout: %D", apcf.timeout);
1074 nxt_debug(task, "application requests: %D", apcf.requests);
1075
1076 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1077
1078 if (lang == NULL) {
1079 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
1080 &apcf.type);
1081 goto app_fail;
1082 }
1083
1084 nxt_debug(task, "application language module: \"%s\"", lang->file);
1085
1086 ret = nxt_thread_mutex_create(&app->mutex);
1087 if (ret != NXT_OK) {
1088 goto app_fail;
1089 }
1090
1091 nxt_queue_init(&app->ports);
1092 nxt_queue_init(&app->requests);
1093
1094 app->name.length = name.length;
1095 nxt_memcpy(app->name.start, name.start, name.length);
1096
1097 app->type = lang->type;
1098 app->max_workers = apcf.workers;
1099 app->timeout = apcf.timeout;
1100 app->live = 1;
1101 app->max_pending_responses = 2;
1102 app->prepare_msg = nxt_app_prepare_msg[lang->type];
1103
1104 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1105
1106 nxt_router_app_use(task, app, 1);
1107 }
1108
1109 http = nxt_conf_get_path(conf, &http_path);
1110#if 0
1111 if (http == NULL) {
1112 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
1113 return NXT_ERROR;
1114 }
1115#endif
1116
1117 listeners = nxt_conf_get_path(conf, &listeners_path);
1118 if (listeners == NULL) {
1119 nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
1120 return NXT_ERROR;
1121 }
1122
1123 next = 0;
1124
1125 for ( ;; ) {
1126 listener = nxt_conf_next_object_member(listeners, &name, &next);
1127 if (listener == NULL) {
1128 break;
1129 }
1130
1131 skcf = nxt_router_socket_conf(task, tmcf, &name);
1132 if (skcf == NULL) {
1133 goto fail;
1134 }
1135
1136 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1137 nxt_nitems(nxt_router_listener_conf), &lscf);
1138 if (ret != NXT_OK) {
1139 nxt_log(task, NXT_LOG_CRIT, "listener map error");
1140 goto fail;
1141 }
1142
1143 nxt_debug(task, "application: %V", &lscf.application);
1144
1145 // STUB, default values if http block is not defined.
1146 skcf->header_buffer_size = 2048;
1147 skcf->large_header_buffer_size = 8192;
1148 skcf->large_header_buffers = 4;
1149 skcf->body_buffer_size = 16 * 1024;
1150 skcf->max_body_size = 2 * 1024 * 1024;
1151 skcf->header_read_timeout = 5000;
1152 skcf->body_read_timeout = 5000;
1153
1154 if (http != NULL) {
1155 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1156 nxt_nitems(nxt_router_http_conf), skcf);
1157 if (ret != NXT_OK) {
1158 nxt_log(task, NXT_LOG_CRIT, "http map error");
1159 goto fail;
1160 }
1161 }
1162
1163 skcf->listen->handler = nxt_router_conn_init;
1164 skcf->router_conf = tmcf->conf;
1165 skcf->router_conf->count++;
1166 skcf->application = nxt_router_listener_application(tmcf,
1167 &lscf.application);
1168 }
1169
1170 nxt_queue_add(&tmcf->deleting, &router->sockets);
1171 nxt_queue_init(&router->sockets);
1172
1173 return NXT_OK;
1174
1175app_fail:
1176
1177 nxt_free(app);
1178
1179fail:
1180
1181 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1182
1183 nxt_queue_remove(&app->link);
1184 nxt_thread_mutex_destroy(&app->mutex);
1185 nxt_free(app);
1186
1187 } nxt_queue_loop;
1188
1189 return NXT_ERROR;
1190}
1191
1192
1193static nxt_app_t *
1194nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1195{
1196 nxt_app_t *app;
1197
1198 nxt_queue_each(app, queue, nxt_app_t, link) {
1199
1200 if (nxt_strstr_eq(name, &app->name)) {
1201 return app;
1202 }
1203
1204 } nxt_queue_loop;
1205
1206 return NULL;
1207}
1208
1209
1210static nxt_app_t *
1211nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1212{
1213 nxt_app_t *app;
1214
1215 app = nxt_router_app_find(&tmcf->apps, name);
1216
1217 if (app == NULL) {
1218 app = nxt_router_app_find(&tmcf->previous, name);
1219 }
1220
1221 return app;
1222}
1223
1224
1225static nxt_socket_conf_t *
1226nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1227 nxt_str_t *name)
1228{
1229 size_t size;
1230 nxt_int_t ret;
1231 nxt_bool_t wildcard;
1232 nxt_sockaddr_t *sa;
1233 nxt_socket_conf_t *skcf;
1234 nxt_listen_socket_t *ls;
1235
1236 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1237 if (nxt_slow_path(sa == NULL)) {
1238 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name);
1239 return NULL;
1240 }
1241
1242 sa->type = SOCK_STREAM;
1243
1244 nxt_debug(task, "router listener: \"%*s\"",
1245 sa->length, nxt_sockaddr_start(sa));
1246
1247 skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
1248 if (nxt_slow_path(skcf == NULL)) {
1249 return NULL;
1250 }
1251
1252 size = nxt_sockaddr_size(sa);
1253
1254 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1255
1256 if (ret != NXT_OK) {
1257
1258 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1259 if (nxt_slow_path(ls == NULL)) {
1260 return NULL;
1261 }
1262
1263 skcf->listen = ls;
1264
1265 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1266 nxt_memcpy(ls->sockaddr, sa, size);
1267
1268 nxt_listen_socket_remote_size(ls);
1269
1270 ls->socket = -1;
1271 ls->backlog = NXT_LISTEN_BACKLOG;
1272 ls->flags = NXT_NONBLOCK;
1273 ls->read_after_accept = 1;
1274 }
1275
1276 switch (sa->u.sockaddr.sa_family) {
1277#if (NXT_HAVE_UNIX_DOMAIN)
1278 case AF_UNIX:
1279 wildcard = 0;
1280 break;
1281#endif
1282#if (NXT_INET6)
1283 case AF_INET6:
1284 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1285 break;
1286#endif
1287 case AF_INET:
1288 default:
1289 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1290 break;
1291 }
1292
1293 if (!wildcard) {
1294 skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
1295 if (nxt_slow_path(skcf->sockaddr == NULL)) {
1296 return NULL;
1297 }
1298
1299 nxt_memcpy(skcf->sockaddr, sa, size);
1300 }
1301
1302 return skcf;
1303}
1304
1305
1306static nxt_int_t
1307nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1308 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1309{
1310 nxt_router_t *router;
1311 nxt_queue_link_t *qlk;
1312 nxt_socket_conf_t *skcf;
1313
1314 router = tmcf->conf->router;
1315
1316 for (qlk = nxt_queue_first(&router->sockets);
1317 qlk != nxt_queue_tail(&router->sockets);
1318 qlk = nxt_queue_next(qlk))
1319 {
1320 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1321
1322 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1323 nskcf->listen = skcf->listen;
1324
1325 nxt_queue_remove(qlk);
1326 nxt_queue_insert_tail(&tmcf->keeping, qlk);
1327
1328 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1329
1330 return NXT_OK;
1331 }
1332 }
1333
1334 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1335
1336 return NXT_DECLINED;
1337}
1338
1339
1340static void
1341nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1342 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1343{
1344 size_t size;
1345 uint32_t stream;
1346 nxt_buf_t *b;
1347 nxt_port_t *main_port, *router_port;
1348 nxt_runtime_t *rt;
1349 nxt_socket_rpc_t *rpc;
1350
1351 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1352 if (rpc == NULL) {
1353 goto fail;
1354 }
1355
1356 rpc->socket_conf = skcf;
1357 rpc->temp_conf = tmcf;
1358
1359 size = nxt_sockaddr_size(skcf->listen->sockaddr);
1360
1361 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1362 if (b == NULL) {
1363 goto fail;
1364 }
1365
1366 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1367
1368 rt = task->thread->runtime;
1369 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1370 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1371
1372 stream = nxt_port_rpc_register_handler(task, router_port,
1373 nxt_router_listen_socket_ready,
1374 nxt_router_listen_socket_error,
1375 main_port->pid, rpc);
1376 if (stream == 0) {
1377 goto fail;
1378 }
1379
1380 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1381 stream, router_port->id, b);
1382
1383 return;
1384
1385fail:
1386
1387 nxt_router_conf_error(task, tmcf);
1388}
1389
1390
1391static void
1392nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1393 void *data)
1394{
1395 nxt_int_t ret;
1396 nxt_socket_t s;
1397 nxt_socket_rpc_t *rpc;
1398
1399 rpc = data;
1400
1401 s = msg->fd;
1402
1403 ret = nxt_socket_nonblocking(task, s);
1404 if (nxt_slow_path(ret != NXT_OK)) {
1405 goto fail;
1406 }
1407
1408 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1409
1410 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1411 if (nxt_slow_path(ret != NXT_OK)) {
1412 goto fail;
1413 }
1414
1415 rpc->socket_conf->listen->socket = s;
1416
1417 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1418 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1419
1420 return;
1421
1422fail:
1423
1424 nxt_socket_close(task, s);
1425
1426 nxt_router_conf_error(task, rpc->temp_conf);
1427}
1428
1429
1430static void
1431nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1432 void *data)
1433{
1434 u_char *p;
1435 size_t size;
1436 uint8_t error;
1437 nxt_buf_t *in, *out;
1438 nxt_sockaddr_t *sa;
1439 nxt_socket_rpc_t *rpc;
1440 nxt_router_temp_conf_t *tmcf;
1441
1442 static nxt_str_t socket_errors[] = {
1443 nxt_string("ListenerSystem"),
1444 nxt_string("ListenerNoIPv6"),
1445 nxt_string("ListenerPort"),
1446 nxt_string("ListenerInUse"),
1447 nxt_string("ListenerNoAddress"),
1448 nxt_string("ListenerNoAccess"),
1449 nxt_string("ListenerPath"),
1450 };
1451
1452 rpc = data;
1453 sa = rpc->socket_conf->listen->sockaddr;
1454 tmcf = rpc->temp_conf;
1455
1456 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1457
1458 nxt_assert(in != NULL);
1459
1460 p = in->mem.pos;
1461
1462 error = *p++;
1463
1464 size = sizeof("listen socket error: ") - 1
1465 + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
1466 + sa->length + socket_errors[error].length + (in->mem.free - p);
1467
1468 out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1469 if (nxt_slow_path(out == NULL)) {
1470 return;
1471 }
1472
1473 out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1474 "listen socket error: "
1475 "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1476 sa->length, nxt_sockaddr_start(sa),
1477 &socket_errors[error], in->mem.free - p, p);
1478
1479 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1480
1481 nxt_router_conf_error(task, tmcf);
1482}
1483
1484
1485static nxt_int_t
1486nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1487 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1488{
1489 nxt_int_t ret;
1490 nxt_uint_t n, threads;
1491 nxt_queue_link_t *qlk;
1492 nxt_router_engine_conf_t *recf;
1493
1494 threads = tmcf->conf->threads;
1495
1496 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
1497 sizeof(nxt_router_engine_conf_t));
1498 if (nxt_slow_path(tmcf->engines == NULL)) {
1499 return NXT_ERROR;
1500 }
1501
1502 n = 0;
1503
1504 for (qlk = nxt_queue_first(&router->engines);
1505 qlk != nxt_queue_tail(&router->engines);
1506 qlk = nxt_queue_next(qlk))
1507 {
1508 recf = nxt_array_zero_add(tmcf->engines);
1509 if (nxt_slow_path(recf == NULL)) {
1510 return NXT_ERROR;
1511 }
1512
1513 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
1514
1515 if (n < threads) {
1516 recf->action = NXT_ROUTER_ENGINE_KEEP;
1517 ret = nxt_router_engine_conf_update(tmcf, recf);
1518
1519 } else {
1520 recf->action = NXT_ROUTER_ENGINE_DELETE;
1521 ret = nxt_router_engine_conf_delete(tmcf, recf);
1522 }
1523
1524 if (nxt_slow_path(ret != NXT_OK)) {
1525 return ret;
1526 }
1527
1528 n++;
1529 }
1530
1531 tmcf->new_threads = n;
1532
1533 while (n < threads) {
1534 recf = nxt_array_zero_add(tmcf->engines);
1535 if (nxt_slow_path(recf == NULL)) {
1536 return NXT_ERROR;
1537 }
1538
1539 recf->action = NXT_ROUTER_ENGINE_ADD;
1540
1541 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
1542 if (nxt_slow_path(recf->engine == NULL)) {
1543 return NXT_ERROR;
1544 }
1545
1546 ret = nxt_router_engine_conf_create(tmcf, recf);
1547 if (nxt_slow_path(ret != NXT_OK)) {
1548 return ret;
1549 }
1550
1551 n++;
1552 }
1553
1554 return NXT_OK;
1555}
1556
1557
1558static nxt_int_t
1559nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1560 nxt_router_engine_conf_t *recf)
1561{
1562 nxt_int_t ret;
1563
1564 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1565 nxt_router_listen_socket_create);
1566 if (nxt_slow_path(ret != NXT_OK)) {
1567 return ret;
1568 }
1569
1570 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1571 nxt_router_listen_socket_create);
1572 if (nxt_slow_path(ret != NXT_OK)) {
1573 return ret;
1574 }
1575
1576 return ret;
1577}
1578
1579
1580static nxt_int_t
1581nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1582 nxt_router_engine_conf_t *recf)
1583{
1584 nxt_int_t ret;
1585
1586 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1587 nxt_router_listen_socket_create);
1588 if (nxt_slow_path(ret != NXT_OK)) {
1589 return ret;
1590 }
1591
1592 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1593 nxt_router_listen_socket_update);
1594 if (nxt_slow_path(ret != NXT_OK)) {
1595 return ret;
1596 }
1597
1598 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1599 if (nxt_slow_path(ret != NXT_OK)) {
1600 return ret;
1601 }
1602
1603 return ret;
1604}
1605
1606
1607static nxt_int_t
1608nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1609 nxt_router_engine_conf_t *recf)
1610{
1611 nxt_int_t ret;
1612
1613 ret = nxt_router_engine_quit(tmcf, recf);
1614 if (nxt_slow_path(ret != NXT_OK)) {
1615 return ret;
1616 }
1617
1618 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
1619 if (nxt_slow_path(ret != NXT_OK)) {
1620 return ret;
1621 }
1622
1623 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1624}
1625
1626
1627static nxt_int_t
1628nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
1629 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
1630 nxt_work_handler_t handler)
1631{
1632 nxt_joint_job_t *job;
1633 nxt_queue_link_t *qlk;
1634 nxt_socket_conf_t *skcf;
1635 nxt_socket_conf_joint_t *joint;
1636
1637 for (qlk = nxt_queue_first(sockets);
1638 qlk != nxt_queue_tail(sockets);
1639 qlk = nxt_queue_next(qlk))
1640 {
1641 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1642 if (nxt_slow_path(job == NULL)) {
1643 return NXT_ERROR;
1644 }
1645
1646 job->work.next = recf->jobs;
1647 recf->jobs = &job->work;
1648
1649 job->task = tmcf->engine->task;
1650 job->work.handler = handler;
1651 job->work.task = &job->task;
1652 job->work.obj = job;
1653 job->tmcf = tmcf;
1654
1655 tmcf->count++;
1656
1657 joint = nxt_mp_alloc(tmcf->conf->mem_pool,
1658 sizeof(nxt_socket_conf_joint_t));
1659 if (nxt_slow_path(joint == NULL)) {
1660 return NXT_ERROR;
1661 }
1662
1663 job->work.data = joint;
1664
1665 joint->count = 1;
1666
1667 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1668 skcf->count++;
1669 joint->socket_conf = skcf;
1670
1671 joint->engine = recf->engine;
1672 }
1673
1674 return NXT_OK;
1675}
1676
1677
1678static nxt_int_t
1679nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
1680 nxt_router_engine_conf_t *recf)
1681{
1682 nxt_joint_job_t *job;
1683
1684 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1685 if (nxt_slow_path(job == NULL)) {
1686 return NXT_ERROR;
1687 }
1688
1689 job->work.next = recf->jobs;
1690 recf->jobs = &job->work;
1691
1692 job->task = tmcf->engine->task;
1693 job->work.handler = nxt_router_worker_thread_quit;
1694 job->work.task = &job->task;
1695 job->work.obj = NULL;
1696 job->work.data = NULL;
1697 job->tmcf = NULL;
1698
1699 return NXT_OK;
1700}
1701
1702
1703static nxt_int_t
1704nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
1705 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
1706{
1707 nxt_joint_job_t *job;
1708 nxt_queue_link_t *qlk;
1709
1710 for (qlk = nxt_queue_first(sockets);
1711 qlk != nxt_queue_tail(sockets);
1712 qlk = nxt_queue_next(qlk))
1713 {
1714 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1715 if (nxt_slow_path(job == NULL)) {
1716 return NXT_ERROR;
1717 }
1718
1719 job->work.next = recf->jobs;
1720 recf->jobs = &job->work;
1721
1722 job->task = tmcf->engine->task;
1723 job->work.handler = nxt_router_listen_socket_delete;
1724 job->work.task = &job->task;
1725 job->work.obj = job;
1726 job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1727 job->tmcf = tmcf;
1728
1729 tmcf->count++;
1730 }
1731
1732 return NXT_OK;
1733}
1734
1735
1736static nxt_int_t
1737nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
1738 nxt_router_temp_conf_t *tmcf)
1739{
1740 nxt_int_t ret;
1741 nxt_uint_t i, threads;
1742 nxt_router_engine_conf_t *recf;
1743
1744 recf = tmcf->engines->elts;
1745 threads = tmcf->conf->threads;
1746
1747 for (i = tmcf->new_threads; i < threads; i++) {
1748 ret = nxt_router_thread_create(task, rt, recf[i].engine);
1749 if (nxt_slow_path(ret != NXT_OK)) {
1750 return ret;
1751 }
1752 }
1753
1754 return NXT_OK;
1755}
1756
1757
1758static nxt_int_t
1759nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
1760 nxt_event_engine_t *engine)
1761{
1762 nxt_int_t ret;
1763 nxt_thread_link_t *link;
1764 nxt_thread_handle_t handle;
1765
1766 link = nxt_zalloc(sizeof(nxt_thread_link_t));
1767
1768 if (nxt_slow_path(link == NULL)) {
1769 return NXT_ERROR;
1770 }
1771
1772 link->start = nxt_router_thread_start;
1773 link->engine = engine;
1774 link->work.handler = nxt_router_thread_exit_handler;
1775 link->work.task = task;
1776 link->work.data = link;
1777
1778 nxt_queue_insert_tail(&rt->engines, &engine->link);
1779
1780 ret = nxt_thread_create(&handle, link);
1781
1782 if (nxt_slow_path(ret != NXT_OK)) {
1783 nxt_queue_remove(&engine->link);
1784 }
1785
1786 return ret;
1787}
1788
1789
1790static void
1791nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
1792 nxt_router_temp_conf_t *tmcf)
1793{
1794 nxt_app_t *app;
1795 nxt_port_t *port;
1796
1797 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1798
1799 nxt_queue_remove(&app->link);
1800
1801 nxt_debug(task, "about to free app '%V' %p", &app->name, app);
1802
1803 app->live = 0;
1804
1805 do {
1806 port = nxt_router_app_get_idle_port(app);
1807 if (port == NULL) {
1808 break;
1809 }
1810
1811 nxt_debug(task, "port %p send quit", port);
1812
1813 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
1814 NULL);
1815
1816 nxt_port_use(task, port, -1);
1817 } while (1);
1818
1819 nxt_router_app_use(task, app, -1);
1820
1821 } nxt_queue_loop;
1822
1823 nxt_queue_add(&router->apps, &tmcf->previous);
1824 nxt_queue_add(&router->apps, &tmcf->apps);
1825}
1826
1827
1828static void
1829nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1830{
1831 nxt_uint_t n;
1832 nxt_event_engine_t *engine;
1833 nxt_router_engine_conf_t *recf;
1834
1835 recf = tmcf->engines->elts;
1836
1837 for (n = tmcf->engines->nelts; n != 0; n--) {
1838 engine = recf->engine;
1839
1840 switch (recf->action) {
1841
1842 case NXT_ROUTER_ENGINE_KEEP:
1843 break;
1844
1845 case NXT_ROUTER_ENGINE_ADD:
1846 nxt_queue_insert_tail(&router->engines, &engine->link0);
1847 break;
1848
1849 case NXT_ROUTER_ENGINE_DELETE:
1850 nxt_queue_remove(&engine->link0);
1851 break;
1852 }
1853
1854 nxt_router_engine_post(engine, recf->jobs);
1855
1856 recf++;
1857 }
1858}
1859
1860
1861static void
1862nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
1863{
1864 nxt_work_t *work, *next;
1865
1866 for (work = jobs; work != NULL; work = next) {
1867 next = work->next;
1868 work->next = NULL;
1869
1870 nxt_event_engine_post(engine, work);
1871 }
1872}
1873
1874
1875static nxt_port_handlers_t nxt_router_app_port_handlers = {
1876 .mmap = nxt_port_mmap_handler,
1877 .data = nxt_port_rpc_handler,
1878};
1879
1880
1881static void
1882nxt_router_thread_start(void *data)
1883{
1884 nxt_int_t ret;
1885 nxt_port_t *port;
1886 nxt_task_t *task;
1887 nxt_thread_t *thread;
1888 nxt_thread_link_t *link;
1889 nxt_event_engine_t *engine;
1890
1891 link = data;
1892 engine = link->engine;
1893 task = &engine->task;
1894
1895 thread = nxt_thread();
1896
1897 nxt_event_engine_thread_adopt(engine);
1898
1899 /* STUB */
1900 thread->runtime = engine->task.thread->runtime;
1901
1902 engine->task.thread = thread;
1903 engine->task.log = thread->log;
1904 thread->engine = engine;
1905 thread->task = &engine->task;
1906#if 0
1907 thread->fiber = &engine->fibers->fiber;
1908#endif
1909
1910 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
1911 if (nxt_slow_path(engine->mem_pool == NULL)) {
1912 return;
1913 }
1914
1915 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
1916 NXT_PROCESS_ROUTER);
1917 if (nxt_slow_path(port == NULL)) {
1918 return;
1919 }
1920
1921 ret = nxt_port_socket_init(task, port, 0);
1922 if (nxt_slow_path(ret != NXT_OK)) {
1923 nxt_port_use(task, port, -1);
1924 return;
1925 }
1926
1927 engine->port = port;
1928
1929 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
1930
1931 nxt_event_engine_start(engine);
1932}
1933
1934
1935static void
1936nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1937{
1938 nxt_joint_job_t *job;
1939 nxt_socket_conf_t *skcf;
1940 nxt_listen_event_t *lev;
1941 nxt_listen_socket_t *ls;
1942 nxt_thread_spinlock_t *lock;
1943 nxt_socket_conf_joint_t *joint;
1944
1945 job = obj;
1946 joint = data;
1947
1948 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1949
1950 skcf = joint->socket_conf;
1951 ls = skcf->listen;
1952
1953 lev = nxt_listen_event(task, ls);
1954 if (nxt_slow_path(lev == NULL)) {
1955 nxt_router_listen_socket_release(task, skcf);
1956 return;
1957 }
1958
1959 lev->socket.data = joint;
1960
1961 lock = &skcf->router_conf->router->lock;
1962
1963 nxt_thread_spin_lock(lock);
1964 ls->count++;
1965 nxt_thread_spin_unlock(lock);
1966
1967 job->work.next = NULL;
1968 job->work.handler = nxt_router_conf_wait;
1969
1970 nxt_event_engine_post(job->tmcf->engine, &job->work);
1971}
1972
1973
1974nxt_inline nxt_listen_event_t *
1975nxt_router_listen_event(nxt_queue_t *listen_connections,
1976 nxt_socket_conf_t *skcf)
1977{
1978 nxt_socket_t fd;
1979 nxt_queue_link_t *qlk;
1980 nxt_listen_event_t *lev;
1981
1982 fd = skcf->listen->socket;
1983
1984 for (qlk = nxt_queue_first(listen_connections);
1985 qlk != nxt_queue_tail(listen_connections);
1986 qlk = nxt_queue_next(qlk))
1987 {
1988 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1989
1990 if (fd == lev->socket.fd) {
1991 return lev;
1992 }
1993 }
1994
1995 return NULL;
1996}
1997
1998
1999static void
2000nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2001{
2002 nxt_joint_job_t *job;
2003 nxt_event_engine_t *engine;
2004 nxt_listen_event_t *lev;
2005 nxt_socket_conf_joint_t *joint, *old;
2006
2007 job = obj;
2008 joint = data;
2009
2010 engine = task->thread->engine;
2011
2012 nxt_queue_insert_tail(&engine->joints, &joint->link);
2013
2014 lev = nxt_router_listen_event(&engine->listen_connections,
2015 joint->socket_conf);
2016
2017 old = lev->socket.data;
2018 lev->socket.data = joint;
2019 lev->listen = joint->socket_conf->listen;
2020
2021 job->work.next = NULL;
2022 job->work.handler = nxt_router_conf_wait;
2023
2024 nxt_event_engine_post(job->tmcf->engine, &job->work);
2025
2026 /*
2027 * The task is allocated from configuration temporary
2028 * memory pool so it can be freed after engine post operation.
2029 */
2030
2031 nxt_router_conf_release(&engine->task, old);
2032}
2033
2034
2035static void
2036nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2037{
2038 nxt_joint_job_t *job;
2039 nxt_socket_conf_t *skcf;
2040 nxt_listen_event_t *lev;
2041 nxt_event_engine_t *engine;
2042
2043 job = obj;
2044 skcf = data;
2045
2046 engine = task->thread->engine;
2047
2048 lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2049
2050 nxt_fd_event_delete(engine, &lev->socket);
2051
2052 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2053 lev->socket.fd);
2054
2055 lev->timer.handler = nxt_router_listen_socket_close;
2056 lev->timer.work_queue = &engine->fast_work_queue;
2057
2058 nxt_timer_add(engine, &lev->timer, 0);
2059
2060 job->work.next = NULL;
2061 job->work.handler = nxt_router_conf_wait;
2062
2063 nxt_event_engine_post(job->tmcf->engine, &job->work);
2064}
2065
2066
2067static void
2068nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2069{
2070 nxt_event_engine_t *engine;
2071
2072 nxt_debug(task, "router worker thread quit");
2073
2074 engine = task->thread->engine;
2075
2076 engine->shutdown = 1;
2077
2078 if (nxt_queue_is_empty(&engine->joints)) {
2079 nxt_thread_exit(task->thread);
2080 }
2081}
2082
2083
2084static void
2085nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2086{
2087 nxt_timer_t *timer;
2088 nxt_listen_event_t *lev;
2089 nxt_socket_conf_joint_t *joint;
2090
2091 timer = obj;
2092 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2093 joint = lev->socket.data;
2094
2095 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2096 lev->socket.fd);
2097
2098 nxt_queue_remove(&lev->link);
2099
2100 /* 'task' refers to lev->task and we cannot use after nxt_free() */
2101 task = &task->thread->engine->task;
2102
2103 nxt_router_listen_socket_release(task, joint->socket_conf);
2104
2105 nxt_free(lev);
2106
2107 nxt_router_conf_release(task, joint);
2108}
2109
2110
2111static void
2112nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2113{
2114 nxt_listen_socket_t *ls;
2115 nxt_thread_spinlock_t *lock;
2116
2117 ls = skcf->listen;
2118 lock = &skcf->router_conf->router->lock;
2119
2120 nxt_thread_spin_lock(lock);
2121
2122 nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2123 task->thread->engine, ls->count);
2124
2125 if (--ls->count != 0) {
2126 ls = NULL;
2127 }
2128
2129 nxt_thread_spin_unlock(lock);
2130
2131 if (ls != NULL) {
2132 nxt_socket_close(task, ls->socket);
2133 nxt_free(ls);
2134 }
2135}
2136
2137
2138static void
2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2140{
2141 nxt_bool_t exit;
2142 nxt_socket_conf_t *skcf;
2143 nxt_router_conf_t *rtcf;
2144 nxt_event_engine_t *engine;
2145 nxt_thread_spinlock_t *lock;
2146
2147 nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2148
2149 if (--joint->count != 0) {
2150 return;
2151 }
2152
2153 nxt_queue_remove(&joint->link);
2154
2155 skcf = joint->socket_conf;
2156 rtcf = skcf->router_conf;
2157 lock = &rtcf->router->lock;
2158
2159 nxt_thread_spin_lock(lock);
2160
2161 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2162 rtcf, rtcf->count);
2163
2164 if (--skcf->count != 0) {
2165 rtcf = NULL;
2166
2167 } else {
2168 nxt_queue_remove(&skcf->link);
2169
2170 if (--rtcf->count != 0) {
2171 rtcf = NULL;
2172 }
2173 }
2174
2175 nxt_thread_spin_unlock(lock);
2176
2177 /* TODO remove engine->port */
2178 /* TODO excude from connected ports */
2179
2180 /* The joint content can be used before memory pool destruction. */
2181 engine = joint->engine;
2182 exit = (engine->shutdown && nxt_queue_is_empty(&engine->joints));
2183
2184 if (rtcf != NULL) {
2185 nxt_debug(task, "old router conf is destroyed");
2186
2187 nxt_mp_thread_adopt(rtcf->mem_pool);
2188
2189 nxt_mp_destroy(rtcf->mem_pool);
2190 }
2191
2192 if (exit) {
2193 nxt_thread_exit(task->thread);
2194 }
2195}
2196
2197
2198static void
2199nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
2200{
2201 nxt_port_t *port;
2202 nxt_thread_link_t *link;
2203 nxt_event_engine_t *engine;
2204 nxt_thread_handle_t handle;
2205
2206 handle = (nxt_thread_handle_t) obj;
2207 link = data;
2208
2209 nxt_thread_wait(handle);
2210
2211 engine = link->engine;
2212
2213 nxt_queue_remove(&engine->link);
2214
2215 port = engine->port;
2216
2217 // TODO notify all apps
2218
2219 port->engine = task->thread->engine;
2220 nxt_mp_thread_adopt(port->mem_pool);
2221 nxt_port_use(task, port, -1);
2222
2223 nxt_mp_thread_adopt(engine->mem_pool);
2224 nxt_mp_destroy(engine->mem_pool);
2225
2226 nxt_event_engine_free(engine);
2227
2228 nxt_free(link);
2229}
2230
2231
2232static const nxt_conn_state_t nxt_router_conn_read_header_state
2233 nxt_aligned(64) =
2234{
2235 .ready_handler = nxt_router_conn_http_header_parse,
2236 .close_handler = nxt_router_conn_close,
2237 .error_handler = nxt_router_conn_error,
2238
2239 .timer_handler = nxt_router_conn_timeout,
2240 .timer_value = nxt_router_conn_timeout_value,
2241 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
2242};
2243
2244
2245static const nxt_conn_state_t nxt_router_conn_read_body_state
2246 nxt_aligned(64) =
2247{
2248 .ready_handler = nxt_router_conn_http_body_read,
2249 .close_handler = nxt_router_conn_close,
2250 .error_handler = nxt_router_conn_error,
2251
2252 .timer_handler = nxt_router_conn_timeout,
2253 .timer_value = nxt_router_conn_timeout_value,
2254 .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
2255 .timer_autoreset = 1,
2256};
2257
2258
2259static void
2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
2261{
2262 size_t size;
2263 nxt_conn_t *c;
2264 nxt_socket_conf_t *skcf;
2265 nxt_event_engine_t *engine;
2266 nxt_socket_conf_joint_t *joint;
2267
2268 c = obj;
2269 joint = data;
2270
2271 nxt_debug(task, "router conn init");
2272
2273 c->joint = joint;
2274 joint->count++;
2275
2276 skcf = joint->socket_conf;
2277 c->local = skcf->sockaddr;
2278
2279 size = skcf->header_buffer_size;
2280 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2281
2282 c->socket.data = NULL;
2283
2284 engine = task->thread->engine;
2285 c->read_work_queue = &engine->fast_work_queue;
2286 c->write_work_queue = &engine->fast_work_queue;
2287
2288 c->read_state = &nxt_router_conn_read_header_state;
2289
2290 nxt_conn_read(engine, c);
2291}
2292
2293
2294static const nxt_conn_state_t nxt_router_conn_write_state
2295 nxt_aligned(64) =
2296{
2297 .ready_handler = nxt_router_conn_ready,
2298 .close_handler = nxt_router_conn_close,
2299 .error_handler = nxt_router_conn_error,
2300};
2301
2302
2303static void
2304nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2305 void *data)
2306{
2307 size_t dump_size;
2308 nxt_buf_t *b, *last;
2309 nxt_conn_t *c;
2310 nxt_req_conn_link_t *rc;
2311
2312 b = msg->buf;
2313 rc = data;
2314
2315 c = rc->conn;
2316
2317 dump_size = nxt_buf_used_size(b);
2318
2319 if (dump_size > 300) {
2320 dump_size = 300;
2321 }
2322
2323 nxt_debug(task, "%srouter app data (%z): %*s",
2324 msg->port_msg.last ? "last " : "", msg->size, dump_size,
2325 b->mem.pos);
2326
2327 if (msg->size == 0) {
2328 b = NULL;
2329 }
2330
2331 if (msg->port_msg.last != 0) {
2332 nxt_debug(task, "router data create last buf");
2333
2334 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2335 if (nxt_slow_path(last == NULL)) {
2336 /* TODO pogorevaTb */
2337 }
2338
2339 nxt_buf_chain_add(&b, last);
2340
2341 nxt_router_rc_unlink(task, rc);
2342 }
2343
2344 if (b == NULL) {
2345 return;
2346 }
2347
2348 if (msg->buf == b) {
2349 /* Disable instant buffer completion/re-using by port. */
2350 msg->buf = NULL;
2351 }
2352
2353 if (c->write == NULL) {
2354 c->write = b;
2355 c->write_state = &nxt_router_conn_write_state;
2356
2357 nxt_conn_write(task->thread->engine, c);
2358
2359 } else {
2360 nxt_debug(task, "router data attach out bufs to existing chain");
2361
2362 nxt_buf_chain_add(&c->write, b);
2363 }
2364}
2365
2366
2367static void
2368nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2369 void *data)
2370{
2371 nxt_req_conn_link_t *rc;
2372
2373 rc = data;
2374
2375 nxt_router_gen_error(task, rc->conn, 500,
2376 "Application terminated unexpectedly");
2377
2378 nxt_router_rc_unlink(task, rc);
2379}
2380
2381
2382nxt_inline const char *
2383nxt_router_text_by_code(int code)
2384{
2385 switch (code) {
2386 case 400: return "Bad request";
2387 case 404: return "Not found";
2388 case 403: return "Forbidden";
2389 case 408: return "Request Timeout";
2390 case 411: return "Length Required";
2391 case 413: return "Request Entity Too Large";
2392 case 500:
2393 default: return "Internal server error";
2394 }
2395}
2396
2397
2398static nxt_buf_t *
2399nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
2400 const char* str)
2401{
2402 nxt_buf_t *b, *last;
2403
2404 b = nxt_buf_mem_alloc(mp, 16384, 0);
2405 if (nxt_slow_path(b == NULL)) {
2406 return NULL;
2407 }
2408
2409 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
2410 "HTTP/1.0 %d %s\r\n"
2411 "Content-Type: text/plain\r\n"
2412 "Connection: close\r\n\r\n",
2413 code, nxt_router_text_by_code(code));
2414
2415 b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str));
2416
2417 nxt_log_alert(task->log, "error %d: %s", code, str);
2418
2419 last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST);
2420
2421 if (nxt_slow_path(last == NULL)) {
2422 nxt_mp_free(mp, b);
2423 return NULL;
2424 }
2425
2426 nxt_buf_chain_add(&b, last);
2427
2428 return b;
2429}
2430
2431
2432
2433static void
2434nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2435 const char* str)
2436{
2437 nxt_mp_t *mp;
2438 nxt_buf_t *b;
2439
2440 /* TODO: fix when called in the middle of response */
2441
2442 mp = c->mem_pool;
2443
2444 b = nxt_router_get_error_buf(task, mp, code, str);
2445
2446 if (c->socket.fd == -1) {
2447 nxt_mp_free(mp, b->next);
2448 nxt_mp_free(mp, b);
2449 return;
2450 }
2451
2452 if (c->write == NULL) {
2453 c->write = b;
2454 c->write_state = &nxt_router_conn_write_state;
2455
2456 nxt_conn_write(task->thread->engine, c);
2457
2458 } else {
2459 nxt_debug(task, "router data attach out bufs to existing chain");
2460
2461 nxt_buf_chain_add(&c->write, b);
2462 }
2463}
2464
2465
2466static void
2467nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2468 void *data)
2469{
2470 nxt_app_t *app;
2471 nxt_port_t *port;
2472
2473 app = data;
2474 port = msg->u.new_port;
2475
2476 nxt_assert(app != NULL);
2477 nxt_assert(port != NULL);
2478
2479 port->app = app;
2480
2481 nxt_thread_mutex_lock(&app->mutex);
2482
2483 nxt_assert(app->pending_workers != 0);
2484
2485 app->pending_workers--;
2486 app->workers++;
2487
2488 nxt_thread_mutex_unlock(&app->mutex);
2489
2490 nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
2491
2492 nxt_router_app_port_release(task, port, 0, 0);
2493}
2494
2495
2496static void
2497nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2498 void *data)
2499{
2500 nxt_app_t *app;
2501 nxt_queue_link_t *lnk;
2502 nxt_req_app_link_t *ra;
2503
2504 app = data;
2505
2506 nxt_assert(app != NULL);
2507
2508 nxt_debug(task, "app '%V' %p start error", &app->name, app);
2509
2510 nxt_thread_mutex_lock(&app->mutex);
2511
2512 nxt_assert(app->pending_workers != 0);
2513
2514 app->pending_workers--;
2515
2516 if (!nxt_queue_is_empty(&app->requests)) {
2517 lnk = nxt_queue_last(&app->requests);
2518 nxt_queue_remove(lnk);
2519 lnk->next = NULL;
2520
2521 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2522
2523 } else {
2524 ra = NULL;
2525 }
2526
2527 nxt_thread_mutex_unlock(&app->mutex);
2528
2529 if (ra != NULL) {
2530 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2531 &app->name, app, ra->stream);
2532
2533 nxt_router_ra_abort(task, ra, ra->work.data);
2534 }
2535
2536 nxt_router_app_use(task, app, -1);
2537}
2538
2539
2540void
2541nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2542{
2543 int c;
2544
2545 c = nxt_atomic_fetch_add(&app->use_count, i);
2546
2547 if (i < 0 && c == -i) {
2548
2549 nxt_assert(app->live == 0);
2550 nxt_assert(app->workers == 0);
2551 nxt_assert(app->pending_workers == 0);
2552 nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
2553 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2554
2555 nxt_thread_mutex_destroy(&app->mutex);
2556 nxt_free(app);
2557 }
2558}
2559
2560
2561nxt_inline nxt_port_t *
2562nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
2563{
2564 nxt_port_t *port;
2565 nxt_queue_link_t *lnk;
2566
2567 lnk = nxt_queue_first(&app->ports);
2568 nxt_queue_remove(lnk);
2569
2570 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2571
2572 port->app_requests++;
2573
2574 if (app->live &&
2575 (app->max_pending_responses == 0 ||
2576 (port->app_requests - port->app_responses) <
2577 app->max_pending_responses) )
2578 {
2579 nxt_queue_insert_tail(&app->ports, lnk);
2580
2581 } else {
2582 lnk->next = NULL;
2583
2584 (*use_delta)--;
2585 }
2586
2587 return port;
2588}
2589
2590
2591static nxt_port_t *
2592nxt_router_app_get_idle_port(nxt_app_t *app)
2593{
2594 nxt_port_t *port;
2595
2596 port = NULL;
2597
2598 nxt_thread_mutex_lock(&app->mutex);
2599
2600 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2601
2602 if (port->app_requests > port->app_responses) {
2603 port = NULL;
2604
2605 continue;
2606 }
2607
2608 nxt_queue_remove(&port->app_link);
2609 port->app_link.next = NULL;
2610
2611 break;
2612
2613 } nxt_queue_loop;
2614
2615 nxt_thread_mutex_unlock(&app->mutex);
2616
2617 return port;
2618}
2619
2620
2621static void
2622nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
2623{
2624 nxt_app_t *app;
2625 nxt_req_app_link_t *ra;
2626
2627 app = obj;
2628 ra = data;
2629
2630 nxt_assert(app != NULL);
2631 nxt_assert(ra != NULL);
2632 nxt_assert(ra->app_port != NULL);
2633
2634 nxt_debug(task, "app '%V' %p process next stream #%uD",
2635 &app->name, app, ra->stream);
2636
2637 nxt_router_process_http_request_mp(task, ra);
2638}
2639
2640
2641static void
2642nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2643 uint32_t request_failed, uint32_t got_response)
2644{
2645 int use_delta, ra_use_delta;
2646 nxt_app_t *app;
2647 nxt_bool_t send_quit;
2648 nxt_queue_link_t *lnk;
2649 nxt_req_app_link_t *ra;
2650
2651 nxt_assert(port != NULL);
2652 nxt_assert(port->app != NULL);
2653
2654 app = port->app;
2655
2656 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
2657
2658 nxt_thread_mutex_lock(&app->mutex);
2659
2660 port->app_requests -= request_failed;
2661 port->app_responses += got_response;
2662
2663 if (app->live != 0 &&
2664 port->pair[1] != -1 &&
2665 port->app_link.next == NULL &&
2666 (app->max_pending_responses == 0 ||
2667 (port->app_requests - port->app_responses) <
2668 app->max_pending_responses) )
2669 {
2670 nxt_queue_insert_tail(&app->ports, &port->app_link);
2671 use_delta++;
2672 }
2673
2674 if (app->live != 0 &&
2675 !nxt_queue_is_empty(&app->ports) &&
2676 !nxt_queue_is_empty(&app->requests))
2677 {
2678 lnk = nxt_queue_first(&app->requests);
2679 nxt_queue_remove(lnk);
2680 lnk->next = NULL;
2681
2682 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2683
2684 ra_use_delta = 1;
2685 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
2686
2687 } else {
2688 ra = NULL;
2689 ra_use_delta = 0;
2690 }
2691
2692 send_quit = app->live == 0 && port->app_requests == port->app_responses;
2693
2694 nxt_thread_mutex_unlock(&app->mutex);
2695
2696 if (ra != NULL) {
2697 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2698 nxt_router_app_process_request,
2699 &task->thread->engine->task, app, ra);
2700
2701 goto adjust_use;
2702 }
2703
2704 /* ? */
2705 if (port->pair[1] == -1) {
2706 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
2707 &app->name, app, port, port->pid);
2708
2709 goto adjust_use;
2710 }
2711
2712 if (send_quit) {
2713 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
2714 &app->name, app);
2715
2716 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2717 -1, 0, 0, NULL);
2718
2719 goto adjust_use;
2720 }
2721
2722 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2723 &app->name, app);
2724
2725adjust_use:
2726
2727 if (use_delta != 0) {
2728 nxt_port_use(task, port, use_delta);
2729 }
2730
2731 if (ra_use_delta != 0) {
2732 nxt_port_use(task, ra->app_port, ra_use_delta);
2733 }
2734}
2735
2736
2737void
2738nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
2739{
2740 nxt_app_t *app;
2741 nxt_bool_t unchain, start_worker;
2742
2743 app = port->app;
2744
2745 nxt_assert(app != NULL);
2746
2747 nxt_thread_mutex_lock(&app->mutex);
2748
2749 unchain = port->app_link.next != NULL;
2750
2751 if (unchain) {
2752 nxt_queue_remove(&port->app_link);
2753 port->app_link.next = NULL;
2754 }
2755
2756 app->workers--;
2757
2758 start_worker = app->live != 0 &&
2759 nxt_queue_is_empty(&app->requests) == 0 &&
2760 app->workers + app->pending_workers < app->max_workers;
2761
2762 if (start_worker) {
2763 app->pending_workers++;
2764 }
2765
2766 nxt_thread_mutex_unlock(&app->mutex);
2767
2768 nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
2769
2770 if (unchain) {
2771 nxt_port_use(task, port, -1);
2772 }
2773
2774 if (start_worker) {
2775 nxt_router_start_worker(task, app);
2776 }
2777}
2778
2779
2780static nxt_int_t
2781nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2782{
2783 int use_delta;
2784 nxt_int_t res;
2785 nxt_app_t *app;
2786 nxt_bool_t can_start_worker;
2787 nxt_conn_t *c;
2788 nxt_port_t *port;
2789 nxt_event_engine_t *engine;
2790 nxt_socket_conf_joint_t *joint;
2791
2792 port = NULL;
2793 use_delta = 1;
2794 c = ra->rc->conn;
2795
2796 joint = c->joint;
2797 app = joint->socket_conf->application;
2798
2799 if (app == NULL) {
2800 nxt_router_gen_error(task, c, 500,
2801 "Application is NULL in socket_conf");
2802 return NXT_ERROR;
2803 }
2804
2805 ra->rc->app = app;
2806
2807 nxt_router_app_use(task, app, 1);
2808
2809 engine = task->thread->engine;
2810
2811 nxt_timer_disable(engine, &c->read_timer);
2812
2813 if (app->timeout != 0) {
2814 c->read_timer.handler = nxt_router_app_timeout;
2815 nxt_timer_add(engine, &c->read_timer, app->timeout);
2816 }
2817
2818 can_start_worker = 0;
2819
2820 nxt_thread_mutex_lock(&app->mutex);
2821
2822 if (!nxt_queue_is_empty(&app->ports)) {
2823 port = nxt_router_app_get_port_unsafe(app, &use_delta);
2824
2825 } else {
2826 ra = nxt_router_ra_create(task, ra);
2827
2828 if (nxt_fast_path(ra != NULL)) {
2829 nxt_queue_insert_tail(&app->requests, &ra->link);
2830
2831 can_start_worker = (app->workers + app->pending_workers) <
2832 app->max_workers;
2833 if (can_start_worker) {
2834 app->pending_workers++;
2835 }
2836 }
2837
2838 port = NULL;
2839 }
2840
2841 nxt_thread_mutex_unlock(&app->mutex);
2842
2843 if (nxt_slow_path(ra == NULL)) {
2844 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2845 "req<->app link");
2846 return NXT_ERROR;
2847 }
2848
2849 if (port != NULL) {
2850 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
2851
2852 ra->app_port = port;
2853
2854 if (use_delta != 0) {
2855 nxt_port_use(task, port, use_delta);
2856 }
2857 return NXT_OK;
2858 }
2859
2860 nxt_debug(task, "ra stream #%uD allocated", ra->stream);
2861
2862 if (!can_start_worker) {
2863 nxt_debug(task, "app '%V' %p too many running or pending workers",
2864 &app->name, app);
2865
2866 return NXT_AGAIN;
2867 }
2868
2869 res = nxt_router_start_worker(task, app);
2870
2871 if (nxt_slow_path(res != NXT_OK)) {
2872 nxt_router_gen_error(task, c, 500, "Failed to start worker");
2873
2874 return NXT_ERROR;
2875 }
2876
2877 return NXT_AGAIN;
2878}
2879
2880
2881static void
2882nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
2883{
2884 size_t size;
2885 nxt_int_t ret;
2886 nxt_buf_t *buf;
2887 nxt_conn_t *c;
2888 nxt_sockaddr_t *local;
2889 nxt_app_parse_ctx_t *ap;
2890 nxt_app_request_body_t *b;
2891 nxt_socket_conf_joint_t *joint;
2892 nxt_app_request_header_t *h;
2893
2894 c = obj;
2895 ap = data;
2896 buf = c->read;
2897 joint = c->joint;
2898
2899 nxt_debug(task, "router conn http header parse");
2900
2901 if (ap == NULL) {
2902 ap = nxt_app_http_req_init(task);
2903 if (nxt_slow_path(ap == NULL)) {
2904 nxt_router_gen_error(task, c, 500,
2905 "Failed to allocate parse context");
2906 return;
2907 }
2908
2909 c->socket.data = ap;
2910
2911 ap->r.remote.start = nxt_sockaddr_address(c->remote);
2912 ap->r.remote.length = c->remote->address_length;
2913
2914 /*
2915 * TODO: need an application flag to get local address
2916 * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
2917 */
2918 local = nxt_router_local_addr(task, c);
2919
2920 if (nxt_fast_path(local != NULL)) {
2921 ap->r.local.start = nxt_sockaddr_address(local);
2922 ap->r.local.length = local->address_length;
2923 }
2924
2925 ap->r.header.buf = buf;
2926 }
2927
2928 h = &ap->r.header;
2929 b = &ap->r.body;
2930
2931 ret = nxt_app_http_req_header_parse(task, ap, buf);
2932
2933 nxt_debug(task, "http parse request header: %d", ret);
2934
2935 switch (nxt_expect(NXT_DONE, ret)) {
2936
2937 case NXT_DONE:
2938 nxt_debug(task, "router request header parsing complete, "
2939 "content length: %O, preread: %uz",
2940 h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
2941
2942 if (b->done) {
2943 nxt_router_process_http_request(task, c, ap);
2944
2945 return;
2946 }
2947
2948 if (joint->socket_conf->max_body_size > 0
2949 && (size_t) h->parsed_content_length
2950 > joint->socket_conf->max_body_size)
2951 {
2952 nxt_router_gen_error(task, c, 413, "Content-Length too big");
2953 return;
2954 }
2955
2956 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
2957 size = nxt_min(joint->socket_conf->body_buffer_size,
2958 (size_t) h->parsed_content_length);
2959
2960 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2961 if (nxt_slow_path(buf->next == NULL)) {
2962 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2963 "buffer for request body");
2964 return;
2965 }
2966
2967 c->read = buf->next;
2968
2969 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
2970 }
2971
2972 if (b->buf == NULL) {
2973 b->buf = c->read;
2974 }
2975
2976 c->read_state = &nxt_router_conn_read_body_state;
2977 break;
2978
2979 case NXT_ERROR:
2980 nxt_router_gen_error(task, c, 400, "Request header parse error");
2981 return;
2982
2983 default: /* NXT_AGAIN */
2984
2985 if (c->read->mem.free == c->read->mem.end) {
2986 size = joint->socket_conf->large_header_buffer_size;
2987
2988 if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem)
2989 || ap->r.header.bufs
2990 >= joint->socket_conf->large_header_buffers)
2991 {
2992 nxt_router_gen_error(task, c, 413,
2993 "Too long request headers");
2994 return;
2995 }
2996
2997 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2998 if (nxt_slow_path(buf->next == NULL)) {
2999 nxt_router_gen_error(task, c, 500,
3000 "Failed to allocate large header "
3001 "buffer");
3002 return;
3003 }
3004
3005 ap->r.header.bufs++;
3006
3007 size = c->read->mem.free - c->read->mem.pos;
3008
3009 c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
3010 }
3011
3012 }
3013
3014 nxt_conn_read(task->thread->engine, c);
3015}
3016
3017
3018static nxt_sockaddr_t *
3019nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c)
3020{
3021 int ret;
3022 size_t size, length;
3023 socklen_t socklen;
3024 nxt_sockaddr_t *sa;
3025
3026 if (c->local != NULL) {
3027 return c->local;
3028 }
3029
3030 /* AF_UNIX should not get in here. */
3031
3032 switch (c->remote->u.sockaddr.sa_family) {
3033#if (NXT_INET6)
3034 case AF_INET6:
3035 socklen = sizeof(struct sockaddr_in6);
3036 length = NXT_INET6_ADDR_STR_LEN;
3037 size = offsetof(nxt_sockaddr_t, u) + socklen + length;
3038 break;
3039#endif
3040 case AF_INET:
3041 default:
3042 socklen = sizeof(struct sockaddr_in);
3043 length = NXT_INET_ADDR_STR_LEN;
3044 size = offsetof(nxt_sockaddr_t, u) + socklen + length;
3045 break;
3046 }
3047
3048 sa = nxt_mp_get(c->mem_pool, size);
3049 if (nxt_slow_path(sa == NULL)) {
3050 return NULL;
3051 }
3052
3053 sa->socklen = socklen;
3054 sa->length = length;
3055
3056 ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen);
3057 if (nxt_slow_path(ret != 0)) {
3058 nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd);
3059 return NULL;
3060 }
3061
3062 c->local = sa;
3063
3064 nxt_sockaddr_text(sa);
3065
3066 /*
3067 * TODO: here we can adjust the end of non-freeable block
3068 * in c->mem_pool to the end of actual sockaddr length.
3069 */
3070
3071 return sa;
3072}
3073
3074
3075static void
3076nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
3077{
3078 size_t size;
3079 nxt_int_t ret;
3080 nxt_buf_t *buf;
3081 nxt_conn_t *c;
3082 nxt_app_parse_ctx_t *ap;
3083 nxt_app_request_body_t *b;
3084 nxt_socket_conf_joint_t *joint;
3085 nxt_app_request_header_t *h;
3086
3087 c = obj;
3088 ap = data;
3089 buf = c->read;
3090
3091 nxt_debug(task, "router conn http body read");
3092
3093 nxt_assert(ap != NULL);
3094
3095 b = &ap->r.body;
3096 h = &ap->r.header;
3097
3098 ret = nxt_app_http_req_body_read(task, ap, buf);
3099
3100 nxt_debug(task, "http read request body: %d", ret);
3101
3102 switch (nxt_expect(NXT_DONE, ret)) {
3103
3104 case NXT_DONE:
3105 nxt_router_process_http_request(task, c, ap);
3106 return;
3107
3108 case NXT_ERROR:
3109 nxt_router_gen_error(task, c, 500, "Read body error");
3110 return;
3111
3112 default: /* NXT_AGAIN */
3113
3114 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3115 joint = c->joint;
3116
3117 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3118
3119 size = nxt_min(joint->socket_conf->body_buffer_size,
3120 (size_t) h->parsed_content_length - b->preread_size);
3121
3122 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3123 if (nxt_slow_path(buf->next == NULL)) {
3124 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3125 "buffer for request body");
3126 return;
3127 }
3128
3129 c->read = buf->next;
3130 }
3131
3132 nxt_debug(task, "router request body read again, rest: %uz",
3133 h->parsed_content_length - b->preread_size);
3134 }
3135
3136 nxt_conn_read(task->thread->engine, c);
3137}
3138
3139
3140static void
3141nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
3142 nxt_app_parse_ctx_t *ap)
3143{
3144 nxt_int_t res;
3145 nxt_port_t *port;
3146 nxt_event_engine_t *engine;
3147 nxt_req_app_link_t ra_local, *ra;
3148 nxt_req_conn_link_t *rc;
3149
3150 engine = task->thread->engine;
3151
3152 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3153 nxt_router_response_ready_handler,
3154 nxt_router_response_error_handler,
3155 sizeof(nxt_req_conn_link_t));
3156
3157 if (nxt_slow_path(rc == NULL)) {
3158 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3159 "req<->conn link");
3160
3161 return;
3162 }
3163
3164 rc->stream = nxt_port_rpc_ex_stream(rc);
3165 rc->conn = c;
3166
3167 nxt_queue_insert_tail(&c->requests, &rc->link);
3168
3169 nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
3170 rc->stream, c, engine);
3171
3172 rc->ap = ap;
3173 c->socket.data = NULL;
3174
3175 ra = &ra_local;
3176 nxt_router_ra_init(task, ra, rc);
3177
3178 res = nxt_router_app_port(task, ra);
3179
3180 if (res != NXT_OK) {
3181 return;
3182 }
3183
3184 port = ra->app_port;
3185
3186 if (nxt_slow_path(port == NULL)) {
3187 nxt_router_gen_error(task, c, 500, "Application port not found");
3188 return;
3189 }
3190
3191 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3192
3193 nxt_router_process_http_request_mp(task, ra);
3194}
3195
3196
3197static void
3198nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
3199{
3200 uint32_t request_failed;
3201 nxt_int_t res;
3202 nxt_port_t *port, *c_port, *reply_port;
3203 nxt_app_wmsg_t wmsg;
3204 nxt_app_parse_ctx_t *ap;
3205
3206 nxt_assert(ra->app_port != NULL);
3207
3208 port = ra->app_port;
3209 reply_port = ra->reply_port;
3210 ap = ra->ap;
3211
3212 request_failed = 1;
3213
3214 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
3215 reply_port->id);
3216 if (nxt_slow_path(c_port != reply_port)) {
3217 res = nxt_port_send_port(task, port, reply_port, 0);
3218
3219 if (nxt_slow_path(res != NXT_OK)) {
3220 nxt_router_ra_error(task, ra, 500,
3221 "Failed to send reply port to application");
3222 ra = NULL;
3223 goto release_port;
3224 }
3225
3226 nxt_process_connected_port_add(port->process, reply_port);
3227 }
3228
3229 wmsg.port = port;
3230 wmsg.write = NULL;
3231 wmsg.buf = &wmsg.write;
3232 wmsg.stream = ra->stream;
3233
3234 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3235
3236 if (nxt_slow_path(res != NXT_OK)) {
3237 nxt_router_ra_error(task, ra, 500,
3238 "Failed to prepare message for application");
3239 ra = NULL;
3240 goto release_port;
3241 }
3242
3243 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
3244 nxt_buf_used_size(wmsg.write),
3245 wmsg.port->socket.fd);
3246
3247 request_failed = 0;
3248
3249 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
3250 -1, ra->stream, reply_port->id, wmsg.write);
3251
3252 if (nxt_slow_path(res != NXT_OK)) {
3253 nxt_router_ra_error(task, ra, 500,
3254 "Failed to send message to application");
3255 ra = NULL;
3256 goto release_port;
3257 }
3258
3259release_port:
3260
3261 nxt_router_app_port_release(task, port, request_failed, 0);
3262
3263 if (ra != NULL) {
3264 if (request_failed != 0) {
3265 ra->app_port = 0;
3266 }
3267
3268 nxt_router_ra_release(task, ra, ra->work.data);
3269 }
3270}
3271
3272
3273static nxt_int_t
3274nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3275 nxt_app_wmsg_t *wmsg)
3276{
3277 nxt_int_t rc;
3278 nxt_buf_t *b;
3279 nxt_http_field_t *field;
3280 nxt_app_request_header_t *h;
3281
3282 static const nxt_str_t prefix = nxt_string("HTTP_");
3283 static const nxt_str_t eof = nxt_null_string;
3284
3285 h = &r->header;
3286
3287#define RC(S) \
3288 do { \
3289 rc = (S); \
3290 if (nxt_slow_path(rc != NXT_OK)) { \
3291 goto fail; \
3292 } \
3293 } while(0)
3294
3295#define NXT_WRITE(N) \
3296 RC(nxt_app_msg_write_str(task, wmsg, N))
3297
3298 /* TODO error handle, async mmap buffer assignment */
3299
3300 NXT_WRITE(&h->method);
3301 NXT_WRITE(&h->target);
3302
3303 if (h->path.start == h->target.start) {
3304 NXT_WRITE(&eof);
3305
3306 } else {
3307 NXT_WRITE(&h->path);
3308 }
3309
3310 if (h->query.start != NULL) {
3311 RC(nxt_app_msg_write_size(task, wmsg,
3312 h->query.start - h->target.start + 1));
3313 } else {
3314 RC(nxt_app_msg_write_size(task, wmsg, 0));
3315 }
3316
3317 NXT_WRITE(&h->version);
3318
3319 NXT_WRITE(&r->remote);
3320 NXT_WRITE(&r->local);
3321
3322 NXT_WRITE(&h->host);
3323 NXT_WRITE(&h->content_type);
3324 NXT_WRITE(&h->content_length);
3325
3326 nxt_list_each(field, h->fields) {
3327 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
3328 &prefix, &field->name));
3329 NXT_WRITE(&field->value);
3330
3331 } nxt_list_loop;
3332
3333 /* end-of-headers mark */
3334 NXT_WRITE(&eof);
3335
3336 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3337
3338 for(b = r->body.buf; b != NULL; b = b->next) {
3339 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3340 nxt_buf_mem_used_size(&b->mem)));
3341 }
3342
3343#undef NXT_WRITE
3344#undef RC
3345
3346 return NXT_OK;
3347
3348fail:
3349
3350 return NXT_ERROR;
3351}
3352
3353
3354static nxt_int_t
3355nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3356 nxt_app_wmsg_t *wmsg)
3357{
3358 nxt_int_t rc;
3359 nxt_buf_t *b;
3360 nxt_bool_t method_is_post;
3361 nxt_http_field_t *field;
3362 nxt_app_request_header_t *h;
3363
3364 static const nxt_str_t prefix = nxt_string("HTTP_");
3365 static const nxt_str_t eof = nxt_null_string;
3366
3367 h = &r->header;
3368
3369#define RC(S) \
3370 do { \
3371 rc = (S); \
3372 if (nxt_slow_path(rc != NXT_OK)) { \
3373 goto fail; \
3374 } \
3375 } while(0)
3376
3377#define NXT_WRITE(N) \
3378 RC(nxt_app_msg_write_str(task, wmsg, N))
3379
3380 /* TODO error handle, async mmap buffer assignment */
3381
3382 NXT_WRITE(&h->method);
3383 NXT_WRITE(&h->target);
3384
3385 if (h->path.start == h->target.start) {
3386 NXT_WRITE(&eof);
3387
3388 } else {
3389 NXT_WRITE(&h->path);
3390 }
3391
3392 if (h->query.start != NULL) {
3393 RC(nxt_app_msg_write_size(task, wmsg,
3394 h->query.start - h->target.start + 1));
3395 } else {
3396 RC(nxt_app_msg_write_size(task, wmsg, 0));
3397 }
3398
3399 NXT_WRITE(&h->version);
3400
3401 // PHP_SELF
3402 // SCRIPT_NAME
3403 // SCRIPT_FILENAME
3404 // DOCUMENT_ROOT
3405
3406 NXT_WRITE(&r->remote);
3407 NXT_WRITE(&r->local);
3408
3409 NXT_WRITE(&h->host);
3410 NXT_WRITE(&h->cookie);
3411 NXT_WRITE(&h->content_type);
3412 NXT_WRITE(&h->content_length);
3413
3414 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
3415 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3416
3417 method_is_post = h->method.length == 4 &&
3418 h->method.start[0] == 'P' &&
3419 h->method.start[1] == 'O' &&
3420 h->method.start[2] == 'S' &&
3421 h->method.start[3] == 'T';
3422
3423 if (method_is_post) {
3424 for(b = r->body.buf; b != NULL; b = b->next) {
3425 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3426 nxt_buf_mem_used_size(&b->mem)));
3427 }
3428 }
3429
3430 nxt_list_each(field, h->fields) {
3431 RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
3432 &prefix, &field->name));
3433 NXT_WRITE(&field->value);
3434
3435 } nxt_list_loop;
3436
3437 /* end-of-headers mark */
3438 NXT_WRITE(&eof);
3439
3440 if (!method_is_post) {
3441 for(b = r->body.buf; b != NULL; b = b->next) {
3442 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3443 nxt_buf_mem_used_size(&b->mem)));
3444 }
3445 }
3446
3447#undef NXT_WRITE
3448#undef RC
3449
3450 return NXT_OK;
3451
3452fail:
3453
3454 return NXT_ERROR;
3455}
3456
3457
3458static nxt_int_t
3459nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
3460{
3461 nxt_int_t rc;
3462 nxt_buf_t *b;
3463 nxt_http_field_t *field;
3464 nxt_app_request_header_t *h;
3465
3466 static const nxt_str_t eof = nxt_null_string;
3467
3468 h = &r->header;
3469
3470#define RC(S) \
3471 do { \
3472 rc = (S); \
3473 if (nxt_slow_path(rc != NXT_OK)) { \
3474 goto fail; \
3475 } \
3476 } while(0)
3477
3478#define NXT_WRITE(N) \
3479 RC(nxt_app_msg_write_str(task, wmsg, N))
3480
3481 /* TODO error handle, async mmap buffer assignment */
3482
3483 NXT_WRITE(&h->method);
3484 NXT_WRITE(&h->target);
3485
3486 if (h->path.start == h->target.start) {
3487 NXT_WRITE(&eof);
3488
3489 } else {
3490 NXT_WRITE(&h->path);
3491 }
3492
3493 if (h->query.start != NULL) {
3494 RC(nxt_app_msg_write_size(task, wmsg,
3495 h->query.start - h->target.start + 1));
3496 } else {
3497 RC(nxt_app_msg_write_size(task, wmsg, 0));
3498 }
3499
3500 NXT_WRITE(&h->version);
3501 NXT_WRITE(&r->remote);
3502
3503 NXT_WRITE(&h->host);
3504 NXT_WRITE(&h->cookie);
3505 NXT_WRITE(&h->content_type);
3506 NXT_WRITE(&h->content_length);
3507
3508 RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
3509
3510 nxt_list_each(field, h->fields) {
3511 NXT_WRITE(&field->name);
3512 NXT_WRITE(&field->value);
3513
3514 } nxt_list_loop;
3515
3516 /* end-of-headers mark */
3517 NXT_WRITE(&eof);
3518
3519 RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3520
3521 for(b = r->body.buf; b != NULL; b = b->next) {
3522 RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3523 nxt_buf_mem_used_size(&b->mem)));
3524 }
3525
3526#undef NXT_WRITE
3527#undef RC
3528
3529 return NXT_OK;
3530
3531fail:
3532
3533 return NXT_ERROR;
3534}
3535
3536
3537static const nxt_conn_state_t nxt_router_conn_close_state
3538 nxt_aligned(64) =
3539{
3540 .ready_handler = nxt_router_conn_free,
3541};
3542
3543
3544static void
3545nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
3546{
3547 nxt_buf_t *b;
3548 nxt_bool_t last;
3549 nxt_conn_t *c;
3550 nxt_work_queue_t *wq;
3551
3552 nxt_debug(task, "router conn ready %p", obj);
3553
3554 c = obj;
3555 b = c->write;
3556
3557 wq = &task->thread->engine->fast_work_queue;
3558
3559 last = 0;
3560
3561 while (b != NULL) {
3562 if (!nxt_buf_is_sync(b)) {
3563 if (nxt_buf_used_size(b) > 0) {
3564 break;
3565 }
3566 }
3567
3568 if (nxt_buf_is_last(b)) {
3569 last = 1;
3570 }
3571
3572 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
3573
3574 b = b->next;
3575 }
3576
3577 c->write = b;
3578
3579 if (b != NULL) {
3580 nxt_debug(task, "router conn %p has more data to write", obj);
3581
3582 nxt_conn_write(task->thread->engine, c);
3583
3584 } else {
3585 nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
3586 last);
3587
3588 if (last != 0) {
3589 nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
3590
3591 nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
3592 c->socket.data);
3593 }
3594 }
3595}
3596
3597
3598static void
3599nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
3600{
3601 nxt_conn_t *c;
3602
3603 c = obj;
3604
3605 nxt_debug(task, "router conn close");
3606
3607 c->write_state = &nxt_router_conn_close_state;
3608
3609 nxt_conn_close(task->thread->engine, c);
3610}
3611
3612
3613static void
3614nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
3615{
3616 nxt_socket_conf_joint_t *joint;
3617
3618 joint = obj;
3619
3620 nxt_router_conf_release(task, joint);
3621}
3622
3623
3624static void
3625nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
3626{
3627 nxt_conn_t *c;
3628 nxt_event_engine_t *engine;
3629 nxt_req_conn_link_t *rc;
3630 nxt_app_parse_ctx_t *ap;
3631 nxt_socket_conf_joint_t *joint;
3632
3633 c = obj;
3634 ap = data;
3635
3636 nxt_debug(task, "router conn close done");
3637
3638 if (ap != NULL) {
3639 nxt_app_http_req_done(task, ap);
3640
3641 c->socket.data = NULL;
3642 }
3643
3644 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3645
3646 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
3647
3648 nxt_router_rc_unlink(task, rc);
3649
3650 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
3651
3652 } nxt_queue_loop;
3653
3654 nxt_queue_remove(&c->link);
3655
3656 engine = task->thread->engine;
3657
3658 nxt_sockaddr_cache_free(engine, c);
3659
3660 joint = c->joint;
3661
3662 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
3663 &engine->task, joint, NULL);
3664
3665 nxt_mp_release(c->mem_pool, c);
3665 nxt_conn_free(task, c);
3666}
3667
3668
3669static void
3670nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
3671{
3672 nxt_conn_t *c;
3673
3674 c = obj;
3675
3676 nxt_debug(task, "router conn error");
3677
3678 if (c->socket.fd != -1) {
3679 c->write_state = &nxt_router_conn_close_state;
3680
3681 nxt_conn_close(task->thread->engine, c);
3682 }
3683}
3684
3685
3686static void
3687nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
3688{
3689 nxt_conn_t *c;
3690 nxt_timer_t *timer;
3691
3692 timer = obj;
3693
3694 nxt_debug(task, "router conn timeout");
3695
3696 c = nxt_read_timer_conn(timer);
3697
3698 if (c->read_state == &nxt_router_conn_read_header_state) {
3699 nxt_router_gen_error(task, c, 408, "Read header timeout");
3700
3701 } else {
3702 nxt_router_gen_error(task, c, 408, "Read body timeout");
3703 }
3704}
3705
3706
3707static void
3708nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
3709{
3710 nxt_conn_t *c;
3711 nxt_timer_t *timer;
3712
3713 timer = obj;
3714
3715 nxt_debug(task, "router app timeout");
3716
3717 c = nxt_read_timer_conn(timer);
3718
3719 nxt_router_gen_error(task, c, 408, "Application timeout");
3720}
3721
3722
3723static nxt_msec_t
3724nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3725{
3726 nxt_socket_conf_joint_t *joint;
3727
3728 joint = c->joint;
3729
3730 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3731}
3666}
3667
3668
3669static void
3670nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
3671{
3672 nxt_conn_t *c;
3673
3674 c = obj;
3675
3676 nxt_debug(task, "router conn error");
3677
3678 if (c->socket.fd != -1) {
3679 c->write_state = &nxt_router_conn_close_state;
3680
3681 nxt_conn_close(task->thread->engine, c);
3682 }
3683}
3684
3685
3686static void
3687nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
3688{
3689 nxt_conn_t *c;
3690 nxt_timer_t *timer;
3691
3692 timer = obj;
3693
3694 nxt_debug(task, "router conn timeout");
3695
3696 c = nxt_read_timer_conn(timer);
3697
3698 if (c->read_state == &nxt_router_conn_read_header_state) {
3699 nxt_router_gen_error(task, c, 408, "Read header timeout");
3700
3701 } else {
3702 nxt_router_gen_error(task, c, 408, "Read body timeout");
3703 }
3704}
3705
3706
3707static void
3708nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
3709{
3710 nxt_conn_t *c;
3711 nxt_timer_t *timer;
3712
3713 timer = obj;
3714
3715 nxt_debug(task, "router app timeout");
3716
3717 c = nxt_read_timer_conn(timer);
3718
3719 nxt_router_gen_error(task, c, 408, "Application timeout");
3720}
3721
3722
3723static nxt_msec_t
3724nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3725{
3726 nxt_socket_conf_joint_t *joint;
3727
3728 joint = c->joint;
3729
3730 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3731}