Deleted Added
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 61 unchanged lines hidden (view full) ---

70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conf_ready(nxt_task_t *task,
73 nxt_router_temp_conf_t *tmcf);
74static void nxt_router_conf_error(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_send(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
78static void nxt_router_listen_sockets_sort(nxt_router_t *router,
79 nxt_router_temp_conf_t *tmcf);
80
81static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
82 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
83static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
84static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
85 nxt_str_t *name);
86static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
87 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
88static void nxt_router_listen_socket_ready(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg, void *data);
90static void nxt_router_listen_socket_error(nxt_task_t *task,
91 nxt_port_recv_msg_t *msg, void *data);
92static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
93 nxt_sockaddr_t *sa);
94
95static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
97 const nxt_event_interface_t *interface);
98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
99 nxt_router_engine_conf_t *recf);
100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
101 nxt_router_engine_conf_t *recf);
102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
103 nxt_router_engine_conf_t *recf);
104static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
105static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
106 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
107 nxt_work_handler_t handler);
108static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
109 nxt_router_engine_conf_t *recf);
110static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
111 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
112

--- 15 unchanged lines hidden (view full) ---

128static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
129 void *data);
130static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
131 void *data);
132static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
133 void *data);
134static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
135 void *data);
136static void nxt_router_listen_socket_release(nxt_task_t *task,
137 nxt_socket_conf_joint_t *joint);
138static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
139 void *data);
140static void nxt_router_conf_release(nxt_task_t *task,
141 nxt_socket_conf_joint_t *joint);
142
143static void nxt_router_app_port_ready(nxt_task_t *task,
144 nxt_port_recv_msg_t *msg, void *data);
145static void nxt_router_app_port_error(nxt_task_t *task,
146 nxt_port_recv_msg_t *msg, void *data);
147
148static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
149static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
150 uint32_t request_failed, uint32_t got_response);
151
152static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
153static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
154 void *data);
155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
156 void *data);
157static void nxt_router_process_http_request(nxt_task_t *task,
158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
159static void nxt_router_process_http_request_mp(nxt_task_t *task,
160 nxt_req_app_link_t *ra);
161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
162 nxt_app_wmsg_t *wmsg);

--- 652 unchanged lines hidden (view full) ---

815
816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
817
818 for (qlk = nxt_queue_first(&tmcf->creating);
819 qlk != nxt_queue_tail(&tmcf->creating);
820 qlk = nxt_queue_next(qlk))
821 {
822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
823 s = skcf->listen.socket;
824
825 if (s != -1) {
826 nxt_socket_close(task, s);
827 }
828
829 nxt_free(skcf->socket);
830 }
831
832 router = tmcf->conf->router;
833
834 nxt_queue_add(&router->sockets, &tmcf->keeping);
835 nxt_queue_add(&router->sockets, &tmcf->deleting);
836
837 // TODO: new engines and threads

--- 117 unchanged lines hidden (view full) ---

955{
956 u_char *p;
957 size_t size;
958 nxt_mp_t *mp;
959 uint32_t next;
960 nxt_int_t ret;
961 nxt_str_t name;
962 nxt_app_t *app, *prev;
963 nxt_sockaddr_t *sa;
964 nxt_conf_value_t *conf, *http;
965 nxt_conf_value_t *applications, *application;
966 nxt_conf_value_t *listeners, *listener;
967 nxt_socket_conf_t *skcf;
968 nxt_app_lang_module_t *lang;
969 nxt_router_app_conf_t apcf;
970 nxt_router_listener_conf_t lscf;
971

--- 21 unchanged lines hidden (view full) ---

993 }
994
995 applications = nxt_conf_get_path(conf, &applications_path);
996 if (applications == NULL) {
997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
998 return NXT_ERROR;
999 }
1000
1001 next = 0;
1002
1003 for ( ;; ) {
1004 application = nxt_conf_next_object_member(applications, &name, &next);
1005 if (application == NULL) {
1006 break;
1007 }
1008

--- 13 unchanged lines hidden (view full) ---

1022
1023 p = nxt_conf_json_print(app->conf.start, application, NULL);
1024 app->conf.length = p - app->conf.start;
1025
1026 nxt_assert(app->conf.length <= size);
1027
1028 nxt_debug(task, "application conf \"%V\"", &app->conf);
1029
1030 prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
1031
1032 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1033 nxt_free(app);
1034
1035 nxt_queue_remove(&prev->link);
1036 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1037 continue;
1038 }

--- 82 unchanged lines hidden (view full) ---

1121 next = 0;
1122
1123 for ( ;; ) {
1124 listener = nxt_conf_next_object_member(listeners, &name, &next);
1125 if (listener == NULL) {
1126 break;
1127 }
1128
1129 sa = nxt_sockaddr_parse(mp, &name);
1130 if (sa == NULL) {
1131 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
1132 goto fail;
1133 }
1134
1135 sa->type = SOCK_STREAM;
1136
1137 nxt_debug(task, "router listener: \"%*s\"",
1138 sa->length, nxt_sockaddr_start(sa));
1139
1140 skcf = nxt_router_socket_conf(task, mp, sa);
1141 if (skcf == NULL) {
1142 goto fail;
1143 }
1144
1145 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1146 nxt_nitems(nxt_router_listener_conf), &lscf);
1147 if (ret != NXT_OK) {
1148 nxt_log(task, NXT_LOG_CRIT, "listener map error");

--- 15 unchanged lines hidden (view full) ---

1164 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1165 nxt_nitems(nxt_router_http_conf), skcf);
1166 if (ret != NXT_OK) {
1167 nxt_log(task, NXT_LOG_CRIT, "http map error");
1168 goto fail;
1169 }
1170 }
1171
1172 skcf->listen.handler = nxt_router_conn_init;
1173 skcf->router_conf = tmcf->conf;
1174 skcf->router_conf->count++;
1175 skcf->application = nxt_router_listener_application(tmcf,
1176 &lscf.application);
1177
1178 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
1179 }
1180
1181 nxt_router_listen_sockets_sort(tmcf->conf->router, tmcf);
1182
1183 return NXT_OK;
1184
1185app_fail:
1186
1187 nxt_free(app);
1188
1189fail:

--- 38 unchanged lines hidden (view full) ---

1228 app = nxt_router_app_find(&tmcf->previous, name);
1229 }
1230
1231 return app;
1232}
1233
1234
1235static nxt_socket_conf_t *
1236nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
1237{
1238 nxt_socket_conf_t *skcf;
1239
1240 skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
1241 if (nxt_slow_path(skcf == NULL)) {
1242 return NULL;
1243 }
1244
1245 skcf->sockaddr = sa;
1246
1247 skcf->listen.sockaddr = sa;
1248
1249 nxt_listen_socket_remote_size(&skcf->listen, sa);
1250
1251 skcf->listen.socket = -1;
1252 skcf->listen.backlog = NXT_LISTEN_BACKLOG;
1253 skcf->listen.flags = NXT_NONBLOCK;
1254 skcf->listen.read_after_accept = 1;
1255
1256 return skcf;
1257}
1258
1259
1260static void
1261nxt_router_listen_sockets_sort(nxt_router_t *router,
1262 nxt_router_temp_conf_t *tmcf)
1263{
1264 nxt_queue_link_t *nqlk, *oqlk, *next;
1265 nxt_socket_conf_t *nskcf, *oskcf;
1266
1267 for (nqlk = nxt_queue_first(&tmcf->pending);
1268 nqlk != nxt_queue_tail(&tmcf->pending);
1269 nqlk = next)
1270 {
1271 next = nxt_queue_next(nqlk);
1272 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
1273
1274 for (oqlk = nxt_queue_first(&router->sockets);
1275 oqlk != nxt_queue_tail(&router->sockets);
1276 oqlk = nxt_queue_next(oqlk))
1277 {
1278 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
1279
1280 if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
1281 nskcf->socket = oskcf->socket;
1282 nskcf->listen.socket = oskcf->listen.socket;
1283
1284 nxt_queue_remove(oqlk);
1285 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
1286
1287 nxt_queue_remove(nqlk);
1288 nxt_queue_insert_tail(&tmcf->updating, nqlk);
1289
1290 break;
1291 }
1292 }
1293 }
1294
1295 nxt_queue_add(&tmcf->deleting, &router->sockets);
1296 nxt_queue_init(&router->sockets);
1297}
1298
1299
1300static void
1301nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1302 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1303{
1304 size_t size;

--- 6 unchanged lines hidden (view full) ---

1311 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1312 if (rpc == NULL) {
1313 goto fail;
1314 }
1315
1316 rpc->socket_conf = skcf;
1317 rpc->temp_conf = tmcf;
1318
1319 size = nxt_sockaddr_size(skcf->sockaddr);
1320
1321 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1322 if (b == NULL) {
1323 goto fail;
1324 }
1325
1326 b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr, size);
1327
1328 rt = task->thread->runtime;
1329 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1330 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1331
1332 stream = nxt_port_rpc_register_handler(task, router_port,
1333 nxt_router_listen_socket_ready,
1334 nxt_router_listen_socket_error,

--- 12 unchanged lines hidden (view full) ---

1347 nxt_router_conf_error(task, tmcf);
1348}
1349
1350
1351static void
1352nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1353 void *data)
1354{
1355 nxt_int_t ret;
1356 nxt_socket_t s;
1357 nxt_socket_rpc_t *rpc;
1358 nxt_router_socket_t *rtsk;
1359
1360 rpc = data;
1361
1362 s = msg->fd;
1363
1364 ret = nxt_socket_nonblocking(task, s);
1365 if (nxt_slow_path(ret != NXT_OK)) {
1366 goto fail;
1367 }
1368
1369 nxt_socket_defer_accept(task, s, rpc->socket_conf->sockaddr);
1370
1371 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1372 if (nxt_slow_path(ret != NXT_OK)) {
1373 goto fail;
1374 }
1375
1376 rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
1377 if (nxt_slow_path(rtsk == NULL)) {
1378 goto fail;
1379 }
1380
1381 rtsk->count = 0;
1382 rtsk->fd = s;
1383
1384 rpc->socket_conf->listen.socket = s;
1385 rpc->socket_conf->socket = rtsk;
1386
1387 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1388 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1389
1390 return;
1391
1392fail:
1393
1394 nxt_socket_close(task, s);

--- 20 unchanged lines hidden (view full) ---

1415 nxt_string("ListenerPort"),
1416 nxt_string("ListenerInUse"),
1417 nxt_string("ListenerNoAddress"),
1418 nxt_string("ListenerNoAccess"),
1419 nxt_string("ListenerPath"),
1420 };
1421
1422 rpc = data;
1423 sa = rpc->socket_conf->sockaddr;
1424 tmcf = rpc->temp_conf;
1425
1426 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1427
1428 nxt_assert(in != NULL);
1429
1430 p = in->mem.pos;
1431

--- 92 unchanged lines hidden (view full) ---

1524 return NXT_OK;
1525}
1526
1527
1528static nxt_int_t
1529nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1530 nxt_router_engine_conf_t *recf)
1531{
1532 nxt_int_t ret;
1533 nxt_thread_spinlock_t *lock;
1534
1535 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1536 nxt_router_listen_socket_create);
1537 if (nxt_slow_path(ret != NXT_OK)) {
1538 return ret;
1539 }
1540
1541 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1542 nxt_router_listen_socket_create);
1543 if (nxt_slow_path(ret != NXT_OK)) {
1544 return ret;
1545 }
1546
1547 lock = &tmcf->conf->router->lock;
1548
1549 nxt_thread_spin_lock(lock);
1550
1551 nxt_router_engine_socket_count(&tmcf->creating);
1552 nxt_router_engine_socket_count(&tmcf->updating);
1553
1554 nxt_thread_spin_unlock(lock);
1555
1556 return ret;
1557}
1558
1559
1560static nxt_int_t
1561nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1562 nxt_router_engine_conf_t *recf)
1563{
1564 nxt_int_t ret;
1565 nxt_thread_spinlock_t *lock;
1566
1567 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1568 nxt_router_listen_socket_create);
1569 if (nxt_slow_path(ret != NXT_OK)) {
1570 return ret;
1571 }
1572
1573 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1574 nxt_router_listen_socket_update);
1575 if (nxt_slow_path(ret != NXT_OK)) {
1576 return ret;
1577 }
1578
1579 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1580 if (nxt_slow_path(ret != NXT_OK)) {
1581 return ret;
1582 }
1583
1584 lock = &tmcf->conf->router->lock;
1585
1586 nxt_thread_spin_lock(lock);
1587
1588 nxt_router_engine_socket_count(&tmcf->creating);
1589
1590 nxt_thread_spin_unlock(lock);
1591
1592 return ret;
1593}
1594
1595
1596static nxt_int_t
1597nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1598 nxt_router_engine_conf_t *recf)
1599{

--- 59 unchanged lines hidden (view full) ---

1659
1660 joint->engine = recf->engine;
1661 }
1662
1663 return NXT_OK;
1664}
1665
1666
1667static void
1668nxt_router_engine_socket_count(nxt_queue_t *sockets)
1669{
1670 nxt_queue_link_t *qlk;
1671 nxt_socket_conf_t *skcf;
1672
1673 for (qlk = nxt_queue_first(sockets);
1674 qlk != nxt_queue_tail(sockets);
1675 qlk = nxt_queue_next(qlk))
1676 {
1677 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1678 skcf->socket->count++;
1679 }
1680}
1681
1682
1683static nxt_int_t
1684nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
1685 nxt_router_engine_conf_t *recf)
1686{
1687 nxt_joint_job_t *job;
1688
1689 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1690 if (nxt_slow_path(job == NULL)) {

--- 245 unchanged lines hidden (view full) ---

1936 nxt_event_engine_start(engine);
1937}
1938
1939
1940static void
1941nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1942{
1943 nxt_joint_job_t *job;
1944 nxt_listen_event_t *listen;
1945 nxt_listen_socket_t *ls;
1946 nxt_socket_conf_joint_t *joint;
1947
1948 job = obj;
1949 joint = data;
1950
1951 ls = &joint->socket_conf->listen;
1952
1953 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1954
1955 listen = nxt_listen_event(task, ls);
1956 if (nxt_slow_path(listen == NULL)) {
1957 nxt_router_listen_socket_release(task, joint);
1958 return;
1959 }
1960
1961 listen->socket.data = joint;
1962
1963 job->work.next = NULL;
1964 job->work.handler = nxt_router_conf_wait;
1965
1966 nxt_event_engine_post(job->tmcf->engine, &job->work);
1967}
1968
1969
1970nxt_inline nxt_listen_event_t *
1971nxt_router_listen_event(nxt_queue_t *listen_connections,
1972 nxt_socket_conf_t *skcf)
1973{
1974 nxt_socket_t fd;
1975 nxt_queue_link_t *qlk;
1976 nxt_listen_event_t *listen;
1977
1978 fd = skcf->socket->fd;
1979
1980 for (qlk = nxt_queue_first(listen_connections);
1981 qlk != nxt_queue_tail(listen_connections);
1982 qlk = nxt_queue_next(qlk))
1983 {
1984 listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1985
1986 if (fd == listen->socket.fd) {
1987 return listen;
1988 }
1989 }
1990
1991 return NULL;
1992}
1993
1994
1995static void
1996nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
1997{
1998 nxt_joint_job_t *job;
1999 nxt_event_engine_t *engine;
2000 nxt_listen_event_t *listen;
2001 nxt_socket_conf_joint_t *joint, *old;
2002
2003 job = obj;
2004 joint = data;
2005
2006 engine = task->thread->engine;
2007
2008 nxt_queue_insert_tail(&engine->joints, &joint->link);
2009
2010 listen = nxt_router_listen_event(&engine->listen_connections,
2011 joint->socket_conf);
2012
2013 old = listen->socket.data;
2014 listen->socket.data = joint;
2015 listen->listen = &joint->socket_conf->listen;
2016
2017 job->work.next = NULL;
2018 job->work.handler = nxt_router_conf_wait;
2019
2020 nxt_event_engine_post(job->tmcf->engine, &job->work);
2021
2022 /*
2023 * The task is allocated from configuration temporary

--- 4 unchanged lines hidden (view full) ---

2028}
2029
2030
2031static void
2032nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2033{
2034 nxt_joint_job_t *job;
2035 nxt_socket_conf_t *skcf;
2036 nxt_listen_event_t *listen;
2037 nxt_event_engine_t *engine;
2038
2039 job = obj;
2040 skcf = data;
2041
2042 engine = task->thread->engine;
2043
2044 listen = nxt_router_listen_event(&engine->listen_connections, skcf);
2045
2046 nxt_fd_event_delete(engine, &listen->socket);
2047
2048 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2049 listen->socket.fd);
2050
2051 listen->timer.handler = nxt_router_listen_socket_close;
2052 listen->timer.work_queue = &engine->fast_work_queue;
2053
2054 nxt_timer_add(engine, &listen->timer, 0);
2055
2056 job->work.next = NULL;
2057 job->work.handler = nxt_router_conf_wait;
2058
2059 nxt_event_engine_post(job->tmcf->engine, &job->work);
2060}
2061
2062

--- 13 unchanged lines hidden (view full) ---

2076 }
2077}
2078
2079
2080static void
2081nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2082{
2083 nxt_timer_t *timer;
2084 nxt_listen_event_t *listen;
2085 nxt_socket_conf_joint_t *joint;
2086
2087 timer = obj;
2088 listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
2089 joint = listen->socket.data;
2090
2091 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2092 listen->socket.fd);
2093
2094 nxt_queue_remove(&listen->link);
2095
2096 /* 'task' refers to listen->task and we cannot use after nxt_free() */
2097 task = &task->thread->engine->task;
2098
2099 nxt_free(listen);
2100
2101 nxt_router_listen_socket_release(task, joint);
2102}
2103
2104
2105static void
2106nxt_router_listen_socket_release(nxt_task_t *task,
2107 nxt_socket_conf_joint_t *joint)
2108{
2109 nxt_socket_conf_t *skcf;
2110 nxt_router_socket_t *rtsk;
2111 nxt_thread_spinlock_t *lock;
2112
2113 skcf = joint->socket_conf;
2114 rtsk = skcf->socket;
2115 lock = &skcf->router_conf->router->lock;
2116
2117 nxt_thread_spin_lock(lock);
2118
2119 nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
2120 task->thread->engine, rtsk->count);
2121
2122 if (--rtsk->count != 0) {
2123 rtsk = NULL;
2124 }
2125
2126 nxt_thread_spin_unlock(lock);
2127
2128 if (rtsk != NULL) {
2129 nxt_socket_close(task, rtsk->fd);
2130 nxt_free(rtsk);
2131 skcf->socket = NULL;
2132 }
2133
2134 nxt_router_conf_release(task, joint);
2135}
2136
2137
2138static void
2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2140{
2141 nxt_bool_t exit;
2142 nxt_socket_conf_t *skcf;

--- 113 unchanged lines hidden (view full) ---

2256};
2257
2258
2259static void
2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
2261{
2262 size_t size;
2263 nxt_conn_t *c;
2264 nxt_event_engine_t *engine;
2265 nxt_socket_conf_joint_t *joint;
2266
2267 c = obj;
2268 joint = data;
2269
2270 nxt_debug(task, "router conn init");
2271
2272 joint->count++;
2273
2274 size = joint->socket_conf->header_buffer_size;
2275 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2276
2277 c->socket.data = NULL;
2278
2279 engine = task->thread->engine;
2280 c->read_work_queue = &engine->fast_work_queue;
2281 c->write_work_queue = &engine->fast_work_queue;
2282

--- 497 unchanged lines hidden (view full) ---

2780 nxt_port_t *port;
2781 nxt_event_engine_t *engine;
2782 nxt_socket_conf_joint_t *joint;
2783
2784 port = NULL;
2785 use_delta = 1;
2786 c = ra->rc->conn;
2787
2788 joint = c->listen->socket.data;
2789 app = joint->socket_conf->application;
2790
2791 if (app == NULL) {
2792 nxt_router_gen_error(task, c, 500,
2793 "Application is NULL in socket_conf");
2794 return NXT_ERROR;
2795 }
2796

--- 84 unchanged lines hidden (view full) ---

2881 nxt_app_parse_ctx_t *ap;
2882 nxt_app_request_body_t *b;
2883 nxt_socket_conf_joint_t *joint;
2884 nxt_app_request_header_t *h;
2885
2886 c = obj;
2887 ap = data;
2888 buf = c->read;
2889 joint = c->listen->socket.data;
2890
2891 nxt_debug(task, "router conn http header parse");
2892
2893 if (ap == NULL) {
2894 ap = nxt_app_http_req_init(task);
2895 if (nxt_slow_path(ap == NULL)) {
2896 nxt_router_gen_error(task, c, 500,
2897 "Failed to allocate parse context");
2898 return;
2899 }
2900
2901 c->socket.data = ap;
2902
2903 ap->r.remote.start = nxt_sockaddr_address(c->remote);
2904 ap->r.remote.length = c->remote->address_length;
2905
2906 local = joint->socket_conf->sockaddr;
2907 ap->r.local.start = nxt_sockaddr_address(local);
2908 ap->r.local.length = local->address_length;
2909
2910 ap->r.header.buf = buf;
2911 }
2912
2913 h = &ap->r.header;
2914 b = &ap->r.body;
2915
2916 ret = nxt_app_http_req_header_parse(task, ap, buf);
2917

--- 77 unchanged lines hidden (view full) ---

2995 }
2996
2997 }
2998
2999 nxt_conn_read(task->thread->engine, c);
3000}
3001
3002
3003static void
3004nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
3005{
3006 size_t size;
3007 nxt_int_t ret;
3008 nxt_buf_t *buf;
3009 nxt_conn_t *c;
3010 nxt_app_parse_ctx_t *ap;

--- 24 unchanged lines hidden (view full) ---

3035
3036 case NXT_ERROR:
3037 nxt_router_gen_error(task, c, 500, "Read body error");
3038 return;
3039
3040 default: /* NXT_AGAIN */
3041
3042 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3043 joint = c->listen->socket.data;
3044
3045 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3046
3047 size = nxt_min(joint->socket_conf->body_buffer_size,
3048 (size_t) h->parsed_content_length - b->preread_size);
3049
3050 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3051 if (nxt_slow_path(buf->next == NULL)) {

--- 528 unchanged lines hidden (view full) ---

3580 } nxt_queue_loop;
3581
3582 nxt_queue_remove(&c->link);
3583
3584 engine = task->thread->engine;
3585
3586 nxt_sockaddr_cache_free(engine, c);
3587
3588 joint = c->listen->socket.data;
3589
3590 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
3591 &engine->task, joint, NULL);
3592
3593 nxt_mp_release(c->mem_pool, c);
3594}
3595
3596

--- 51 unchanged lines hidden (view full) ---

3648}
3649
3650
3651static nxt_msec_t
3652nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3653{
3654 nxt_socket_conf_joint_t *joint;
3655
3656 joint = c->listen->socket.data;
3657
3658 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3659}