nxt_router.c (1127:e60606d72d6b) nxt_router.c (1131:ec7d924d8dfb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#if (NXT_TLS)
11#include <nxt_cert.h>
12#endif
13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#if (NXT_TLS)
11#include <nxt_cert.h>
12#endif
13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
17
18
18
19typedef struct {
20 nxt_str_t type;
21 uint32_t processes;
22 uint32_t max_processes;
23 uint32_t spare_processes;
24 nxt_msec_t timeout;
25 nxt_msec_t res_timeout;
26 nxt_msec_t idle_timeout;

--- 16 unchanged lines hidden (view full) ---

43 nxt_socket_conf_t *conf;
44
45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
46} nxt_router_tlssock_t;
47
48#endif
49
50
19typedef struct {
20 nxt_str_t type;
21 uint32_t processes;
22 uint32_t max_processes;
23 uint32_t spare_processes;
24 nxt_msec_t timeout;
25 nxt_msec_t res_timeout;
26 nxt_msec_t idle_timeout;

--- 16 unchanged lines hidden (view full) ---

43 nxt_socket_conf_t *conf;
44
45 nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
46} nxt_router_tlssock_t;
47
48#endif
49
50
51typedef struct nxt_msg_info_s {
52 nxt_buf_t *buf;
53 nxt_port_mmap_tracking_t tracking;
54 nxt_work_handler_t completion_handler;
55} nxt_msg_info_t;
56
57
58typedef struct nxt_request_app_link_s nxt_request_app_link_t;
59
60
61typedef enum {
62 NXT_APR_NEW_PORT,
63 NXT_APR_REQUEST_FAILED,
64 NXT_APR_GOT_RESPONSE,
65 NXT_APR_CLOSE,
66} nxt_apr_action_t;
67
68
69typedef struct {
51typedef struct {
70 uint32_t stream;
71 nxt_app_t *app;
72
73 nxt_port_t *app_port;
74 nxt_apr_action_t apr_action;
75
76 nxt_http_request_t *request;
77 nxt_msg_info_t msg_info;
78 nxt_request_app_link_t *req_app_link;
79} nxt_request_rpc_data_t;
80
81
82struct nxt_request_app_link_s {
83 uint32_t stream;
84 nxt_atomic_t use_count;
85
86 nxt_port_t *app_port;
87 nxt_apr_action_t apr_action;
88
89 nxt_port_t *reply_port;
90 nxt_http_request_t *request;
91 nxt_msg_info_t msg_info;
92 nxt_request_rpc_data_t *req_rpc_data;
93
94 nxt_nsec_t res_time;
95
96 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
97 /* for nxt_port_t.pending_requests */
98 nxt_queue_link_t link_port_pending;
99 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
100
101 nxt_mp_t *mem_pool;
102 nxt_work_t work;
103
104 int err_code;
105 const char *err_str;
106};
107
108
109typedef struct {
110 nxt_socket_conf_t *socket_conf;
111 nxt_router_temp_conf_t *temp_conf;
112} nxt_socket_rpc_t;
113
114
115typedef struct {
116 nxt_app_t *app;
117 nxt_router_temp_conf_t *temp_conf;

--- 182 unchanged lines hidden (view full) ---

300static void nxt_router_app_joint_use(nxt_task_t *task,
301 nxt_app_joint_t *app_joint, int i);
302
303static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
304 nxt_http_request_t *r);
305static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
306 void *data);
307
52 nxt_socket_conf_t *socket_conf;
53 nxt_router_temp_conf_t *temp_conf;
54} nxt_socket_rpc_t;
55
56
57typedef struct {
58 nxt_app_t *app;
59 nxt_router_temp_conf_t *temp_conf;

--- 182 unchanged lines hidden (view full) ---

242static void nxt_router_app_joint_use(nxt_task_t *task,
243 nxt_app_joint_t *app_joint, int i);
244
245static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
246 nxt_http_request_t *r);
247static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
248 void *data);
249
250const nxt_http_request_state_t nxt_http_websocket;
251
308static nxt_router_t *nxt_router;
309
310static const nxt_str_t http_prefix = nxt_string("HTTP_");
311static const nxt_str_t empty_prefix = nxt_string("");
312
313static const nxt_str_t *nxt_app_msg_prefix[] = {
314 &empty_prefix,
315 &http_prefix,

--- 342 unchanged lines hidden (view full) ---

658}
659
660
661static void
662nxt_request_app_link_release(nxt_task_t *task,
663 nxt_request_app_link_t *req_app_link)
664{
665 nxt_mp_t *mp;
252static nxt_router_t *nxt_router;
253
254static const nxt_str_t http_prefix = nxt_string("HTTP_");
255static const nxt_str_t empty_prefix = nxt_string("");
256
257static const nxt_str_t *nxt_app_msg_prefix[] = {
258 &empty_prefix,
259 &http_prefix,

--- 342 unchanged lines hidden (view full) ---

602}
603
604
605static void
606nxt_request_app_link_release(nxt_task_t *task,
607 nxt_request_app_link_t *req_app_link)
608{
609 nxt_mp_t *mp;
610 nxt_http_request_t *r;
666 nxt_request_rpc_data_t *req_rpc_data;
667
668 nxt_assert(task->thread->engine == req_app_link->work.data);
669 nxt_assert(req_app_link->use_count == 0);
670
671 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
672
673 req_rpc_data = req_app_link->req_rpc_data;

--- 4 unchanged lines hidden (view full) ---

678 req_app_link->err_code);
679
680 } else {
681 req_rpc_data->app_port = req_app_link->app_port;
682 req_rpc_data->apr_action = req_app_link->apr_action;
683 req_rpc_data->msg_info = req_app_link->msg_info;
684
685 if (req_rpc_data->app->timeout != 0) {
611 nxt_request_rpc_data_t *req_rpc_data;
612
613 nxt_assert(task->thread->engine == req_app_link->work.data);
614 nxt_assert(req_app_link->use_count == 0);
615
616 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
617
618 req_rpc_data = req_app_link->req_rpc_data;

--- 4 unchanged lines hidden (view full) ---

623 req_app_link->err_code);
624
625 } else {
626 req_rpc_data->app_port = req_app_link->app_port;
627 req_rpc_data->apr_action = req_app_link->apr_action;
628 req_rpc_data->msg_info = req_app_link->msg_info;
629
630 if (req_rpc_data->app->timeout != 0) {
686 req_rpc_data->request->timer.handler = nxt_router_app_timeout;
687 req_rpc_data->request->timer_data = req_rpc_data;
688 nxt_timer_add(task->thread->engine,
689 &req_rpc_data->request->timer,
631 r = req_rpc_data->request;
632
633 r->timer.handler = nxt_router_app_timeout;
634 r->timer_data = req_rpc_data;
635 nxt_timer_add(task->thread->engine, &r->timer,
690 req_rpc_data->app->timeout);
691 }
692
693 req_app_link->app_port = NULL;
694 req_app_link->msg_info.buf = NULL;
695 }
696
697 req_rpc_data->req_app_link = NULL;

--- 130 unchanged lines hidden (view full) ---

828 req_app_link->req_rpc_data = NULL;
829
830 ra_use_delta = 0;
831
832 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
833
834 if (req_app_link->link_app_requests.next == NULL
835 && req_app_link->link_port_pending.next == NULL
636 req_rpc_data->app->timeout);
637 }
638
639 req_app_link->app_port = NULL;
640 req_app_link->msg_info.buf = NULL;
641 }
642
643 req_rpc_data->req_app_link = NULL;

--- 130 unchanged lines hidden (view full) ---

774 req_app_link->req_rpc_data = NULL;
775
776 ra_use_delta = 0;
777
778 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
779
780 if (req_app_link->link_app_requests.next == NULL
781 && req_app_link->link_port_pending.next == NULL
836 && req_app_link->link_app_pending.next == NULL)
782 && req_app_link->link_app_pending.next == NULL
783 && req_app_link->link_port_websockets.next == NULL)
837 {
838 req_app_link = NULL;
839
840 } else {
841 ra_use_delta -=
842 nxt_queue_chk_remove(&req_app_link->link_app_requests)
784 {
785 req_app_link = NULL;
786
787 } else {
788 ra_use_delta -=
789 nxt_queue_chk_remove(&req_app_link->link_app_requests)
843 + nxt_queue_chk_remove(&req_app_link->link_port_pending);
790 + nxt_queue_chk_remove(&req_app_link->link_port_pending)
791 + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
844
845 nxt_queue_chk_remove(&req_app_link->link_app_pending);
846 }
847
848 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
849
850 if (req_app_link != NULL) {
851 nxt_request_app_link_use(task, req_app_link, ra_use_delta);

--- 6 unchanged lines hidden (view full) ---

858 req_rpc_data->app = NULL;
859 }
860
861 if (req_rpc_data->request != NULL) {
862 req_rpc_data->request->timer_data = NULL;
863
864 nxt_router_http_request_done(task, req_rpc_data->request);
865
792
793 nxt_queue_chk_remove(&req_app_link->link_app_pending);
794 }
795
796 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
797
798 if (req_app_link != NULL) {
799 nxt_request_app_link_use(task, req_app_link, ra_use_delta);

--- 6 unchanged lines hidden (view full) ---

806 req_rpc_data->app = NULL;
807 }
808
809 if (req_rpc_data->request != NULL) {
810 req_rpc_data->request->timer_data = NULL;
811
812 nxt_router_http_request_done(task, req_rpc_data->request);
813
814 req_rpc_data->request->req_rpc_data = NULL;
866 req_rpc_data->request = NULL;
867 }
868}
869
870
871void
872nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
873{

--- 533 unchanged lines hidden (view full) ---

1407 {
1408 nxt_string("send_timeout"),
1409 NXT_CONF_MAP_MSEC,
1410 offsetof(nxt_socket_conf_t, send_timeout),
1411 },
1412};
1413
1414
815 req_rpc_data->request = NULL;
816 }
817}
818
819
820void
821nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
822{

--- 533 unchanged lines hidden (view full) ---

1356 {
1357 nxt_string("send_timeout"),
1358 NXT_CONF_MAP_MSEC,
1359 offsetof(nxt_socket_conf_t, send_timeout),
1360 },
1361};
1362
1363
1364static nxt_conf_map_t nxt_router_websocket_conf[] = {
1365 {
1366 nxt_string("max_frame_size"),
1367 NXT_CONF_MAP_SIZE,
1368 offsetof(nxt_websocket_conf_t, max_frame_size),
1369 },
1370
1371 {
1372 nxt_string("read_timeout"),
1373 NXT_CONF_MAP_MSEC,
1374 offsetof(nxt_websocket_conf_t, read_timeout),
1375 },
1376
1377 {
1378 nxt_string("keepalive_interval"),
1379 NXT_CONF_MAP_MSEC,
1380 offsetof(nxt_websocket_conf_t, keepalive_interval),
1381 },
1382
1383};
1384
1385
1415static nxt_int_t
1416nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1417 u_char *start, u_char *end)
1418{
1419 u_char *p;
1420 size_t size;
1421 nxt_mp_t *mp;
1422 uint32_t next;
1423 nxt_int_t ret;
1424 nxt_str_t name, path;
1425 nxt_app_t *app, *prev;
1426 nxt_router_t *router;
1427 nxt_app_joint_t *app_joint;
1386static nxt_int_t
1387nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1388 u_char *start, u_char *end)
1389{
1390 u_char *p;
1391 size_t size;
1392 nxt_mp_t *mp;
1393 uint32_t next;
1394 nxt_int_t ret;
1395 nxt_str_t name, path;
1396 nxt_app_t *app, *prev;
1397 nxt_router_t *router;
1398 nxt_app_joint_t *app_joint;
1428 nxt_conf_value_t *conf, *http, *value;
1399 nxt_conf_value_t *conf, *http, *value, *websocket;
1429 nxt_conf_value_t *applications, *application;
1430 nxt_conf_value_t *listeners, *listener;
1431 nxt_conf_value_t *routes_conf;
1432 nxt_socket_conf_t *skcf;
1433 nxt_http_routes_t *routes;
1434 nxt_event_engine_t *engine;
1435 nxt_app_lang_module_t *lang;
1436 nxt_router_app_conf_t apcf;

--- 6 unchanged lines hidden (view full) ---

1443 static nxt_str_t http_path = nxt_string("/settings/http");
1444 static nxt_str_t applications_path = nxt_string("/applications");
1445 static nxt_str_t listeners_path = nxt_string("/listeners");
1446 static nxt_str_t routes_path = nxt_string("/routes");
1447 static nxt_str_t access_log_path = nxt_string("/access_log");
1448#if (NXT_TLS)
1449 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1450#endif
1400 nxt_conf_value_t *applications, *application;
1401 nxt_conf_value_t *listeners, *listener;
1402 nxt_conf_value_t *routes_conf;
1403 nxt_socket_conf_t *skcf;
1404 nxt_http_routes_t *routes;
1405 nxt_event_engine_t *engine;
1406 nxt_app_lang_module_t *lang;
1407 nxt_router_app_conf_t apcf;

--- 6 unchanged lines hidden (view full) ---

1414 static nxt_str_t http_path = nxt_string("/settings/http");
1415 static nxt_str_t applications_path = nxt_string("/applications");
1416 static nxt_str_t listeners_path = nxt_string("/listeners");
1417 static nxt_str_t routes_path = nxt_string("/routes");
1418 static nxt_str_t access_log_path = nxt_string("/access_log");
1419#if (NXT_TLS)
1420 static nxt_str_t certificate_path = nxt_string("/tls/certificate");
1421#endif
1422 static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
1451
1452 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1453 if (conf == NULL) {
1454 nxt_alert(task, "configuration parsing error");
1455 return NXT_ERROR;
1456 }
1457
1458 mp = tmcf->router_conf->mem_pool;

--- 194 unchanged lines hidden (view full) ---

1653 http = nxt_conf_get_path(conf, &http_path);
1654#if 0
1655 if (http == NULL) {
1656 nxt_alert(task, "no \"http\" block");
1657 return NXT_ERROR;
1658 }
1659#endif
1660
1423
1424 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1425 if (conf == NULL) {
1426 nxt_alert(task, "configuration parsing error");
1427 return NXT_ERROR;
1428 }
1429
1430 mp = tmcf->router_conf->mem_pool;

--- 194 unchanged lines hidden (view full) ---

1625 http = nxt_conf_get_path(conf, &http_path);
1626#if 0
1627 if (http == NULL) {
1628 nxt_alert(task, "no \"http\" block");
1629 return NXT_ERROR;
1630 }
1631#endif
1632
1633 websocket = nxt_conf_get_path(conf, &websocket_path);
1634
1661 listeners = nxt_conf_get_path(conf, &listeners_path);
1662
1663 if (listeners != NULL) {
1664 next = 0;
1665
1666 for ( ;; ) {
1667 listener = nxt_conf_next_object_member(listeners, &name, &next);
1668 if (listener == NULL) {

--- 23 unchanged lines hidden (view full) ---

1692 skcf->large_header_buffers = 4;
1693 skcf->body_buffer_size = 16 * 1024;
1694 skcf->max_body_size = 8 * 1024 * 1024;
1695 skcf->idle_timeout = 180 * 1000;
1696 skcf->header_read_timeout = 30 * 1000;
1697 skcf->body_read_timeout = 30 * 1000;
1698 skcf->send_timeout = 30 * 1000;
1699
1635 listeners = nxt_conf_get_path(conf, &listeners_path);
1636
1637 if (listeners != NULL) {
1638 next = 0;
1639
1640 for ( ;; ) {
1641 listener = nxt_conf_next_object_member(listeners, &name, &next);
1642 if (listener == NULL) {

--- 23 unchanged lines hidden (view full) ---

1666 skcf->large_header_buffers = 4;
1667 skcf->body_buffer_size = 16 * 1024;
1668 skcf->max_body_size = 8 * 1024 * 1024;
1669 skcf->idle_timeout = 180 * 1000;
1670 skcf->header_read_timeout = 30 * 1000;
1671 skcf->body_read_timeout = 30 * 1000;
1672 skcf->send_timeout = 30 * 1000;
1673
1674 skcf->websocket_conf.max_frame_size = 1024 * 1024;
1675 skcf->websocket_conf.read_timeout = 60 * 1000;
1676 skcf->websocket_conf.keepalive_interval = 30 * 1000;
1677
1700 if (http != NULL) {
1701 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1702 nxt_nitems(nxt_router_http_conf),
1703 skcf);
1704 if (ret != NXT_OK) {
1705 nxt_alert(task, "http map error");
1706 goto fail;
1707 }
1708 }
1709
1678 if (http != NULL) {
1679 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1680 nxt_nitems(nxt_router_http_conf),
1681 skcf);
1682 if (ret != NXT_OK) {
1683 nxt_alert(task, "http map error");
1684 goto fail;
1685 }
1686 }
1687
1688 if (websocket != NULL) {
1689 ret = nxt_conf_map_object(mp, websocket,
1690 nxt_router_websocket_conf,
1691 nxt_nitems(nxt_router_websocket_conf),
1692 &skcf->websocket_conf);
1693 if (ret != NXT_OK) {
1694 nxt_alert(task, "websocket map error");
1695 goto fail;
1696 }
1697 }
1698
1710#if (NXT_TLS)
1711 value = nxt_conf_get_path(listener, &certificate_path);
1712
1713 if (value != NULL) {
1714 nxt_conf_get_string(value, &name);
1715
1716 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1717 if (nxt_slow_path(tls == NULL)) {

--- 1695 unchanged lines hidden (view full) ---

3413
3414
3415static void
3416nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3417 void *data)
3418{
3419 nxt_int_t ret;
3420 nxt_buf_t *b;
1699#if (NXT_TLS)
1700 value = nxt_conf_get_path(listener, &certificate_path);
1701
1702 if (value != NULL) {
1703 nxt_conf_get_string(value, &name);
1704
1705 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1706 if (nxt_slow_path(tls == NULL)) {

--- 1695 unchanged lines hidden (view full) ---

3402
3403
3404static void
3405nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3406 void *data)
3407{
3408 nxt_int_t ret;
3409 nxt_buf_t *b;
3410 nxt_port_t *app_port;
3421 nxt_unit_field_t *f;
3422 nxt_http_field_t *field;
3423 nxt_http_request_t *r;
3424 nxt_unit_response_t *resp;
3411 nxt_unit_field_t *f;
3412 nxt_http_field_t *field;
3413 nxt_http_request_t *r;
3414 nxt_unit_response_t *resp;
3415 nxt_request_app_link_t *req_app_link;
3425 nxt_request_rpc_data_t *req_rpc_data;
3426
3427 b = msg->buf;
3428 req_rpc_data = data;
3429
3430 if (msg->size == 0) {
3431 b = NULL;
3432 }

--- 104 unchanged lines hidden (view full) ---

3537 }
3538
3539 if (b != NULL) {
3540 nxt_buf_chain_add(&r->out, b);
3541 }
3542
3543 nxt_http_request_header_send(task, r);
3544
3416 nxt_request_rpc_data_t *req_rpc_data;
3417
3418 b = msg->buf;
3419 req_rpc_data = data;
3420
3421 if (msg->size == 0) {
3422 b = NULL;
3423 }

--- 104 unchanged lines hidden (view full) ---

3528 }
3529
3530 if (b != NULL) {
3531 nxt_buf_chain_add(&r->out, b);
3532 }
3533
3534 nxt_http_request_header_send(task, r);
3535
3545 r->state = &nxt_http_request_send_state;
3536 if (r->websocket_handshake
3537 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3538 {
3539 req_app_link = nxt_request_app_link_alloc(task,
3540 req_rpc_data->req_app_link,
3541 req_rpc_data);
3542 if (nxt_slow_path(req_app_link == NULL)) {
3543 goto fail;
3544 }
3546
3545
3546 app_port = req_app_link->app_port;
3547
3548 if (app_port == NULL && req_rpc_data->app_port != NULL) {
3549 req_app_link->app_port = req_rpc_data->app_port;
3550 app_port = req_app_link->app_port;
3551 req_app_link->apr_action = req_rpc_data->apr_action;
3552
3553 req_rpc_data->app_port = NULL;
3554 }
3555
3556 if (nxt_slow_path(app_port == NULL)) {
3557 goto fail;
3558 }
3559
3560 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
3561
3562 nxt_queue_insert_tail(&app_port->active_websockets,
3563 &req_app_link->link_port_websockets);
3564
3565 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
3566
3567 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3568 req_app_link->apr_action = NXT_APR_CLOSE;
3569
3570 nxt_debug(task, "req_app_link stream #%uD upgrade",
3571 req_app_link->stream);
3572
3573 r->state = &nxt_http_websocket;
3574
3575 } else {
3576 r->state = &nxt_http_request_send_state;
3577 }
3578
3547 if (r->out) {
3548 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3549 nxt_http_request_send_body, task, r, NULL);
3550 }
3551 }
3552
3553 return;
3554

--- 364 unchanged lines hidden (view full) ---

3919 dec_pending = 1;
3920 inc_use = -1;
3921 break;
3922 case NXT_APR_GOT_RESPONSE:
3923 dec_pending = 1;
3924 got_response = 1;
3925 inc_use = -1;
3926 break;
3579 if (r->out) {
3580 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3581 nxt_http_request_send_body, task, r, NULL);
3582 }
3583 }
3584
3585 return;
3586

--- 364 unchanged lines hidden (view full) ---

3951 dec_pending = 1;
3952 inc_use = -1;
3953 break;
3954 case NXT_APR_GOT_RESPONSE:
3955 dec_pending = 1;
3956 got_response = 1;
3957 inc_use = -1;
3958 break;
3959 case NXT_APR_UPGRADE:
3960 dec_pending = 1;
3961 got_response = 1;
3962 break;
3927 case NXT_APR_CLOSE:
3928 inc_use = -1;
3929 break;
3930 }
3931
3932 nxt_thread_mutex_lock(&app->mutex);
3933
3934 port->app_pending_responses -= dec_pending;

--- 106 unchanged lines hidden (view full) ---

4041 app->processes--;
4042
4043 } else {
4044 port_unchained = 0;
4045 }
4046
4047 adjust_idle_timer = 0;
4048
3963 case NXT_APR_CLOSE:
3964 inc_use = -1;
3965 break;
3966 }
3967
3968 nxt_thread_mutex_lock(&app->mutex);
3969
3970 port->app_pending_responses -= dec_pending;

--- 106 unchanged lines hidden (view full) ---

4077 app->processes--;
4078
4079 } else {
4080 port_unchained = 0;
4081 }
4082
4083 adjust_idle_timer = 0;
4084
4049 if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
4050 nxt_assert(port->idle_link.next == NULL);
4051
4085 if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
4086 && nxt_queue_is_empty(&port->active_websockets)
4087 && port->idle_link.next == NULL)
4088 {
4052 if (app->idle_processes == app->spare_processes
4053 && app->adjust_idle_work.data == NULL)
4054 {
4055 adjust_idle_timer = 1;
4056 app->adjust_idle_work.data = app;
4057 app->adjust_idle_work.next = NULL;
4058 }
4059

--- 480 unchanged lines hidden (view full) ---

4540 }
4541
4542 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4543 req_rpc_data->app = app;
4544
4545 nxt_router_app_use(task, app, 1);
4546
4547 req_rpc_data->request = r;
4089 if (app->idle_processes == app->spare_processes
4090 && app->adjust_idle_work.data == NULL)
4091 {
4092 adjust_idle_timer = 1;
4093 app->adjust_idle_work.data = app;
4094 app->adjust_idle_work.next = NULL;
4095 }
4096

--- 480 unchanged lines hidden (view full) ---

4577 }
4578
4579 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4580 req_rpc_data->app = app;
4581
4582 nxt_router_app_use(task, app, 1);
4583
4584 req_rpc_data->request = r;
4585 r->req_rpc_data = req_rpc_data;
4548
4549 req_app_link = &ra_local;
4550 nxt_request_app_link_init(task, req_app_link, req_rpc_data);
4551
4552 res = nxt_router_app_port(task, app, req_app_link);
4553
4554 if (res != NXT_OK) {
4555 return;

--- 74 unchanged lines hidden (view full) ---

4630 &req_app_link->msg_info.tracking,
4631 req_app_link->stream);
4632 if (nxt_slow_path(res != NXT_OK)) {
4633 nxt_request_app_link_error(req_app_link, 500,
4634 "Failed to get tracking area");
4635 goto release_port;
4636 }
4637
4586
4587 req_app_link = &ra_local;
4588 nxt_request_app_link_init(task, req_app_link, req_rpc_data);
4589
4590 res = nxt_router_app_port(task, app, req_app_link);
4591
4592 if (res != NXT_OK) {
4593 return;

--- 74 unchanged lines hidden (view full) ---

4668 &req_app_link->msg_info.tracking,
4669 req_app_link->stream);
4670 if (nxt_slow_path(res != NXT_OK)) {
4671 nxt_request_app_link_error(req_app_link, 500,
4672 "Failed to get tracking area");
4673 goto release_port;
4674 }
4675
4638 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
4676 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
4639 -1, req_app_link->stream, reply_port->id, buf,
4640 &req_app_link->msg_info.tracking);
4641
4642 if (nxt_slow_path(res != NXT_OK)) {
4643 nxt_request_app_link_error(req_app_link, 500,
4644 "Failed to send message to application");
4645 goto release_port;
4646 }

--- 133 unchanged lines hidden (view full) ---

4780 *p++ = '\0';
4781
4782 req->local_length = r->local->address_length;
4783 nxt_unit_sptr_set(&req->local, p);
4784 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
4785 *p++ = '\0';
4786
4787 req->tls = (r->tls != NULL);
4677 -1, req_app_link->stream, reply_port->id, buf,
4678 &req_app_link->msg_info.tracking);
4679
4680 if (nxt_slow_path(res != NXT_OK)) {
4681 nxt_request_app_link_error(req_app_link, 500,
4682 "Failed to send message to application");
4683 goto release_port;
4684 }

--- 133 unchanged lines hidden (view full) ---

4818 *p++ = '\0';
4819
4820 req->local_length = r->local->address_length;
4821 nxt_unit_sptr_set(&req->local, p);
4822 p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
4823 *p++ = '\0';
4824
4825 req->tls = (r->tls != NULL);
4826 req->websocket_handshake = r->websocket_handshake;
4788
4789 req->server_name_length = r->server_name.length;
4790 nxt_unit_sptr_set(&req->server_name, p);
4791 p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
4792 *p++ = '\0';
4793
4794 target_pos = p;
4795 req->target_length = (uint32_t) r->target.length;

--- 296 unchanged lines hidden ---
4827
4828 req->server_name_length = r->server_name.length;
4829 nxt_unit_sptr_set(&req->server_name, p);
4830 p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
4831 *p++ = '\0';
4832
4833 target_pos = p;
4834 req->target_length = (uint32_t) r->target.length;

--- 296 unchanged lines hidden ---