Deleted Added
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#if (NXT_TLS)
11#include <nxt_cert.h>
12#endif
13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
18
19typedef struct {
20 nxt_str_t type;
21 uint32_t processes;
22 uint32_t max_processes;
23 uint32_t spare_processes;
24 nxt_msec_t timeout;
25 nxt_msec_t res_timeout;
26 nxt_msec_t idle_timeout;

--- 16 unchanged lines hidden (view full) ---

43 nxt_socket_conf_t *conf;
44
45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
46} nxt_router_tlssock_t;
47
48#endif
49
50
51typedef struct {
52 nxt_socket_conf_t *socket_conf;
53 nxt_router_temp_conf_t *temp_conf;
54} nxt_socket_rpc_t;
55
56
57typedef struct {
58 nxt_app_t *app;
59 nxt_router_temp_conf_t *temp_conf;

--- 182 unchanged lines hidden (view full) ---

242static void nxt_router_app_joint_use(nxt_task_t *task,
243 nxt_app_joint_t *app_joint, int i);
244
245static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
246 nxt_http_request_t *r);
247static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
248 void *data);
249
250const nxt_http_request_state_t nxt_http_websocket;
251
252static nxt_router_t *nxt_router;
253
254static const nxt_str_t http_prefix = nxt_string("HTTP_");
255static const nxt_str_t empty_prefix = nxt_string("");
256
257static const nxt_str_t *nxt_app_msg_prefix[] = {
258 &empty_prefix,
259 &http_prefix,

--- 342 unchanged lines hidden (view full) ---

602}
603
604
605static void
606nxt_request_app_link_release(nxt_task_t *task,
607 nxt_request_app_link_t *req_app_link)
608{
609 nxt_mp_t *mp;
610 nxt_http_request_t *r;
611 nxt_request_rpc_data_t *req_rpc_data;
612
613 nxt_assert(task->thread->engine == req_app_link->work.data);
614 nxt_assert(req_app_link->use_count == 0);
615
616 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
617
618 req_rpc_data = req_app_link->req_rpc_data;

--- 4 unchanged lines hidden (view full) ---

623 req_app_link->err_code);
624
625 } else {
626 req_rpc_data->app_port = req_app_link->app_port;
627 req_rpc_data->apr_action = req_app_link->apr_action;
628 req_rpc_data->msg_info = req_app_link->msg_info;
629
630 if (req_rpc_data->app->timeout != 0) {
631 r = req_rpc_data->request;
632
633 r->timer.handler = nxt_router_app_timeout;
634 r->timer_data = req_rpc_data;
635 nxt_timer_add(task->thread->engine, &r->timer,
636 req_rpc_data->app->timeout);
637 }
638
639 req_app_link->app_port = NULL;
640 req_app_link->msg_info.buf = NULL;
641 }
642
643 req_rpc_data->req_app_link = NULL;

--- 130 unchanged lines hidden (view full) ---

774 req_app_link->req_rpc_data = NULL;
775
776 ra_use_delta = 0;
777
778 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
779
780 if (req_app_link->link_app_requests.next == NULL
781 && req_app_link->link_port_pending.next == NULL
782 && req_app_link->link_app_pending.next == NULL
783 && req_app_link->link_port_websockets.next == NULL)
784 {
785 req_app_link = NULL;
786
787 } else {
788 ra_use_delta -=
789 nxt_queue_chk_remove(&req_app_link->link_app_requests)
790 + nxt_queue_chk_remove(&req_app_link->link_port_pending)
791 + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
792
793 nxt_queue_chk_remove(&req_app_link->link_app_pending);
794 }
795
796 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
797
798 if (req_app_link != NULL) {
799 nxt_request_app_link_use(task, req_app_link, ra_use_delta);

--- 6 unchanged lines hidden (view full) ---

806 req_rpc_data->app = NULL;
807 }
808
809 if (req_rpc_data->request != NULL) {
810 req_rpc_data->request->timer_data = NULL;
811
812 nxt_router_http_request_done(task, req_rpc_data->request);
813
814 req_rpc_data->request->req_rpc_data = NULL;
815 req_rpc_data->request = NULL;
816 }
817}
818
819
820void
821nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
822{

--- 533 unchanged lines hidden (view full) ---

1356 {
1357 nxt_string("send_timeout"),
1358 NXT_CONF_MAP_MSEC,
1359 offsetof(nxt_socket_conf_t, send_timeout),
1360 },
1361};
1362
1363
1364static nxt_conf_map_t nxt_router_websocket_conf[] = {
1365 {
1366 nxt_string("max_frame_size"),
1367 NXT_CONF_MAP_SIZE,
1368 offsetof(nxt_websocket_conf_t, max_frame_size),
1369 },
1370
1371 {
1372 nxt_string("read_timeout"),
1373 NXT_CONF_MAP_MSEC,
1374 offsetof(nxt_websocket_conf_t, read_timeout),
1375 },
1376
1377 {
1378 nxt_string("keepalive_interval"),
1379 NXT_CONF_MAP_MSEC,
1380 offsetof(nxt_websocket_conf_t, keepalive_interval),
1381 },
1382
1383};
1384
1385
1386static nxt_int_t
1387nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1388 u_char *start, u_char *end)
1389{
1390 u_char *p;
1391 size_t size;
1392 nxt_mp_t *mp;
1393 uint32_t next;
1394 nxt_int_t ret;
1395 nxt_str_t name, path;
1396 nxt_app_t *app, *prev;
1397 nxt_router_t *router;
1398 nxt_app_joint_t *app_joint;
1399 nxt_conf_value_t *conf, *http, *value, *websocket;
1400 nxt_conf_value_t *applications, *application;
1401 nxt_conf_value_t *listeners, *listener;
1402 nxt_conf_value_t *routes_conf;
1403 nxt_socket_conf_t *skcf;
1404 nxt_http_routes_t *routes;
1405 nxt_event_engine_t *engine;
1406 nxt_app_lang_module_t *lang;
1407 nxt_router_app_conf_t apcf;

--- 6 unchanged lines hidden (view full) ---

1414 static nxt_str_t http_path = nxt_string("/settings/http");
1415 static nxt_str_t applications_path = nxt_string("/applications");
1416 static nxt_str_t listeners_path = nxt_string("/listeners");
1417 static nxt_str_t routes_path = nxt_string("/routes");
1418 static nxt_str_t access_log_path = nxt_string("/access_log");
1419#if (NXT_TLS)
1420 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1421#endif
1422 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1423
1424 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1425 if (conf == NULL) {
1426 nxt_alert(task, "configuration parsing error");
1427 return NXT_ERROR;
1428 }
1429
1430 mp = tmcf->router_conf->mem_pool;

--- 194 unchanged lines hidden (view full) ---

1625 http = nxt_conf_get_path(conf, &http_path);
1626#if 0
1627 if (http == NULL) {
1628 nxt_alert(task, "no \"http\" block");
1629 return NXT_ERROR;
1630 }
1631#endif
1632
1633 websocket = nxt_conf_get_path(conf, &websocket_path);
1634
1635 listeners = nxt_conf_get_path(conf, &listeners_path);
1636
1637 if (listeners != NULL) {
1638 next = 0;
1639
1640 for ( ;; ) {
1641 listener = nxt_conf_next_object_member(listeners, &name, &next);
1642 if (listener == NULL) {

--- 23 unchanged lines hidden (view full) ---

1666 skcf->large_header_buffers = 4;
1667 skcf->body_buffer_size = 16 * 1024;
1668 skcf->max_body_size = 8 * 1024 * 1024;
1669 skcf->idle_timeout = 180 * 1000;
1670 skcf->header_read_timeout = 30 * 1000;
1671 skcf->body_read_timeout = 30 * 1000;
1672 skcf->send_timeout = 30 * 1000;
1673
1674 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1675 skcf->websocket_conf.read_timeout = 60 * 1000;
1676 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1677
1678 if (http != NULL) {
1679 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1680 nxt_nitems(nxt_router_http_conf),
1681 skcf);
1682 if (ret != NXT_OK) {
1683 nxt_alert(task, "http map error");
1684 goto fail;
1685 }
1686 }
1687
1688 if (websocket != NULL) {
1689 ret = nxt_conf_map_object(mp, websocket,
1690 nxt_router_websocket_conf,
1691 nxt_nitems(nxt_router_websocket_conf),
1692 &skcf->websocket_conf);
1693 if (ret != NXT_OK) {
1694 nxt_alert(task, "websocket map error");
1695 goto fail;
1696 }
1697 }
1698
1699#if (NXT_TLS)
1700 value = nxt_conf_get_path(listener, &certificate_path);
1701
1702 if (value != NULL) {
1703 nxt_conf_get_string(value, &name);
1704
1705 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1706 if (nxt_slow_path(tls == NULL)) {

--- 1695 unchanged lines hidden (view full) ---

3402
3403
3404static void
3405nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3406 void *data)
3407{
3408 nxt_int_t ret;
3409 nxt_buf_t *b;
3410 nxt_port_t *app_port;
3411 nxt_unit_field_t *f;
3412 nxt_http_field_t *field;
3413 nxt_http_request_t *r;
3414 nxt_unit_response_t *resp;
3415 nxt_request_app_link_t *req_app_link;
3416 nxt_request_rpc_data_t *req_rpc_data;
3417
3418 b = msg->buf;
3419 req_rpc_data = data;
3420
3421 if (msg->size == 0) {
3422 b = NULL;
3423 }

--- 104 unchanged lines hidden (view full) ---

3528 }
3529
3530 if (b != NULL) {
3531 nxt_buf_chain_add(&r->out, b);
3532 }
3533
3534 nxt_http_request_header_send(task, r);
3535
3536 if (r->websocket_handshake
3537 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3538 {
3539 req_app_link = nxt_request_app_link_alloc(task,
3540 req_rpc_data->req_app_link,
3541 req_rpc_data);
3542 if (nxt_slow_path(req_app_link == NULL)) {
3543 goto fail;
3544 }
3545
3546 app_port = req_app_link->app_port;
3547
3548 if (app_port == NULL && req_rpc_data->app_port != NULL) {
3549 req_app_link->app_port = req_rpc_data->app_port;
3550 app_port = req_app_link->app_port;
3551 req_app_link->apr_action = req_rpc_data->apr_action;
3552
3553 req_rpc_data->app_port = NULL;
3554 }
3555
3556 if (nxt_slow_path(app_port == NULL)) {
3557 goto fail;
3558 }
3559
3560 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
3561
3562 nxt_queue_insert_tail(&app_port->active_websockets,
3563 &req_app_link->link_port_websockets);
3564
3565 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
3566
3567 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3568 req_app_link->apr_action = NXT_APR_CLOSE;
3569
3570 nxt_debug(task, "req_app_link stream #%uD upgrade",
3571 req_app_link->stream);
3572
3573 r->state = &nxt_http_websocket;
3574
3575 } else {
3576 r->state = &nxt_http_request_send_state;
3577 }
3578
3579 if (r->out) {
3580 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3581 nxt_http_request_send_body, task, r, NULL);
3582 }
3583 }
3584
3585 return;
3586

--- 364 unchanged lines hidden (view full) ---

3951 dec_pending = 1;
3952 inc_use = -1;
3953 break;
3954 case NXT_APR_GOT_RESPONSE:
3955 dec_pending = 1;
3956 got_response = 1;
3957 inc_use = -1;
3958 break;
3959 case NXT_APR_UPGRADE:
3960 dec_pending = 1;
3961 got_response = 1;
3962 break;
3963 case NXT_APR_CLOSE:
3964 inc_use = -1;
3965 break;
3966 }
3967
3968 nxt_thread_mutex_lock(&app->mutex);
3969
3970 port->app_pending_responses -= dec_pending;

--- 106 unchanged lines hidden (view full) ---

4077 app->processes--;
4078
4079 } else {
4080 port_unchained = 0;
4081 }
4082
4083 adjust_idle_timer = 0;
4084
4085 if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
4086 && nxt_queue_is_empty(&port->active_websockets)
4087 && port->idle_link.next == NULL)
4088 {
4089 if (app->idle_processes == app->spare_processes
4090 && app->adjust_idle_work.data == NULL)
4091 {
4092 adjust_idle_timer = 1;
4093 app->adjust_idle_work.data = app;
4094 app->adjust_idle_work.next = NULL;
4095 }
4096

--- 480 unchanged lines hidden (view full) ---

4577 }
4578
4579 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4580 req_rpc_data->app = app;
4581
4582 nxt_router_app_use(task, app, 1);
4583
4584 req_rpc_data->request = r;
4585 r->req_rpc_data = req_rpc_data;
4586
4587 req_app_link = &ra_local;
4588 nxt_request_app_link_init(task, req_app_link, req_rpc_data);
4589
4590 res = nxt_router_app_port(task, app, req_app_link);
4591
4592 if (res != NXT_OK) {
4593 return;

--- 74 unchanged lines hidden (view full) ---

4668 &req_app_link->msg_info.tracking,
4669 req_app_link->stream);
4670 if (nxt_slow_path(res != NXT_OK)) {
4671 nxt_request_app_link_error(req_app_link, 500,
4672 "Failed to get tracking area");
4673 goto release_port;
4674 }
4675
4676 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
4677 -1, req_app_link->stream, reply_port->id, buf,
4678 &req_app_link->msg_info.tracking);
4679
4680 if (nxt_slow_path(res != NXT_OK)) {
4681 nxt_request_app_link_error(req_app_link, 500,
4682 "Failed to send message to application");
4683 goto release_port;
4684 }

--- 133 unchanged lines hidden (view full) ---

4818 *p++ = '\0';
4819
4820 req->local_length = r->local->address_length;
4821 nxt_unit_sptr_set(&req->local, p);
4822 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
4823 *p++ = '\0';
4824
4825 req->tls = (r->tls != NULL);
4826 req->websocket_handshake = r->websocket_handshake;
4827
4828 req->server_name_length = r->server_name.length;
4829 nxt_unit_sptr_set(&req->server_name, p);
4830 p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
4831 *p++ = '\0';
4832
4833 target_pos = p;
4834 req->target_length = (uint32_t) r->target.length;

--- 296 unchanged lines hidden ---