nxt_router.c (430:3a24c399394f) nxt_router.c (431:5817734dd9b9)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#include <nxt_http.h>
10
11
12typedef struct {
13 nxt_str_t type;
14 uint32_t workers;
15 nxt_msec_t timeout;
16 nxt_msec_t res_timeout;
17 uint32_t requests;

--- 12 unchanged lines hidden (view full) ---

30 nxt_work_handler_t completion_handler;
31} nxt_msg_info_t;
32
33
34typedef struct nxt_req_app_link_s nxt_req_app_link_t;
35
36
37typedef struct {
11
12
13typedef struct {
14 nxt_str_t type;
15 uint32_t workers;
16 nxt_msec_t timeout;
17 nxt_msec_t res_timeout;
18 uint32_t requests;

--- 12 unchanged lines hidden (view full) ---

31 nxt_work_handler_t completion_handler;
32} nxt_msg_info_t;
33
34
35typedef struct nxt_req_app_link_s nxt_req_app_link_t;
36
37
38typedef struct {
38 uint32_t stream;
39 nxt_conn_t *conn;
40 nxt_app_t *app;
41 nxt_port_t *app_port;
42 nxt_app_parse_ctx_t *ap;
43 nxt_msg_info_t msg_info;
44 nxt_req_app_link_t *ra;
39 uint32_t stream;
40 nxt_app_t *app;
41 nxt_port_t *app_port;
42 nxt_app_parse_ctx_t *ap;
43 nxt_msg_info_t msg_info;
44 nxt_req_app_link_t *ra;
45
45
46 nxt_queue_link_t link; /* for nxt_conn_t.requests */
46 nxt_queue_link_t link; /* for nxt_conn_t.requests */
47} nxt_req_conn_link_t;
48
49
50struct nxt_req_app_link_s {
51 uint32_t stream;
52 nxt_atomic_t use_count;
53 nxt_port_t *app_port;
54 nxt_port_t *reply_port;

--- 139 unchanged lines hidden (view full) ---

194 nxt_port_recv_msg_t *msg, void *data);
195
196static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
197static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
198 uint32_t request_failed, uint32_t got_response);
199static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
200 nxt_req_app_link_t *ra);
201
47} nxt_req_conn_link_t;
48
49
50struct nxt_req_app_link_s {
51 uint32_t stream;
52 nxt_atomic_t use_count;
53 nxt_port_t *app_port;
54 nxt_port_t *reply_port;

--- 139 unchanged lines hidden (view full) ---

194 nxt_port_recv_msg_t *msg, void *data);
195
196static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
197static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
198 uint32_t request_failed, uint32_t got_response);
199static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
200 nxt_req_app_link_t *ra);
201
202static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
203static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
204 void *data);
205static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
206static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
207 void *data);
208static void nxt_router_process_http_request(nxt_task_t *task,
209 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
210static void nxt_router_app_prepare_request(nxt_task_t *task,
211 nxt_req_app_link_t *ra);
212static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
213 nxt_app_wmsg_t *wmsg);
214static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
215 nxt_app_wmsg_t *wmsg);
216static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
217 nxt_app_wmsg_t *wmsg);
202static void nxt_router_app_prepare_request(nxt_task_t *task,
203 nxt_req_app_link_t *ra);
204static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
205 nxt_app_wmsg_t *wmsg);
206static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
207 nxt_app_wmsg_t *wmsg);
208static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
209 nxt_app_wmsg_t *wmsg);
218static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
219static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
220static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
210static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
221static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
222static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
223static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
211static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
224static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
225
212
226static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
227 const char* str);
213static const nxt_http_request_state_t nxt_http_request_send_state;
214static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
228
229static nxt_router_t *nxt_router;
230
231
232static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
233 nxt_python_prepare_msg,
234 nxt_php_prepare_msg,
235 nxt_go_prepare_msg,

--- 4 unchanged lines hidden (view full) ---

240nxt_router_start(nxt_task_t *task, void *data)
241{
242 nxt_int_t ret;
243 nxt_router_t *router;
244 nxt_runtime_t *rt;
245
246 rt = task->thread->runtime;
247
215
216static nxt_router_t *nxt_router;
217
218
219static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
220 nxt_python_prepare_msg,
221 nxt_php_prepare_msg,
222 nxt_go_prepare_msg,

--- 4 unchanged lines hidden (view full) ---

227nxt_router_start(nxt_task_t *task, void *data)
228{
229 nxt_int_t ret;
230 nxt_router_t *router;
231 nxt_runtime_t *rt;
232
233 rt = task->thread->runtime;
234
248 ret = nxt_app_http_init(task, rt);
235 ret = nxt_http_init(task, rt);
249 if (nxt_slow_path(ret != NXT_OK)) {
250 return ret;
251 }
252
253 router = nxt_zalloc(sizeof(nxt_router_t));
254 if (nxt_slow_path(router == NULL)) {
255 return NXT_ERROR;
256 }

--- 240 unchanged lines hidden (view full) ---

497
498 nxt_router_ra_use(task, ra, -1);
499}
500
501
502static void
503nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
504{
236 if (nxt_slow_path(ret != NXT_OK)) {
237 return ret;
238 }
239
240 router = nxt_zalloc(sizeof(nxt_router_t));
241 if (nxt_slow_path(router == NULL)) {
242 return NXT_ERROR;
243 }

--- 240 unchanged lines hidden (view full) ---

484
485 nxt_router_ra_use(task, ra, -1);
486}
487
488
489static void
490nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
491{
505 nxt_mp_t *mp;
506 nxt_conn_t *c;
507 nxt_req_conn_link_t *rc;
492 nxt_mp_t *mp;
493 nxt_req_conn_link_t *rc;
508
509 nxt_assert(task->thread->engine == ra->work.data);
510 nxt_assert(ra->use_count == 0);
511
512 nxt_debug(task, "ra stream #%uD release", ra->stream);
513
514 rc = ra->rc;
515
516 if (rc != NULL) {
494
495 nxt_assert(task->thread->engine == ra->work.data);
496 nxt_assert(ra->use_count == 0);
497
498 nxt_debug(task, "ra stream #%uD release", ra->stream);
499
500 rc = ra->rc;
501
502 if (rc != NULL) {
517 c = rc->conn;
518
519 if (nxt_slow_path(ra->err_code != 0)) {
503 if (nxt_slow_path(ra->err_code != 0)) {
520 nxt_router_gen_error(task, c, ra->err_code, ra->err_str);
504 nxt_http_request_error(task, rc->ap->request, ra->err_code);
521
522 } else {
523 rc->app_port = ra->app_port;
524 rc->msg_info = ra->msg_info;
525
526 if (rc->app->timeout != 0) {
505
506 } else {
507 rc->app_port = ra->app_port;
508 rc->msg_info = ra->msg_info;
509
510 if (rc->app->timeout != 0) {
527 c->read_timer.handler = nxt_router_app_timeout;
528 nxt_timer_add(task->thread->engine, &c->read_timer,
511 rc->ap->timer.handler = nxt_router_app_timeout;
512 nxt_timer_add(task->thread->engine, &rc->ap->timer,
529 rc->app->timeout);
530 }
531
532 ra->app_port = NULL;
533 ra->msg_info.buf = NULL;
534 }
535
536 rc->ra = NULL;

--- 151 unchanged lines hidden (view full) ---

688 rc->app = NULL;
689 }
690
691 if (rc->ap != NULL) {
692 nxt_app_http_req_done(task, rc->ap);
693
694 rc->ap = NULL;
695 }
513 rc->app->timeout);
514 }
515
516 ra->app_port = NULL;
517 ra->msg_info.buf = NULL;
518 }
519
520 rc->ra = NULL;

--- 151 unchanged lines hidden (view full) ---

672 rc->app = NULL;
673 }
674
675 if (rc->ap != NULL) {
676 nxt_app_http_req_done(task, rc->ap);
677
678 rc->ap = NULL;
679 }
696
697 nxt_queue_remove(&rc->link);
698
699 rc->conn = NULL;
700}
701
702
703void
704nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
705{
706 nxt_port_new_port_handler(task, msg);
707

--- 366 unchanged lines hidden (view full) ---

1074
1075 {
1076 nxt_string("max_body_size"),
1077 NXT_CONF_MAP_SIZE,
1078 offsetof(nxt_socket_conf_t, max_body_size),
1079 },
1080
1081 {
680}
681
682
683void
684nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
685{
686 nxt_port_new_port_handler(task, msg);
687

--- 366 unchanged lines hidden (view full) ---

1054
1055 {
1056 nxt_string("max_body_size"),
1057 NXT_CONF_MAP_SIZE,
1058 offsetof(nxt_socket_conf_t, max_body_size),
1059 },
1060
1061 {
1062 nxt_string("idle_timeout"),
1063 NXT_CONF_MAP_MSEC,
1064 offsetof(nxt_socket_conf_t, idle_timeout),
1065 },
1066
1067 {
1082 nxt_string("header_read_timeout"),
1083 NXT_CONF_MAP_MSEC,
1084 offsetof(nxt_socket_conf_t, header_read_timeout),
1085 },
1086
1087 {
1088 nxt_string("body_read_timeout"),
1089 NXT_CONF_MAP_MSEC,
1090 offsetof(nxt_socket_conf_t, body_read_timeout),
1091 },
1068 nxt_string("header_read_timeout"),
1069 NXT_CONF_MAP_MSEC,
1070 offsetof(nxt_socket_conf_t, header_read_timeout),
1071 },
1072
1073 {
1074 nxt_string("body_read_timeout"),
1075 NXT_CONF_MAP_MSEC,
1076 offsetof(nxt_socket_conf_t, body_read_timeout),
1077 },
1078
1079 {
1080 nxt_string("send_timeout"),
1081 NXT_CONF_MAP_MSEC,
1082 offsetof(nxt_socket_conf_t, send_timeout),
1083 },
1092};
1093
1094
1095static nxt_int_t
1096nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1097 u_char *start, u_char *end)
1098{
1099 u_char *p;

--- 191 unchanged lines hidden (view full) ---

1291 nxt_debug(task, "application: %V", &lscf.application);
1292
1293 // STUB, default values if http block is not defined.
1294 skcf->header_buffer_size = 2048;
1295 skcf->large_header_buffer_size = 8192;
1296 skcf->large_header_buffers = 4;
1297 skcf->body_buffer_size = 16 * 1024;
1298 skcf->max_body_size = 2 * 1024 * 1024;
1084};
1085
1086
1087static nxt_int_t
1088nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1089 u_char *start, u_char *end)
1090{
1091 u_char *p;

--- 191 unchanged lines hidden (view full) ---

1283 nxt_debug(task, "application: %V", &lscf.application);
1284
1285 // STUB, default values if http block is not defined.
1286 skcf->header_buffer_size = 2048;
1287 skcf->large_header_buffer_size = 8192;
1288 skcf->large_header_buffers = 4;
1289 skcf->body_buffer_size = 16 * 1024;
1290 skcf->max_body_size = 2 * 1024 * 1024;
1291 skcf->idle_timeout = 65000;
1299 skcf->header_read_timeout = 5000;
1300 skcf->body_read_timeout = 5000;
1292 skcf->header_read_timeout = 5000;
1293 skcf->body_read_timeout = 5000;
1294 skcf->send_timeout = 5000;
1301
1302 if (http != NULL) {
1303 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1304 nxt_nitems(nxt_router_http_conf), skcf);
1305 if (ret != NXT_OK) {
1306 nxt_log(task, NXT_LOG_CRIT, "http map error");
1307 goto fail;
1308 }
1309 }
1310
1295
1296 if (http != NULL) {
1297 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1298 nxt_nitems(nxt_router_http_conf), skcf);
1299 if (ret != NXT_OK) {
1300 nxt_log(task, NXT_LOG_CRIT, "http map error");
1301 goto fail;
1302 }
1303 }
1304
1311 skcf->listen->handler = nxt_router_conn_init;
1305 skcf->listen->handler = nxt_http_conn_init;
1312 skcf->router_conf = tmcf->conf;
1313 skcf->router_conf->count++;
1314 skcf->application = nxt_router_listener_application(tmcf,
1315 &lscf.application);
1316 }
1317
1318 nxt_queue_add(&tmcf->deleting, &router->sockets);
1319 nxt_queue_init(&router->sockets);

--- 1052 unchanged lines hidden (view full) ---

2372 nxt_mp_destroy(engine->mem_pool);
2373
2374 nxt_event_engine_free(engine);
2375
2376 nxt_free(link);
2377}
2378
2379
1306 skcf->router_conf = tmcf->conf;
1307 skcf->router_conf->count++;
1308 skcf->application = nxt_router_listener_application(tmcf,
1309 &lscf.application);
1310 }
1311
1312 nxt_queue_add(&tmcf->deleting, &router->sockets);
1313 nxt_queue_init(&router->sockets);

--- 1052 unchanged lines hidden (view full) ---

2366 nxt_mp_destroy(engine->mem_pool);
2367
2368 nxt_event_engine_free(engine);
2369
2370 nxt_free(link);
2371}
2372
2373
2380static const nxt_conn_state_t nxt_router_conn_read_header_state
2381 nxt_aligned(64) =
2382{
2383 .ready_handler = nxt_router_conn_http_header_parse,
2384 .close_handler = nxt_router_conn_close,
2385 .error_handler = nxt_router_conn_error,
2386
2387 .timer_handler = nxt_router_conn_timeout,
2388 .timer_value = nxt_router_conn_timeout_value,
2389 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
2390};
2391
2392
2393static const nxt_conn_state_t nxt_router_conn_read_body_state
2394 nxt_aligned(64) =
2395{
2396 .ready_handler = nxt_router_conn_http_body_read,
2397 .close_handler = nxt_router_conn_close,
2398 .error_handler = nxt_router_conn_error,
2399
2400 .timer_handler = nxt_router_conn_timeout,
2401 .timer_value = nxt_router_conn_timeout_value,
2402 .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
2403 .timer_autoreset = 1,
2404};
2405
2406
2407static void
2374static void
2408nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
2409{
2410 size_t size;
2411 nxt_conn_t *c;
2412 nxt_socket_conf_t *skcf;
2413 nxt_event_engine_t *engine;
2414 nxt_socket_conf_joint_t *joint;
2415
2416 c = obj;
2417 joint = data;
2418
2419 nxt_debug(task, "router conn init");
2420
2421 c->joint = joint;
2422 joint->count++;
2423
2424 skcf = joint->socket_conf;
2425 c->local = skcf->sockaddr;
2426
2427 size = skcf->header_buffer_size;
2428 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2429
2430 c->socket.data = NULL;
2431
2432 engine = task->thread->engine;
2433 c->read_work_queue = &engine->fast_work_queue;
2434 c->write_work_queue = &engine->fast_work_queue;
2435
2436 c->read_state = &nxt_router_conn_read_header_state;
2437
2438 nxt_conn_read(engine, c);
2439}
2440
2441
2442static const nxt_conn_state_t nxt_router_conn_write_state
2443 nxt_aligned(64) =
2444{
2445 .ready_handler = nxt_router_conn_ready,
2446 .close_handler = nxt_router_conn_close,
2447 .error_handler = nxt_router_conn_error,
2448};
2449
2450
2451static void
2452nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2453 void *data)
2454{
2455 size_t dump_size;
2375nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2376 void *data)
2377{
2378 size_t dump_size;
2379 nxt_int_t ret;
2456 nxt_buf_t *b, *last;
2380 nxt_buf_t *b, *last;
2457 nxt_conn_t *c;
2458 nxt_event_engine_t *engine;
2381 nxt_http_request_t *r;
2459 nxt_req_conn_link_t *rc;
2382 nxt_req_conn_link_t *rc;
2383 nxt_app_parse_ctx_t *ar;
2460
2461 b = msg->buf;
2462 rc = data;
2463
2384
2385 b = msg->buf;
2386 rc = data;
2387
2464 c = rc->conn;
2465
2466 dump_size = nxt_buf_used_size(b);
2467
2468 if (dump_size > 300) {
2469 dump_size = 300;
2470 }
2471
2472 nxt_debug(task, "%srouter app data (%z): %*s",
2473 msg->port_msg.last ? "last " : "", msg->size, dump_size,
2474 b->mem.pos);
2475
2476 if (msg->size == 0) {
2477 b = NULL;
2478 }
2479
2388 dump_size = nxt_buf_used_size(b);
2389
2390 if (dump_size > 300) {
2391 dump_size = 300;
2392 }
2393
2394 nxt_debug(task, "%srouter app data (%z): %*s",
2395 msg->port_msg.last ? "last " : "", msg->size, dump_size,
2396 b->mem.pos);
2397
2398 if (msg->size == 0) {
2399 b = NULL;
2400 }
2401
2480 engine = task->thread->engine;
2402 ar = rc->ap;
2481
2403
2482 nxt_timer_disable(engine, &c->read_timer);
2483
2484 if (msg->port_msg.last != 0) {
2485 nxt_debug(task, "router data create last buf");
2486
2404 if (msg->port_msg.last != 0) {
2405 nxt_debug(task, "router data create last buf");
2406
2487 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2407 last = nxt_http_request_last_buffer(task, ar->request);
2488 if (nxt_slow_path(last == NULL)) {
2408 if (nxt_slow_path(last == NULL)) {
2489 /* TODO pogorevaTb */
2409 nxt_app_http_req_done(task, ar);
2410 nxt_router_rc_unlink(task, rc);
2411 return;
2490 }
2491
2492 nxt_buf_chain_add(&b, last);
2493
2494 nxt_router_rc_unlink(task, rc);
2495
2496 } else {
2497 if (rc->app->timeout != 0) {
2412 }
2413
2414 nxt_buf_chain_add(&b, last);
2415
2416 nxt_router_rc_unlink(task, rc);
2417
2418 } else {
2419 if (rc->app->timeout != 0) {
2498 c->read_timer.handler = nxt_router_app_timeout;
2499 nxt_timer_add(engine, &c->read_timer, rc->app->timeout);
2420 ar->timer.handler = nxt_router_app_timeout;
2421 nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
2500 }
2501 }
2502
2503 if (b == NULL) {
2504 return;
2505 }
2506
2507 if (msg->buf == b) {
2508 /* Disable instant buffer completion/re-using by port. */
2509 msg->buf = NULL;
2510 }
2511
2422 }
2423 }
2424
2425 if (b == NULL) {
2426 return;
2427 }
2428
2429 if (msg->buf == b) {
2430 /* Disable instant buffer completion/re-using by port. */
2431 msg->buf = NULL;
2432 }
2433
2512 if (c->write == NULL) {
2513 c->write = b;
2514 c->write_state = &nxt_router_conn_write_state;
2434 r = ar->request;
2515
2435
2516 nxt_conn_write(task->thread->engine, c);
2436 if (r->header_sent) {
2437 nxt_buf_chain_add(&r->out, b);
2438 nxt_http_request_send_body(task, r, NULL);
2517
2518 } else {
2439
2440 } else {
2519 nxt_debug(task, "router data attach out bufs to existing chain");
2441 ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
2442 if (nxt_slow_path(ret != NXT_DONE)) {
2443 goto fail;
2444 }
2520
2445
2521 nxt_buf_chain_add(&c->write, b);
2446 r->resp.fields = ar->resp_parser.fields;
2447
2448 ret = nxt_http_fields_process(r->resp.fields,
2449 &nxt_response_fields_hash, r);
2450 if (nxt_slow_path(ret != NXT_OK)) {
2451 goto fail;
2452 }
2453
2454 if (nxt_buf_mem_used_size(&b->mem) != 0) {
2455 nxt_buf_chain_add(&r->out, b);
2456 }
2457
2458 r->state = &nxt_http_request_send_state;
2459
2460 nxt_http_request_header_send(task, r);
2522 }
2461 }
2462
2463 return;
2464
2465fail:
2466
2467 nxt_app_http_req_done(task, ar);
2468 nxt_router_rc_unlink(task, rc);
2469
2470 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
2523}
2524
2525
2471}
2472
2473
2474static const nxt_http_request_state_t nxt_http_request_send_state
2475 nxt_aligned(64) =
2476{
2477 .ready_handler = nxt_http_request_send_body,
2478 .error_handler = nxt_http_request_close_handler,
2479};
2480
2481
2526static void
2482static void
2483nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
2484{
2485 nxt_buf_t *out;
2486 nxt_http_request_t *r;
2487
2488 r = obj;
2489
2490 out = r->out;
2491
2492 if (out != NULL) {
2493 r->out = NULL;
2494 nxt_http_request_send(task, r, out);
2495 }
2496}
2497
2498
2499static void
2527nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2528 void *data)
2529{
2530 nxt_int_t res;
2531 nxt_port_t *port;
2532 nxt_bool_t cancelled;
2533 nxt_req_app_link_t *ra;
2534 nxt_req_conn_link_t *rc;

--- 22 unchanged lines hidden (view full) ---

2557 }
2558
2559 msg->port_msg.last = 0;
2560
2561 return;
2562 }
2563 }
2564
2500nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2501 void *data)
2502{
2503 nxt_int_t res;
2504 nxt_port_t *port;
2505 nxt_bool_t cancelled;
2506 nxt_req_app_link_t *ra;
2507 nxt_req_conn_link_t *rc;

--- 22 unchanged lines hidden (view full) ---

2530 }
2531
2532 msg->port_msg.last = 0;
2533
2534 return;
2535 }
2536 }
2537
2565 nxt_router_gen_error(task, rc->conn, 500,
2566 "Application terminated unexpectedly");
2538 nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE);
2567
2568 nxt_router_rc_unlink(task, rc);
2569}
2570
2571
2539
2540 nxt_router_rc_unlink(task, rc);
2541}
2542
2543
2572nxt_inline const char *
2573nxt_router_text_by_code(int code)
2574{
2575 switch (code) {
2576 case 400: return "Bad request";
2577 case 404: return "Not found";
2578 case 403: return "Forbidden";
2579 case 408: return "Request Timeout";
2580 case 411: return "Length Required";
2581 case 413: return "Request Entity Too Large";
2582 case 500:
2583 default: return "Internal server error";
2584 }
2585}
2586
2587
2588static nxt_buf_t *
2589nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
2590 const char* str)
2591{
2592 nxt_buf_t *b, *last;
2593
2594 b = nxt_buf_mem_alloc(mp, 16384, 0);
2595 if (nxt_slow_path(b == NULL)) {
2596 return NULL;
2597 }
2598
2599 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
2600 "HTTP/1.0 %d %s\r\n"
2601 "Content-Type: text/plain\r\n"
2602 "Connection: close\r\n\r\n",
2603 code, nxt_router_text_by_code(code));
2604
2605 b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str));
2606
2607 last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST);
2608
2609 if (nxt_slow_path(last == NULL)) {
2610 nxt_mp_free(mp, b);
2611 return NULL;
2612 }
2613
2614 nxt_buf_chain_add(&b, last);
2615
2616 return b;
2617}
2618
2619
2620
2621static void
2544static void
2622nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2623 const char* str)
2624{
2625 nxt_mp_t *mp;
2626 nxt_buf_t *b;
2627
2628 /* TODO: fix when called in the middle of response */
2629
2630 nxt_log_alert(task->log, "error %d: %s", code, str);
2631
2632 if (c->socket.fd == -1) {
2633 return;
2634 }
2635
2636 mp = c->mem_pool;
2637
2638 b = nxt_router_get_error_buf(task, mp, code, str);
2639 if (nxt_slow_path(b == NULL)) {
2640 return;
2641 }
2642
2643 if (c->write == NULL) {
2644 c->write = b;
2645 c->write_state = &nxt_router_conn_write_state;
2646
2647 nxt_conn_write(task->thread->engine, c);
2648
2649 } else {
2650 nxt_debug(task, "router data attach out bufs to existing chain");
2651
2652 nxt_buf_chain_add(&c->write, b);
2653 }
2654}
2655
2656
2657static void
2658nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2659 void *data)
2660{
2661 nxt_app_t *app;
2662 nxt_port_t *port;
2663
2664 app = data;
2665 port = msg->u.new_port;

--- 557 unchanged lines hidden (view full) ---

3223 nxt_router_port_select(task, &state);
3224
3225 nxt_thread_mutex_unlock(&app->mutex);
3226
3227 return nxt_router_port_post_select(task, &state);
3228}
3229
3230
2545nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2546 void *data)
2547{
2548 nxt_app_t *app;
2549 nxt_port_t *port;
2550
2551 app = data;
2552 port = msg->u.new_port;

--- 557 unchanged lines hidden (view full) ---

3110 nxt_router_port_select(task, &state);
3111
3112 nxt_thread_mutex_unlock(&app->mutex);
3113
3114 return nxt_router_port_post_select(task, &state);
3115}
3116
3117
3231static void
3232nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
3118void
3119nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
3233{
3120{
3234 size_t size;
3235 nxt_int_t ret;
3236 nxt_buf_t *buf;
3237 nxt_conn_t *c;
3238 nxt_sockaddr_t *local;
3239 nxt_app_parse_ctx_t *ap;
3240 nxt_app_request_body_t *b;
3241 nxt_socket_conf_joint_t *joint;
3242 nxt_app_request_header_t *h;
3121 nxt_int_t res;
3122 nxt_app_t *app;
3123 nxt_port_t *port;
3124 nxt_event_engine_t *engine;
3125 nxt_http_request_t *r;
3126 nxt_req_app_link_t ra_local, *ra;
3127 nxt_req_conn_link_t *rc;
3243
3128
3244 c = obj;
3245 ap = data;
3246 buf = c->read;
3247 joint = c->joint;
3129 r = ar->request;
3130 app = r->socket_conf->application;
3248
3131
3249 nxt_debug(task, "router conn http header parse");
3250
3251 if (ap == NULL) {
3252 ap = nxt_app_http_req_init(task);
3253 if (nxt_slow_path(ap == NULL)) {
3254 nxt_router_gen_error(task, c, 500,
3255 "Failed to allocate parse context");
3256 return;
3257 }
3258
3259 c->socket.data = ap;
3260
3261 ap->r.remote.start = nxt_sockaddr_address(c->remote);
3262 ap->r.remote.length = c->remote->address_length;
3263
3264 /*
3265 * TODO: need an application flag to get local address
3266 * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
3267 */
3268 local = nxt_router_local_addr(task, c);
3269
3270 if (nxt_fast_path(local != NULL)) {
3271 ap->r.local.start = nxt_sockaddr_address(local);
3272 ap->r.local.length = local->address_length;
3273 }
3274
3275 ap->r.header.buf = buf;
3276 }
3277
3278 h = &ap->r.header;
3279 b = &ap->r.body;
3280
3281 ret = nxt_app_http_req_header_parse(task, ap, buf);
3282
3283 nxt_debug(task, "http parse request header: %d", ret);
3284
3285 switch (nxt_expect(NXT_DONE, ret)) {
3286
3287 case NXT_DONE:
3288 nxt_debug(task, "router request header parsing complete, "
3289 "content length: %O, preread: %uz",
3290 h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
3291
3292 if (b->done) {
3293 nxt_router_process_http_request(task, c, ap);
3294
3295 return;
3296 }
3297
3298 if (joint->socket_conf->max_body_size > 0
3299 && (size_t) h->parsed_content_length
3300 > joint->socket_conf->max_body_size)
3301 {
3302 nxt_router_gen_error(task, c, 413, "Content-Length too big");
3303 return;
3304 }
3305
3306 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3307 size = nxt_min(joint->socket_conf->body_buffer_size,
3308 (size_t) h->parsed_content_length);
3309
3310 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3311 if (nxt_slow_path(buf->next == NULL)) {
3312 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3313 "buffer for request body");
3314 return;
3315 }
3316
3317 c->read = buf->next;
3318
3319 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3320 }
3321
3322 if (b->buf == NULL) {
3323 b->buf = c->read;
3324 }
3325
3326 c->read_state = &nxt_router_conn_read_body_state;
3327 break;
3328
3329 case NXT_ERROR:
3330 nxt_router_gen_error(task, c, 400, "Request header parse error");
3331 return;
3332
3333 default: /* NXT_AGAIN */
3334
3335 if (c->read->mem.free == c->read->mem.end) {
3336 size = joint->socket_conf->large_header_buffer_size;
3337
3338 if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem)
3339 || ap->r.header.bufs
3340 >= joint->socket_conf->large_header_buffers)
3341 {
3342 nxt_router_gen_error(task, c, 413,
3343 "Too long request headers");
3344 return;
3345 }
3346
3347 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3348 if (nxt_slow_path(buf->next == NULL)) {
3349 nxt_router_gen_error(task, c, 500,
3350 "Failed to allocate large header "
3351 "buffer");
3352 return;
3353 }
3354
3355 ap->r.header.bufs++;
3356
3357 size = c->read->mem.free - c->read->mem.pos;
3358
3359 c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
3360 }
3361
3362 }
3363
3364 nxt_conn_read(task->thread->engine, c);
3365}
3366
3367
3368static nxt_sockaddr_t *
3369nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c)
3370{
3371 int ret;
3372 size_t size, length;
3373 socklen_t socklen;
3374 nxt_sockaddr_t *sa;
3375
3376 if (c->local != NULL) {
3377 return c->local;
3378 }
3379
3380 /* AF_UNIX should not get in here. */
3381
3382 switch (c->remote->u.sockaddr.sa_family) {
3383#if (NXT_INET6)
3384 case AF_INET6:
3385 socklen = sizeof(struct sockaddr_in6);
3386 length = NXT_INET6_ADDR_STR_LEN;
3387 size = offsetof(nxt_sockaddr_t, u) + socklen + length;
3388 break;
3389#endif
3390 case AF_INET:
3391 default:
3392 socklen = sizeof(struct sockaddr_in);
3393 length = NXT_INET_ADDR_STR_LEN;
3394 size = offsetof(nxt_sockaddr_t, u) + socklen + length;
3395 break;
3396 }
3397
3398 sa = nxt_mp_get(c->mem_pool, size);
3399 if (nxt_slow_path(sa == NULL)) {
3400 return NULL;
3401 }
3402
3403 sa->socklen = socklen;
3404 sa->length = length;
3405
3406 ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen);
3407 if (nxt_slow_path(ret != 0)) {
3408 nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd);
3409 return NULL;
3410 }
3411
3412 c->local = sa;
3413
3414 nxt_sockaddr_text(sa);
3415
3416 /*
3417 * TODO: here we can adjust the end of non-freeable block
3418 * in c->mem_pool to the end of actual sockaddr length.
3419 */
3420
3421 return sa;
3422}
3423
3424
3425static void
3426nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
3427{
3428 size_t size;
3429 nxt_int_t ret;
3430 nxt_buf_t *buf;
3431 nxt_conn_t *c;
3432 nxt_app_parse_ctx_t *ap;
3433 nxt_app_request_body_t *b;
3434 nxt_socket_conf_joint_t *joint;
3435 nxt_app_request_header_t *h;
3436
3437 c = obj;
3438 ap = data;
3439 buf = c->read;
3440
3441 nxt_debug(task, "router conn http body read");
3442
3443 nxt_assert(ap != NULL);
3444
3445 b = &ap->r.body;
3446 h = &ap->r.header;
3447
3448 ret = nxt_app_http_req_body_read(task, ap, buf);
3449
3450 nxt_debug(task, "http read request body: %d", ret);
3451
3452 switch (nxt_expect(NXT_DONE, ret)) {
3453
3454 case NXT_DONE:
3455 nxt_router_process_http_request(task, c, ap);
3456 return;
3457
3458 case NXT_ERROR:
3459 nxt_router_gen_error(task, c, 500, "Read body error");
3460 return;
3461
3462 default: /* NXT_AGAIN */
3463
3464 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3465 joint = c->joint;
3466
3467 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3468
3469 size = nxt_min(joint->socket_conf->body_buffer_size,
3470 (size_t) h->parsed_content_length - b->preread_size);
3471
3472 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3473 if (nxt_slow_path(buf->next == NULL)) {
3474 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3475 "buffer for request body");
3476 return;
3477 }
3478
3479 c->read = buf->next;
3480 }
3481
3482 nxt_debug(task, "router request body read again, rest: %uz",
3483 h->parsed_content_length - b->preread_size);
3484 }
3485
3486 nxt_conn_read(task->thread->engine, c);
3487}
3488
3489
3490static void
3491nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
3492 nxt_app_parse_ctx_t *ap)
3493{
3494 nxt_int_t res;
3495 nxt_app_t *app;
3496 nxt_port_t *port;
3497 nxt_event_engine_t *engine;
3498 nxt_req_app_link_t ra_local, *ra;
3499 nxt_req_conn_link_t *rc;
3500 nxt_socket_conf_joint_t *joint;
3501
3502 joint = c->joint;
3503 app = joint->socket_conf->application;
3504
3505 if (app == NULL) {
3132 if (app == NULL) {
3506 nxt_router_gen_error(task, c, 500,
3507 "Application is NULL in socket_conf");
3133 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3508 return;
3509 }
3510
3511 engine = task->thread->engine;
3512
3513 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3514 nxt_router_response_ready_handler,
3515 nxt_router_response_error_handler,
3516 sizeof(nxt_req_conn_link_t));
3517
3518 if (nxt_slow_path(rc == NULL)) {
3134 return;
3135 }
3136
3137 engine = task->thread->engine;
3138
3139 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3140 nxt_router_response_ready_handler,
3141 nxt_router_response_error_handler,
3142 sizeof(nxt_req_conn_link_t));
3143
3144 if (nxt_slow_path(rc == NULL)) {
3519 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3520 "req<->conn link");
3521
3145 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3522 return;
3523 }
3524
3525 rc->stream = nxt_port_rpc_ex_stream(rc);
3146 return;
3147 }
3148
3149 rc->stream = nxt_port_rpc_ex_stream(rc);
3526 rc->conn = c;
3527 rc->app = app;
3528
3529 nxt_router_app_use(task, app, 1);
3530
3150 rc->app = app;
3151
3152 nxt_router_app_use(task, app, 1);
3153
3531 nxt_timer_disable(engine, &c->read_timer);
3154 rc->ap = ar;
3532
3155
3533 nxt_queue_insert_tail(&c->requests, &rc->link);
3534
3535 nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
3536 rc->stream, c, engine);
3537
3538 rc->ap = ap;
3539 c->socket.data = NULL;
3540
3541 ra = &ra_local;
3542 nxt_router_ra_init(task, ra, rc);
3543
3544 res = nxt_router_app_port(task, app, ra);
3545
3546 if (res != NXT_OK) {
3547 return;
3548 }

--- 358 unchanged lines hidden (view full) ---

3907 return NXT_OK;
3908
3909fail:
3910
3911 return NXT_ERROR;
3912}
3913
3914
3156 ra = &ra_local;
3157 nxt_router_ra_init(task, ra, rc);
3158
3159 res = nxt_router_app_port(task, app, ra);
3160
3161 if (res != NXT_OK) {
3162 return;
3163 }

--- 358 unchanged lines hidden (view full) ---

3522 return NXT_OK;
3523
3524fail:
3525
3526 return NXT_ERROR;
3527}
3528
3529
3915static const nxt_conn_state_t nxt_router_conn_close_state
3530const nxt_conn_state_t nxt_router_conn_close_state
3916 nxt_aligned(64) =
3917{
3918 .ready_handler = nxt_router_conn_free,
3919};
3920
3921
3922static void
3531 nxt_aligned(64) =
3532{
3533 .ready_handler = nxt_router_conn_free,
3534};
3535
3536
3537static void
3923nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
3924{
3925 nxt_buf_t *b;
3926 nxt_bool_t last;
3927 nxt_conn_t *c;
3928 nxt_work_queue_t *wq;
3929
3930 nxt_debug(task, "router conn ready %p", obj);
3931
3932 c = obj;
3933 b = c->write;
3934
3935 wq = &task->thread->engine->fast_work_queue;
3936
3937 last = 0;
3938
3939 while (b != NULL) {
3940 if (!nxt_buf_is_sync(b)) {
3941 if (nxt_buf_used_size(b) > 0) {
3942 break;
3943 }
3944 }
3945
3946 if (nxt_buf_is_last(b)) {
3947 last = 1;
3948 }
3949
3950 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
3951
3952 b = b->next;
3953 }
3954
3955 c->write = b;
3956
3957 if (b != NULL) {
3958 nxt_debug(task, "router conn %p has more data to write", obj);
3959
3960 nxt_conn_write(task->thread->engine, c);
3961
3962 } else {
3963 nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
3964 last);
3965
3966 if (last != 0) {
3967 nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
3968
3969 nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
3970 c->socket.data);
3971 }
3972 }
3973}
3974
3975
3976static void
3977nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
3978{
3979 nxt_conn_t *c;
3980
3981 c = obj;
3982
3983 nxt_debug(task, "router conn close");
3984
3985 c->write_state = &nxt_router_conn_close_state;
3986
3987 nxt_conn_close(task->thread->engine, c);
3988}
3989
3990
3991static void
3992nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
3993{
3994 nxt_socket_conf_joint_t *joint;
3995
3996 joint = obj;
3997
3998 nxt_router_conf_release(task, joint);
3999}
4000
4001
4002static void
4003nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
4004{
4005 nxt_conn_t *c;
4006 nxt_event_engine_t *engine;
3538nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
3539{
3540 nxt_socket_conf_joint_t *joint;
3541
3542 joint = obj;
3543
3544 nxt_router_conf_release(task, joint);
3545}
3546
3547
3548static void
3549nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
3550{
3551 nxt_conn_t *c;
3552 nxt_event_engine_t *engine;
4007 nxt_req_conn_link_t *rc;
4008 nxt_app_parse_ctx_t *ap;
4009 nxt_socket_conf_joint_t *joint;
4010
4011 c = obj;
3553 nxt_socket_conf_joint_t *joint;
3554
3555 c = obj;
4012 ap = data;
4013
4014 nxt_debug(task, "router conn close done");
4015
3556
3557 nxt_debug(task, "router conn close done");
3558
4016 if (ap != NULL) {
4017 nxt_app_http_req_done(task, ap);
4018
4019 c->socket.data = NULL;
4020 }
4021
4022 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
4023
4024 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
4025
4026 nxt_router_rc_unlink(task, rc);
4027
4028 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
4029
4030 } nxt_queue_loop;
4031
4032 nxt_queue_remove(&c->link);
4033
4034 engine = task->thread->engine;
4035
4036 nxt_sockaddr_cache_free(engine, c);
4037
4038 joint = c->joint;
4039
4040 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
4041 &engine->task, joint, NULL);
4042
4043 nxt_conn_free(task, c);
4044}
4045
4046
4047static void
3559 nxt_queue_remove(&c->link);
3560
3561 engine = task->thread->engine;
3562
3563 nxt_sockaddr_cache_free(engine, c);
3564
3565 joint = c->joint;
3566
3567 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
3568 &engine->task, joint, NULL);
3569
3570 nxt_conn_free(task, c);
3571}
3572
3573
3574static void
4048nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
4049{
4050 nxt_conn_t *c;
4051
4052 c = obj;
4053
4054 nxt_debug(task, "router conn error");
4055
4056 if (c->socket.fd != -1) {
4057 c->write_state = &nxt_router_conn_close_state;
4058
4059 nxt_conn_close(task->thread->engine, c);
4060 }
4061}
4062
4063
4064static void
4065nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
4066{
4067 nxt_conn_t *c;
4068 nxt_timer_t *timer;
4069
4070 timer = obj;
4071
4072 nxt_debug(task, "router conn timeout");
4073
4074 c = nxt_read_timer_conn(timer);
4075
4076 if (c->read_state == &nxt_router_conn_read_header_state) {
4077 nxt_router_gen_error(task, c, 408, "Read header timeout");
4078
4079 } else {
4080 nxt_router_gen_error(task, c, 408, "Read body timeout");
4081 }
4082}
4083
4084
4085static void
4086nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
4087{
3575nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
3576{
4088 nxt_conn_t *c;
4089 nxt_timer_t *timer;
3577 nxt_timer_t *timer;
3578 nxt_app_parse_ctx_t *ar;
4090
4091 timer = obj;
4092
4093 nxt_debug(task, "router app timeout");
4094
3579
3580 timer = obj;
3581
3582 nxt_debug(task, "router app timeout");
3583
4095 c = nxt_read_timer_conn(timer);
3584 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
4096
3585
4097 nxt_router_gen_error(task, c, 408, "Application timeout");
3586 if (!ar->request->header_sent) {
3587 nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
3588 }
4098}
3589}
4099
4100
4101static nxt_msec_t
4102nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
4103{
4104 nxt_socket_conf_joint_t *joint;
4105
4106 joint = c->joint;
4107
4108 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
4109}