nxt_router.c (358:40bbd4c2349d) nxt_router.c (359:d4848619451a)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 61 unchanged lines hidden (view full) ---

70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conf_ready(nxt_task_t *task,
73 nxt_router_temp_conf_t *tmcf);
74static void nxt_router_conf_error(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_send(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 61 unchanged lines hidden (view full) ---

70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conf_ready(nxt_task_t *task,
73 nxt_router_temp_conf_t *tmcf);
74static void nxt_router_conf_error(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_send(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
78static void nxt_router_listen_sockets_sort(nxt_router_t *router,
79 nxt_router_temp_conf_t *tmcf);
80
81static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
82 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
83static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
84static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
85 nxt_str_t *name);
86static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
87 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
88static void nxt_router_listen_socket_ready(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg, void *data);
90static void nxt_router_listen_socket_error(nxt_task_t *task,
91 nxt_port_recv_msg_t *msg, void *data);
78
79static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
80 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
81static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
82static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
83 nxt_str_t *name);
84static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
85 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
86static void nxt_router_listen_socket_ready(nxt_task_t *task,
87 nxt_port_recv_msg_t *msg, void *data);
88static void nxt_router_listen_socket_error(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg, void *data);
92static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
93 nxt_sockaddr_t *sa);
90static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
91 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
92static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
93 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
94
95static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
97 const nxt_event_interface_t *interface);
98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
99 nxt_router_engine_conf_t *recf);
100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
101 nxt_router_engine_conf_t *recf);
102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
103 nxt_router_engine_conf_t *recf);
94
95static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
96 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
97 const nxt_event_interface_t *interface);
98static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
99 nxt_router_engine_conf_t *recf);
100static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
101 nxt_router_engine_conf_t *recf);
102static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
103 nxt_router_engine_conf_t *recf);
104static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
105static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
106 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
107 nxt_work_handler_t handler);
108static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
109 nxt_router_engine_conf_t *recf);
110static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
111 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
112

--- 15 unchanged lines hidden (view full) ---

128static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
129 void *data);
130static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
131 void *data);
132static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
133 void *data);
134static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
135 void *data);
104static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
105 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
106 nxt_work_handler_t handler);
107static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
108 nxt_router_engine_conf_t *recf);
109static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
110 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
111

--- 15 unchanged lines hidden (view full) ---

127static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
128 void *data);
129static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
130 void *data);
131static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
132 void *data);
133static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
134 void *data);
136static void nxt_router_listen_socket_release(nxt_task_t *task,
137 nxt_socket_conf_joint_t *joint);
138static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
139 void *data);
135static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
136 void *data);
137static void nxt_router_listen_socket_release(nxt_task_t *task,
138 nxt_socket_conf_t *skcf);
140static void nxt_router_conf_release(nxt_task_t *task,
141 nxt_socket_conf_joint_t *joint);
142
143static void nxt_router_app_port_ready(nxt_task_t *task,
144 nxt_port_recv_msg_t *msg, void *data);
145static void nxt_router_app_port_error(nxt_task_t *task,
146 nxt_port_recv_msg_t *msg, void *data);
147
148static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
149static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
150 uint32_t request_failed, uint32_t got_response);
151
152static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
153static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
154 void *data);
139static void nxt_router_conf_release(nxt_task_t *task,
140 nxt_socket_conf_joint_t *joint);
141
142static void nxt_router_app_port_ready(nxt_task_t *task,
143 nxt_port_recv_msg_t *msg, void *data);
144static void nxt_router_app_port_error(nxt_task_t *task,
145 nxt_port_recv_msg_t *msg, void *data);
146
147static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
148static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
149 uint32_t request_failed, uint32_t got_response);
150
151static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
152static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
153 void *data);
154static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
156 void *data);
157static void nxt_router_process_http_request(nxt_task_t *task,
158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
159static void nxt_router_process_http_request_mp(nxt_task_t *task,
160 nxt_req_app_link_t *ra);
161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
162 nxt_app_wmsg_t *wmsg);

--- 652 unchanged lines hidden (view full) ---

815
816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
817
818 for (qlk = nxt_queue_first(&tmcf->creating);
819 qlk != nxt_queue_tail(&tmcf->creating);
820 qlk = nxt_queue_next(qlk))
821 {
822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
155static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
156 void *data);
157static void nxt_router_process_http_request(nxt_task_t *task,
158 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
159static void nxt_router_process_http_request_mp(nxt_task_t *task,
160 nxt_req_app_link_t *ra);
161static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
162 nxt_app_wmsg_t *wmsg);

--- 652 unchanged lines hidden (view full) ---

815
816 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
817
818 for (qlk = nxt_queue_first(&tmcf->creating);
819 qlk != nxt_queue_tail(&tmcf->creating);
820 qlk = nxt_queue_next(qlk))
821 {
822 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
823 s = skcf->listen.socket;
823 s = skcf->listen->socket;
824
825 if (s != -1) {
826 nxt_socket_close(task, s);
827 }
828
824
825 if (s != -1) {
826 nxt_socket_close(task, s);
827 }
828
829 nxt_free(skcf->socket);
829 nxt_free(skcf->listen);
830 }
831
832 router = tmcf->conf->router;
833
834 nxt_queue_add(&router->sockets, &tmcf->keeping);
835 nxt_queue_add(&router->sockets, &tmcf->deleting);
836
837 // TODO: new engines and threads

--- 117 unchanged lines hidden (view full) ---

955{
956 u_char *p;
957 size_t size;
958 nxt_mp_t *mp;
959 uint32_t next;
960 nxt_int_t ret;
961 nxt_str_t name;
962 nxt_app_t *app, *prev;
830 }
831
832 router = tmcf->conf->router;
833
834 nxt_queue_add(&router->sockets, &tmcf->keeping);
835 nxt_queue_add(&router->sockets, &tmcf->deleting);
836
837 // TODO: new engines and threads

--- 117 unchanged lines hidden (view full) ---

955{
956 u_char *p;
957 size_t size;
958 nxt_mp_t *mp;
959 uint32_t next;
960 nxt_int_t ret;
961 nxt_str_t name;
962 nxt_app_t *app, *prev;
963 nxt_sockaddr_t *sa;
963 nxt_router_t *router;
964 nxt_conf_value_t *conf, *http;
965 nxt_conf_value_t *applications, *application;
966 nxt_conf_value_t *listeners, *listener;
967 nxt_socket_conf_t *skcf;
968 nxt_app_lang_module_t *lang;
969 nxt_router_app_conf_t apcf;
970 nxt_router_listener_conf_t lscf;
971

--- 21 unchanged lines hidden (view full) ---

993 }
994
995 applications = nxt_conf_get_path(conf, &applications_path);
996 if (applications == NULL) {
997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
998 return NXT_ERROR;
999 }
1000
964 nxt_conf_value_t *conf, *http;
965 nxt_conf_value_t *applications, *application;
966 nxt_conf_value_t *listeners, *listener;
967 nxt_socket_conf_t *skcf;
968 nxt_app_lang_module_t *lang;
969 nxt_router_app_conf_t apcf;
970 nxt_router_listener_conf_t lscf;
971

--- 21 unchanged lines hidden (view full) ---

993 }
994
995 applications = nxt_conf_get_path(conf, &applications_path);
996 if (applications == NULL) {
997 nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
998 return NXT_ERROR;
999 }
1000
1001 router = tmcf->conf->router;
1002
1001 next = 0;
1002
1003 for ( ;; ) {
1004 application = nxt_conf_next_object_member(applications, &name, &next);
1005 if (application == NULL) {
1006 break;
1007 }
1008

--- 13 unchanged lines hidden (view full) ---

1022
1023 p = nxt_conf_json_print(app->conf.start, application, NULL);
1024 app->conf.length = p - app->conf.start;
1025
1026 nxt_assert(app->conf.length <= size);
1027
1028 nxt_debug(task, "application conf \"%V\"", &app->conf);
1029
1003 next = 0;
1004
1005 for ( ;; ) {
1006 application = nxt_conf_next_object_member(applications, &name, &next);
1007 if (application == NULL) {
1008 break;
1009 }
1010

--- 13 unchanged lines hidden (view full) ---

1024
1025 p = nxt_conf_json_print(app->conf.start, application, NULL);
1026 app->conf.length = p - app->conf.start;
1027
1028 nxt_assert(app->conf.length <= size);
1029
1030 nxt_debug(task, "application conf \"%V\"", &app->conf);
1031
1030 prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
1032 prev = nxt_router_app_find(&router->apps, &name);
1031
1032 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1033 nxt_free(app);
1034
1035 nxt_queue_remove(&prev->link);
1036 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1037 continue;
1038 }

--- 82 unchanged lines hidden (view full) ---

1121 next = 0;
1122
1123 for ( ;; ) {
1124 listener = nxt_conf_next_object_member(listeners, &name, &next);
1125 if (listener == NULL) {
1126 break;
1127 }
1128
1033
1034 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1035 nxt_free(app);
1036
1037 nxt_queue_remove(&prev->link);
1038 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1039 continue;
1040 }

--- 82 unchanged lines hidden (view full) ---

1123 next = 0;
1124
1125 for ( ;; ) {
1126 listener = nxt_conf_next_object_member(listeners, &name, &next);
1127 if (listener == NULL) {
1128 break;
1129 }
1130
1129 sa = nxt_sockaddr_parse(mp, &name);
1130 if (sa == NULL) {
1131 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
1132 goto fail;
1133 }
1134
1135 sa->type = SOCK_STREAM;
1136
1137 nxt_debug(task, "router listener: \"%*s\"",
1138 sa->length, nxt_sockaddr_start(sa));
1139
1140 skcf = nxt_router_socket_conf(task, mp, sa);
1131 skcf = nxt_router_socket_conf(task, tmcf, &name);
1141 if (skcf == NULL) {
1142 goto fail;
1143 }
1144
1145 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1146 nxt_nitems(nxt_router_listener_conf), &lscf);
1147 if (ret != NXT_OK) {
1148 nxt_log(task, NXT_LOG_CRIT, "listener map error");

--- 15 unchanged lines hidden (view full) ---

1164 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1165 nxt_nitems(nxt_router_http_conf), skcf);
1166 if (ret != NXT_OK) {
1167 nxt_log(task, NXT_LOG_CRIT, "http map error");
1168 goto fail;
1169 }
1170 }
1171
1132 if (skcf == NULL) {
1133 goto fail;
1134 }
1135
1136 ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1137 nxt_nitems(nxt_router_listener_conf), &lscf);
1138 if (ret != NXT_OK) {
1139 nxt_log(task, NXT_LOG_CRIT, "listener map error");

--- 15 unchanged lines hidden (view full) ---

1155 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1156 nxt_nitems(nxt_router_http_conf), skcf);
1157 if (ret != NXT_OK) {
1158 nxt_log(task, NXT_LOG_CRIT, "http map error");
1159 goto fail;
1160 }
1161 }
1162
1172 skcf->listen.handler = nxt_router_conn_init;
1163 skcf->listen->handler = nxt_router_conn_init;
1173 skcf->router_conf = tmcf->conf;
1174 skcf->router_conf->count++;
1175 skcf->application = nxt_router_listener_application(tmcf,
1176 &lscf.application);
1164 skcf->router_conf = tmcf->conf;
1165 skcf->router_conf->count++;
1166 skcf->application = nxt_router_listener_application(tmcf,
1167 &lscf.application);
1177
1178 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
1179 }
1180
1168 }
1169
1181 nxt_router_listen_sockets_sort(tmcf->conf->router, tmcf);
1170 nxt_queue_add(&tmcf->deleting, &router->sockets);
1171 nxt_queue_init(&router->sockets);
1182
1183 return NXT_OK;
1184
1185app_fail:
1186
1187 nxt_free(app);
1188
1189fail:

--- 38 unchanged lines hidden (view full) ---

1228 app = nxt_router_app_find(&tmcf->previous, name);
1229 }
1230
1231 return app;
1232}
1233
1234
1235static nxt_socket_conf_t *
1172
1173 return NXT_OK;
1174
1175app_fail:
1176
1177 nxt_free(app);
1178
1179fail:

--- 38 unchanged lines hidden (view full) ---

1218 app = nxt_router_app_find(&tmcf->previous, name);
1219 }
1220
1221 return app;
1222}
1223
1224
1225static nxt_socket_conf_t *
1236nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
1226nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1227 nxt_str_t *name)
1237{
1228{
1238 nxt_socket_conf_t *skcf;
1229 size_t size;
1230 nxt_int_t ret;
1231 nxt_bool_t wildcard;
1232 nxt_sockaddr_t *sa;
1233 nxt_socket_conf_t *skcf;
1234 nxt_listen_socket_t *ls;
1239
1235
1240 skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
1236 sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1237 if (nxt_slow_path(sa == NULL)) {
1238 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name);
1239 return NULL;
1240 }
1241
1242 sa->type = SOCK_STREAM;
1243
1244 nxt_debug(task, "router listener: \"%*s\"",
1245 sa->length, nxt_sockaddr_start(sa));
1246
1247 skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
1241 if (nxt_slow_path(skcf == NULL)) {
1242 return NULL;
1243 }
1244
1248 if (nxt_slow_path(skcf == NULL)) {
1249 return NULL;
1250 }
1251
1245 skcf->sockaddr = sa;
1252 size = nxt_sockaddr_size(sa);
1246
1253
1247 skcf->listen.sockaddr = sa;
1254 ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1248
1255
1249 nxt_listen_socket_remote_size(&skcf->listen, sa);
1256 if (ret != NXT_OK) {
1250
1257
1251 skcf->listen.socket = -1;
1252 skcf->listen.backlog = NXT_LISTEN_BACKLOG;
1253 skcf->listen.flags = NXT_NONBLOCK;
1254 skcf->listen.read_after_accept = 1;
1258 ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1259 if (nxt_slow_path(ls == NULL)) {
1260 return NULL;
1261 }
1255
1262
1263 skcf->listen = ls;
1264
1265 ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1266 nxt_memcpy(ls->sockaddr, sa, size);
1267
1268 nxt_listen_socket_remote_size(ls);
1269
1270 ls->socket = -1;
1271 ls->backlog = NXT_LISTEN_BACKLOG;
1272 ls->flags = NXT_NONBLOCK;
1273 ls->read_after_accept = 1;
1274 }
1275
1276 switch (sa->u.sockaddr.sa_family) {
1277#if (NXT_HAVE_UNIX_DOMAIN)
1278 case AF_UNIX:
1279 wildcard = 0;
1280 break;
1281#endif
1282#if (NXT_INET6)
1283 case AF_INET6:
1284 wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1285 break;
1286#endif
1287 case AF_INET:
1288 default:
1289 wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1290 break;
1291 }
1292
1293 if (!wildcard) {
1294 skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
1295 if (nxt_slow_path(skcf->sockaddr == NULL)) {
1296 return NULL;
1297 }
1298
1299 nxt_memcpy(skcf->sockaddr, sa, size);
1300 }
1301
1256 return skcf;
1257}
1258
1259
1302 return skcf;
1303}
1304
1305
1260static void
1261nxt_router_listen_sockets_sort(nxt_router_t *router,
1262 nxt_router_temp_conf_t *tmcf)
1306static nxt_int_t
1307nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1308 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1263{
1309{
1264 nxt_queue_link_t *nqlk, *oqlk, *next;
1265 nxt_socket_conf_t *nskcf, *oskcf;
1310 nxt_router_t *router;
1311 nxt_queue_link_t *qlk;
1312 nxt_socket_conf_t *skcf;
1266
1313
1267 for (nqlk = nxt_queue_first(&tmcf->pending);
1268 nqlk != nxt_queue_tail(&tmcf->pending);
1269 nqlk = next)
1314 router = tmcf->conf->router;
1315
1316 for (qlk = nxt_queue_first(&router->sockets);
1317 qlk != nxt_queue_tail(&router->sockets);
1318 qlk = nxt_queue_next(qlk))
1270 {
1319 {
1271 next = nxt_queue_next(nqlk);
1272 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
1320 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1273
1321
1274 for (oqlk = nxt_queue_first(&router->sockets);
1275 oqlk != nxt_queue_tail(&router->sockets);
1276 oqlk = nxt_queue_next(oqlk))
1277 {
1278 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
1322 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1323 nskcf->listen = skcf->listen;
1279
1324
1280 if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
1281 nskcf->socket = oskcf->socket;
1282 nskcf->listen.socket = oskcf->listen.socket;
1325 nxt_queue_remove(qlk);
1326 nxt_queue_insert_tail(&tmcf->keeping, qlk);
1283
1327
1284 nxt_queue_remove(oqlk);
1285 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
1328 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1286
1329
1287 nxt_queue_remove(nqlk);
1288 nxt_queue_insert_tail(&tmcf->updating, nqlk);
1289
1290 break;
1291 }
1330 return NXT_OK;
1292 }
1293 }
1294
1331 }
1332 }
1333
1295 nxt_queue_add(&tmcf->deleting, &router->sockets);
1296 nxt_queue_init(&router->sockets);
1334 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1335
1336 return NXT_DECLINED;
1297}
1298
1299
1300static void
1301nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1302 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1303{
1304 size_t size;

--- 6 unchanged lines hidden (view full) ---

1311 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1312 if (rpc == NULL) {
1313 goto fail;
1314 }
1315
1316 rpc->socket_conf = skcf;
1317 rpc->temp_conf = tmcf;
1318
1337}
1338
1339
1340static void
1341nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1342 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1343{
1344 size_t size;

--- 6 unchanged lines hidden (view full) ---

1351 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1352 if (rpc == NULL) {
1353 goto fail;
1354 }
1355
1356 rpc->socket_conf = skcf;
1357 rpc->temp_conf = tmcf;
1358
1319 size = nxt_sockaddr_size(skcf->sockaddr);
1359 size = nxt_sockaddr_size(skcf->listen->sockaddr);
1320
1321 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1322 if (b == NULL) {
1323 goto fail;
1324 }
1325
1360
1361 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1362 if (b == NULL) {
1363 goto fail;
1364 }
1365
1326 b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr, size);
1366 b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1327
1328 rt = task->thread->runtime;
1329 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1330 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1331
1332 stream = nxt_port_rpc_register_handler(task, router_port,
1333 nxt_router_listen_socket_ready,
1334 nxt_router_listen_socket_error,

--- 12 unchanged lines hidden (view full) ---

1347 nxt_router_conf_error(task, tmcf);
1348}
1349
1350
1351static void
1352nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1353 void *data)
1354{
1367
1368 rt = task->thread->runtime;
1369 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1370 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1371
1372 stream = nxt_port_rpc_register_handler(task, router_port,
1373 nxt_router_listen_socket_ready,
1374 nxt_router_listen_socket_error,

--- 12 unchanged lines hidden (view full) ---

1387 nxt_router_conf_error(task, tmcf);
1388}
1389
1390
1391static void
1392nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1393 void *data)
1394{
1355 nxt_int_t ret;
1356 nxt_socket_t s;
1357 nxt_socket_rpc_t *rpc;
1358 nxt_router_socket_t *rtsk;
1395 nxt_int_t ret;
1396 nxt_socket_t s;
1397 nxt_socket_rpc_t *rpc;
1359
1360 rpc = data;
1361
1362 s = msg->fd;
1363
1364 ret = nxt_socket_nonblocking(task, s);
1365 if (nxt_slow_path(ret != NXT_OK)) {
1366 goto fail;
1367 }
1368
1398
1399 rpc = data;
1400
1401 s = msg->fd;
1402
1403 ret = nxt_socket_nonblocking(task, s);
1404 if (nxt_slow_path(ret != NXT_OK)) {
1405 goto fail;
1406 }
1407
1369 nxt_socket_defer_accept(task, s, rpc->socket_conf->sockaddr);
1408 nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1370
1371 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1372 if (nxt_slow_path(ret != NXT_OK)) {
1373 goto fail;
1374 }
1375
1409
1410 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1411 if (nxt_slow_path(ret != NXT_OK)) {
1412 goto fail;
1413 }
1414
1376 rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
1377 if (nxt_slow_path(rtsk == NULL)) {
1378 goto fail;
1379 }
1415 rpc->socket_conf->listen->socket = s;
1380
1416
1381 rtsk->count = 0;
1382 rtsk->fd = s;
1383
1384 rpc->socket_conf->listen.socket = s;
1385 rpc->socket_conf->socket = rtsk;
1386
1387 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1388 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1389
1390 return;
1391
1392fail:
1393
1394 nxt_socket_close(task, s);

--- 20 unchanged lines hidden (view full) ---

1415 nxt_string("ListenerPort"),
1416 nxt_string("ListenerInUse"),
1417 nxt_string("ListenerNoAddress"),
1418 nxt_string("ListenerNoAccess"),
1419 nxt_string("ListenerPath"),
1420 };
1421
1422 rpc = data;
1417 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1418 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1419
1420 return;
1421
1422fail:
1423
1424 nxt_socket_close(task, s);

--- 20 unchanged lines hidden (view full) ---

1445 nxt_string("ListenerPort"),
1446 nxt_string("ListenerInUse"),
1447 nxt_string("ListenerNoAddress"),
1448 nxt_string("ListenerNoAccess"),
1449 nxt_string("ListenerPath"),
1450 };
1451
1452 rpc = data;
1423 sa = rpc->socket_conf->sockaddr;
1453 sa = rpc->socket_conf->listen->sockaddr;
1424 tmcf = rpc->temp_conf;
1425
1426 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1427
1428 nxt_assert(in != NULL);
1429
1430 p = in->mem.pos;
1431

--- 92 unchanged lines hidden (view full) ---

1524 return NXT_OK;
1525}
1526
1527
1528static nxt_int_t
1529nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1530 nxt_router_engine_conf_t *recf)
1531{
1454 tmcf = rpc->temp_conf;
1455
1456 in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1457
1458 nxt_assert(in != NULL);
1459
1460 p = in->mem.pos;
1461

--- 92 unchanged lines hidden (view full) ---

1554 return NXT_OK;
1555}
1556
1557
1558static nxt_int_t
1559nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1560 nxt_router_engine_conf_t *recf)
1561{
1532 nxt_int_t ret;
1533 nxt_thread_spinlock_t *lock;
1562 nxt_int_t ret;
1534
1535 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1536 nxt_router_listen_socket_create);
1537 if (nxt_slow_path(ret != NXT_OK)) {
1538 return ret;
1539 }
1540
1541 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1542 nxt_router_listen_socket_create);
1543 if (nxt_slow_path(ret != NXT_OK)) {
1544 return ret;
1545 }
1546
1563
1564 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1565 nxt_router_listen_socket_create);
1566 if (nxt_slow_path(ret != NXT_OK)) {
1567 return ret;
1568 }
1569
1570 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1571 nxt_router_listen_socket_create);
1572 if (nxt_slow_path(ret != NXT_OK)) {
1573 return ret;
1574 }
1575
1547 lock = &tmcf->conf->router->lock;
1548
1549 nxt_thread_spin_lock(lock);
1550
1551 nxt_router_engine_socket_count(&tmcf->creating);
1552 nxt_router_engine_socket_count(&tmcf->updating);
1553
1554 nxt_thread_spin_unlock(lock);
1555
1556 return ret;
1557}
1558
1559
1560static nxt_int_t
1561nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1562 nxt_router_engine_conf_t *recf)
1563{
1576 return ret;
1577}
1578
1579
1580static nxt_int_t
1581nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1582 nxt_router_engine_conf_t *recf)
1583{
1564 nxt_int_t ret;
1565 nxt_thread_spinlock_t *lock;
1584 nxt_int_t ret;
1566
1567 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1568 nxt_router_listen_socket_create);
1569 if (nxt_slow_path(ret != NXT_OK)) {
1570 return ret;
1571 }
1572
1573 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1574 nxt_router_listen_socket_update);
1575 if (nxt_slow_path(ret != NXT_OK)) {
1576 return ret;
1577 }
1578
1579 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1580 if (nxt_slow_path(ret != NXT_OK)) {
1581 return ret;
1582 }
1583
1585
1586 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1587 nxt_router_listen_socket_create);
1588 if (nxt_slow_path(ret != NXT_OK)) {
1589 return ret;
1590 }
1591
1592 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1593 nxt_router_listen_socket_update);
1594 if (nxt_slow_path(ret != NXT_OK)) {
1595 return ret;
1596 }
1597
1598 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1599 if (nxt_slow_path(ret != NXT_OK)) {
1600 return ret;
1601 }
1602
1584 lock = &tmcf->conf->router->lock;
1585
1586 nxt_thread_spin_lock(lock);
1587
1588 nxt_router_engine_socket_count(&tmcf->creating);
1589
1590 nxt_thread_spin_unlock(lock);
1591
1592 return ret;
1593}
1594
1595
1596static nxt_int_t
1597nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1598 nxt_router_engine_conf_t *recf)
1599{

--- 59 unchanged lines hidden (view full) ---

1659
1660 joint->engine = recf->engine;
1661 }
1662
1663 return NXT_OK;
1664}
1665
1666
1603 return ret;
1604}
1605
1606
1607static nxt_int_t
1608nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1609 nxt_router_engine_conf_t *recf)
1610{

--- 59 unchanged lines hidden (view full) ---

1670
1671 joint->engine = recf->engine;
1672 }
1673
1674 return NXT_OK;
1675}
1676
1677
1667static void
1668nxt_router_engine_socket_count(nxt_queue_t *sockets)
1669{
1670 nxt_queue_link_t *qlk;
1671 nxt_socket_conf_t *skcf;
1672
1673 for (qlk = nxt_queue_first(sockets);
1674 qlk != nxt_queue_tail(sockets);
1675 qlk = nxt_queue_next(qlk))
1676 {
1677 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1678 skcf->socket->count++;
1679 }
1680}
1681
1682
1683static nxt_int_t
1684nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
1685 nxt_router_engine_conf_t *recf)
1686{
1687 nxt_joint_job_t *job;
1688
1689 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1690 if (nxt_slow_path(job == NULL)) {

--- 245 unchanged lines hidden (view full) ---

1936 nxt_event_engine_start(engine);
1937}
1938
1939
1940static void
1941nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1942{
1943 nxt_joint_job_t *job;
1678static nxt_int_t
1679nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
1680 nxt_router_engine_conf_t *recf)
1681{
1682 nxt_joint_job_t *job;
1683
1684 job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1685 if (nxt_slow_path(job == NULL)) {

--- 245 unchanged lines hidden (view full) ---

1931 nxt_event_engine_start(engine);
1932}
1933
1934
1935static void
1936nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1937{
1938 nxt_joint_job_t *job;
1944 nxt_listen_event_t *listen;
1939 nxt_socket_conf_t *skcf;
1940 nxt_listen_event_t *lev;
1945 nxt_listen_socket_t *ls;
1941 nxt_listen_socket_t *ls;
1942 nxt_thread_spinlock_t *lock;
1946 nxt_socket_conf_joint_t *joint;
1947
1948 job = obj;
1949 joint = data;
1950
1943 nxt_socket_conf_joint_t *joint;
1944
1945 job = obj;
1946 joint = data;
1947
1951 ls = &joint->socket_conf->listen;
1952
1953 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1954
1948 nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1949
1955 listen = nxt_listen_event(task, ls);
1956 if (nxt_slow_path(listen == NULL)) {
1957 nxt_router_listen_socket_release(task, joint);
1950 skcf = joint->socket_conf;
1951 ls = skcf->listen;
1952
1953 lev = nxt_listen_event(task, ls);
1954 if (nxt_slow_path(lev == NULL)) {
1955 nxt_router_listen_socket_release(task, skcf);
1958 return;
1959 }
1960
1956 return;
1957 }
1958
1961 listen->socket.data = joint;
1959 lev->socket.data = joint;
1962
1960
1961 lock = &skcf->router_conf->router->lock;
1962
1963 nxt_thread_spin_lock(lock);
1964 ls->count++;
1965 nxt_thread_spin_unlock(lock);
1966
1963 job->work.next = NULL;
1964 job->work.handler = nxt_router_conf_wait;
1965
1966 nxt_event_engine_post(job->tmcf->engine, &job->work);
1967}
1968
1969
1970nxt_inline nxt_listen_event_t *
1971nxt_router_listen_event(nxt_queue_t *listen_connections,
1972 nxt_socket_conf_t *skcf)
1973{
1974 nxt_socket_t fd;
1975 nxt_queue_link_t *qlk;
1967 job->work.next = NULL;
1968 job->work.handler = nxt_router_conf_wait;
1969
1970 nxt_event_engine_post(job->tmcf->engine, &job->work);
1971}
1972
1973
1974nxt_inline nxt_listen_event_t *
1975nxt_router_listen_event(nxt_queue_t *listen_connections,
1976 nxt_socket_conf_t *skcf)
1977{
1978 nxt_socket_t fd;
1979 nxt_queue_link_t *qlk;
1976 nxt_listen_event_t *listen;
1980 nxt_listen_event_t *lev;
1977
1981
1978 fd = skcf->socket->fd;
1982 fd = skcf->listen->socket;
1979
1980 for (qlk = nxt_queue_first(listen_connections);
1981 qlk != nxt_queue_tail(listen_connections);
1982 qlk = nxt_queue_next(qlk))
1983 {
1983
1984 for (qlk = nxt_queue_first(listen_connections);
1985 qlk != nxt_queue_tail(listen_connections);
1986 qlk = nxt_queue_next(qlk))
1987 {
1984 listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1988 lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1985
1989
1986 if (fd == listen->socket.fd) {
1987 return listen;
1990 if (fd == lev->socket.fd) {
1991 return lev;
1988 }
1989 }
1990
1991 return NULL;
1992}
1993
1994
1995static void
1996nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
1997{
1998 nxt_joint_job_t *job;
1999 nxt_event_engine_t *engine;
1992 }
1993 }
1994
1995 return NULL;
1996}
1997
1998
1999static void
2000nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2001{
2002 nxt_joint_job_t *job;
2003 nxt_event_engine_t *engine;
2000 nxt_listen_event_t *listen;
2004 nxt_listen_event_t *lev;
2001 nxt_socket_conf_joint_t *joint, *old;
2002
2003 job = obj;
2004 joint = data;
2005
2006 engine = task->thread->engine;
2007
2008 nxt_queue_insert_tail(&engine->joints, &joint->link);
2009
2005 nxt_socket_conf_joint_t *joint, *old;
2006
2007 job = obj;
2008 joint = data;
2009
2010 engine = task->thread->engine;
2011
2012 nxt_queue_insert_tail(&engine->joints, &joint->link);
2013
2010 listen = nxt_router_listen_event(&engine->listen_connections,
2011 joint->socket_conf);
2014 lev = nxt_router_listen_event(&engine->listen_connections,
2015 joint->socket_conf);
2012
2016
2013 old = listen->socket.data;
2014 listen->socket.data = joint;
2015 listen->listen = &joint->socket_conf->listen;
2017 old = lev->socket.data;
2018 lev->socket.data = joint;
2019 lev->listen = joint->socket_conf->listen;
2016
2017 job->work.next = NULL;
2018 job->work.handler = nxt_router_conf_wait;
2019
2020 nxt_event_engine_post(job->tmcf->engine, &job->work);
2021
2022 /*
2023 * The task is allocated from configuration temporary

--- 4 unchanged lines hidden (view full) ---

2028}
2029
2030
2031static void
2032nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2033{
2034 nxt_joint_job_t *job;
2035 nxt_socket_conf_t *skcf;
2020
2021 job->work.next = NULL;
2022 job->work.handler = nxt_router_conf_wait;
2023
2024 nxt_event_engine_post(job->tmcf->engine, &job->work);
2025
2026 /*
2027 * The task is allocated from configuration temporary

--- 4 unchanged lines hidden (view full) ---

2032}
2033
2034
2035static void
2036nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2037{
2038 nxt_joint_job_t *job;
2039 nxt_socket_conf_t *skcf;
2036 nxt_listen_event_t *listen;
2040 nxt_listen_event_t *lev;
2037 nxt_event_engine_t *engine;
2038
2039 job = obj;
2040 skcf = data;
2041
2042 engine = task->thread->engine;
2043
2041 nxt_event_engine_t *engine;
2042
2043 job = obj;
2044 skcf = data;
2045
2046 engine = task->thread->engine;
2047
2044 listen = nxt_router_listen_event(&engine->listen_connections, skcf);
2048 lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2045
2049
2046 nxt_fd_event_delete(engine, &listen->socket);
2050 nxt_fd_event_delete(engine, &lev->socket);
2047
2048 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2051
2052 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2049 listen->socket.fd);
2053 lev->socket.fd);
2050
2054
2051 listen->timer.handler = nxt_router_listen_socket_close;
2052 listen->timer.work_queue = &engine->fast_work_queue;
2055 lev->timer.handler = nxt_router_listen_socket_close;
2056 lev->timer.work_queue = &engine->fast_work_queue;
2053
2057
2054 nxt_timer_add(engine, &listen->timer, 0);
2058 nxt_timer_add(engine, &lev->timer, 0);
2055
2056 job->work.next = NULL;
2057 job->work.handler = nxt_router_conf_wait;
2058
2059 nxt_event_engine_post(job->tmcf->engine, &job->work);
2060}
2061
2062

--- 13 unchanged lines hidden (view full) ---

2076 }
2077}
2078
2079
2080static void
2081nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2082{
2083 nxt_timer_t *timer;
2059
2060 job->work.next = NULL;
2061 job->work.handler = nxt_router_conf_wait;
2062
2063 nxt_event_engine_post(job->tmcf->engine, &job->work);
2064}
2065
2066

--- 13 unchanged lines hidden (view full) ---

2080 }
2081}
2082
2083
2084static void
2085nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2086{
2087 nxt_timer_t *timer;
2084 nxt_listen_event_t *listen;
2088 nxt_listen_event_t *lev;
2085 nxt_socket_conf_joint_t *joint;
2086
2087 timer = obj;
2089 nxt_socket_conf_joint_t *joint;
2090
2091 timer = obj;
2088 listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
2089 joint = listen->socket.data;
2092 lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2093 joint = lev->socket.data;
2090
2091 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2094
2095 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2092 listen->socket.fd);
2096 lev->socket.fd);
2093
2097
2094 nxt_queue_remove(&listen->link);
2098 nxt_queue_remove(&lev->link);
2095
2099
2096 /* 'task' refers to listen->task and we cannot use after nxt_free() */
2100 /* 'task' refers to lev->task and we cannot use after nxt_free() */
2097 task = &task->thread->engine->task;
2098
2101 task = &task->thread->engine->task;
2102
2099 nxt_free(listen);
2103 nxt_router_listen_socket_release(task, joint->socket_conf);
2100
2104
2101 nxt_router_listen_socket_release(task, joint);
2105 nxt_free(lev);
2106
2107 nxt_router_conf_release(task, joint);
2102}
2103
2104
2105static void
2108}
2109
2110
2111static void
2106nxt_router_listen_socket_release(nxt_task_t *task,
2107 nxt_socket_conf_joint_t *joint)
2112nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2108{
2113{
2109 nxt_socket_conf_t *skcf;
2110 nxt_router_socket_t *rtsk;
2114 nxt_listen_socket_t *ls;
2111 nxt_thread_spinlock_t *lock;
2112
2115 nxt_thread_spinlock_t *lock;
2116
2113 skcf = joint->socket_conf;
2114 rtsk = skcf->socket;
2117 ls = skcf->listen;
2115 lock = &skcf->router_conf->router->lock;
2116
2117 nxt_thread_spin_lock(lock);
2118
2118 lock = &skcf->router_conf->router->lock;
2119
2120 nxt_thread_spin_lock(lock);
2121
2119 nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
2120 task->thread->engine, rtsk->count);
2122 nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2123 task->thread->engine, ls->count);
2121
2124
2122 if (--rtsk->count != 0) {
2123 rtsk = NULL;
2125 if (--ls->count != 0) {
2126 ls = NULL;
2124 }
2125
2126 nxt_thread_spin_unlock(lock);
2127
2127 }
2128
2129 nxt_thread_spin_unlock(lock);
2130
2128 if (rtsk != NULL) {
2129 nxt_socket_close(task, rtsk->fd);
2130 nxt_free(rtsk);
2131 skcf->socket = NULL;
2131 if (ls != NULL) {
2132 nxt_socket_close(task, ls->socket);
2133 nxt_free(ls);
2132 }
2134 }
2133
2134 nxt_router_conf_release(task, joint);
2135}
2136
2137
2138static void
2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2140{
2141 nxt_bool_t exit;
2142 nxt_socket_conf_t *skcf;

--- 113 unchanged lines hidden (view full) ---

2256};
2257
2258
2259static void
2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
2261{
2262 size_t size;
2263 nxt_conn_t *c;
2135}
2136
2137
2138static void
2139nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2140{
2141 nxt_bool_t exit;
2142 nxt_socket_conf_t *skcf;

--- 113 unchanged lines hidden (view full) ---

2256};
2257
2258
2259static void
2260nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
2261{
2262 size_t size;
2263 nxt_conn_t *c;
2264 nxt_socket_conf_t *skcf;
2264 nxt_event_engine_t *engine;
2265 nxt_socket_conf_joint_t *joint;
2266
2267 c = obj;
2268 joint = data;
2269
2270 nxt_debug(task, "router conn init");
2271
2265 nxt_event_engine_t *engine;
2266 nxt_socket_conf_joint_t *joint;
2267
2268 c = obj;
2269 joint = data;
2270
2271 nxt_debug(task, "router conn init");
2272
2273 c->joint = joint;
2272 joint->count++;
2273
2274 joint->count++;
2275
2274 size = joint->socket_conf->header_buffer_size;
2276 skcf = joint->socket_conf;
2277 c->local = skcf->sockaddr;
2278
2279 size = skcf->header_buffer_size;
2275 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2276
2277 c->socket.data = NULL;
2278
2279 engine = task->thread->engine;
2280 c->read_work_queue = &engine->fast_work_queue;
2281 c->write_work_queue = &engine->fast_work_queue;
2282

--- 497 unchanged lines hidden (view full) ---

2780 nxt_port_t *port;
2781 nxt_event_engine_t *engine;
2782 nxt_socket_conf_joint_t *joint;
2783
2784 port = NULL;
2785 use_delta = 1;
2786 c = ra->rc->conn;
2787
2280 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2281
2282 c->socket.data = NULL;
2283
2284 engine = task->thread->engine;
2285 c->read_work_queue = &engine->fast_work_queue;
2286 c->write_work_queue = &engine->fast_work_queue;
2287

--- 497 unchanged lines hidden (view full) ---

2785 nxt_port_t *port;
2786 nxt_event_engine_t *engine;
2787 nxt_socket_conf_joint_t *joint;
2788
2789 port = NULL;
2790 use_delta = 1;
2791 c = ra->rc->conn;
2792
2788 joint = c->listen->socket.data;
2793 joint = c->joint;
2789 app = joint->socket_conf->application;
2790
2791 if (app == NULL) {
2792 nxt_router_gen_error(task, c, 500,
2793 "Application is NULL in socket_conf");
2794 return NXT_ERROR;
2795 }
2796

--- 84 unchanged lines hidden (view full) ---

2881 nxt_app_parse_ctx_t *ap;
2882 nxt_app_request_body_t *b;
2883 nxt_socket_conf_joint_t *joint;
2884 nxt_app_request_header_t *h;
2885
2886 c = obj;
2887 ap = data;
2888 buf = c->read;
2794 app = joint->socket_conf->application;
2795
2796 if (app == NULL) {
2797 nxt_router_gen_error(task, c, 500,
2798 "Application is NULL in socket_conf");
2799 return NXT_ERROR;
2800 }
2801

--- 84 unchanged lines hidden (view full) ---

2886 nxt_app_parse_ctx_t *ap;
2887 nxt_app_request_body_t *b;
2888 nxt_socket_conf_joint_t *joint;
2889 nxt_app_request_header_t *h;
2890
2891 c = obj;
2892 ap = data;
2893 buf = c->read;
2889 joint = c->listen->socket.data;
2894 joint = c->joint;
2890
2891 nxt_debug(task, "router conn http header parse");
2892
2893 if (ap == NULL) {
2894 ap = nxt_app_http_req_init(task);
2895 if (nxt_slow_path(ap == NULL)) {
2896 nxt_router_gen_error(task, c, 500,
2897 "Failed to allocate parse context");
2898 return;
2899 }
2900
2901 c->socket.data = ap;
2902
2903 ap->r.remote.start = nxt_sockaddr_address(c->remote);
2904 ap->r.remote.length = c->remote->address_length;
2905
2895
2896 nxt_debug(task, "router conn http header parse");
2897
2898 if (ap == NULL) {
2899 ap = nxt_app_http_req_init(task);
2900 if (nxt_slow_path(ap == NULL)) {
2901 nxt_router_gen_error(task, c, 500,
2902 "Failed to allocate parse context");
2903 return;
2904 }
2905
2906 c->socket.data = ap;
2907
2908 ap->r.remote.start = nxt_sockaddr_address(c->remote);
2909 ap->r.remote.length = c->remote->address_length;
2910
2906 local = joint->socket_conf->sockaddr;
2907 ap->r.local.start = nxt_sockaddr_address(local);
2908 ap->r.local.length = local->address_length;
2911 /*
2912 * TODO: need an application flag to get local address
2913 * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
2914 */
2915 local = nxt_router_local_addr(task, c);
2909
2916
2917 if (nxt_fast_path(local != NULL)) {
2918 ap->r.local.start = nxt_sockaddr_address(local);
2919 ap->r.local.length = local->address_length;
2920 }
2921
2910 ap->r.header.buf = buf;
2911 }
2912
2913 h = &ap->r.header;
2914 b = &ap->r.body;
2915
2916 ret = nxt_app_http_req_header_parse(task, ap, buf);
2917

--- 77 unchanged lines hidden (view full) ---

2995 }
2996
2997 }
2998
2999 nxt_conn_read(task->thread->engine, c);
3000}
3001
3002
2922 ap->r.header.buf = buf;
2923 }
2924
2925 h = &ap->r.header;
2926 b = &ap->r.body;
2927
2928 ret = nxt_app_http_req_header_parse(task, ap, buf);
2929

--- 77 unchanged lines hidden (view full) ---

3007 }
3008
3009 }
3010
3011 nxt_conn_read(task->thread->engine, c);
3012}
3013
3014
3015static nxt_sockaddr_t *
3016nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c)
3017{
3018 int ret;
3019 size_t size;
3020 socklen_t socklen;
3021 nxt_sockaddr_t *sa;
3022
3023 if (c->local != NULL) {
3024 return c->local;
3025 }
3026
3027 /* AF_UNIX should not get in here. */
3028
3029 switch (c->remote->u.sockaddr.sa_family) {
3030#if (NXT_INET6)
3031 case AF_INET6:
3032 socklen = sizeof(struct sockaddr_in6);
3033 size = offsetof(nxt_sockaddr_t, u) + socklen + NXT_INET6_ADDR_STR_LEN;
3034 break;
3035#endif
3036 case AF_INET:
3037 default:
3038 socklen = sizeof(struct sockaddr_in6);
3039 size = offsetof(nxt_sockaddr_t, u) + socklen + NXT_INET_ADDR_STR_LEN;
3040 break;
3041 }
3042
3043 sa = nxt_mp_get(c->mem_pool, size);
3044 if (nxt_slow_path(sa == NULL)) {
3045 return NULL;
3046 }
3047
3048 ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen);
3049 if (nxt_slow_path(ret != 0)) {
3050 nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd);
3051 return NULL;
3052 }
3053
3054 c->local = sa;
3055
3056 nxt_sockaddr_text(sa);
3057
3058 /*
3059 * TODO: here we can adjust the end of non-freeable block
3060 * in c->mem_pool to the end of actual sockaddr length.
3061 */
3062
3063 return sa;
3064}
3065
3066
3003static void
3004nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
3005{
3006 size_t size;
3007 nxt_int_t ret;
3008 nxt_buf_t *buf;
3009 nxt_conn_t *c;
3010 nxt_app_parse_ctx_t *ap;

--- 24 unchanged lines hidden (view full) ---

3035
3036 case NXT_ERROR:
3037 nxt_router_gen_error(task, c, 500, "Read body error");
3038 return;
3039
3040 default: /* NXT_AGAIN */
3041
3042 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3067static void
3068nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
3069{
3070 size_t size;
3071 nxt_int_t ret;
3072 nxt_buf_t *buf;
3073 nxt_conn_t *c;
3074 nxt_app_parse_ctx_t *ap;

--- 24 unchanged lines hidden (view full) ---

3099
3100 case NXT_ERROR:
3101 nxt_router_gen_error(task, c, 500, "Read body error");
3102 return;
3103
3104 default: /* NXT_AGAIN */
3105
3106 if (nxt_buf_mem_free_size(&buf->mem) == 0) {
3043 joint = c->listen->socket.data;
3107 joint = c->joint;
3044
3045 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3046
3047 size = nxt_min(joint->socket_conf->body_buffer_size,
3048 (size_t) h->parsed_content_length - b->preread_size);
3049
3050 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3051 if (nxt_slow_path(buf->next == NULL)) {

--- 528 unchanged lines hidden (view full) ---

3580 } nxt_queue_loop;
3581
3582 nxt_queue_remove(&c->link);
3583
3584 engine = task->thread->engine;
3585
3586 nxt_sockaddr_cache_free(engine, c);
3587
3108
3109 b->preread_size += nxt_buf_mem_used_size(&buf->mem);
3110
3111 size = nxt_min(joint->socket_conf->body_buffer_size,
3112 (size_t) h->parsed_content_length - b->preread_size);
3113
3114 buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
3115 if (nxt_slow_path(buf->next == NULL)) {

--- 528 unchanged lines hidden (view full) ---

3644 } nxt_queue_loop;
3645
3646 nxt_queue_remove(&c->link);
3647
3648 engine = task->thread->engine;
3649
3650 nxt_sockaddr_cache_free(engine, c);
3651
3588 joint = c->listen->socket.data;
3652 joint = c->joint;
3589
3590 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
3591 &engine->task, joint, NULL);
3592
3593 nxt_mp_release(c->mem_pool, c);
3594}
3595
3596

--- 51 unchanged lines hidden (view full) ---

3648}
3649
3650
3651static nxt_msec_t
3652nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3653{
3654 nxt_socket_conf_joint_t *joint;
3655
3653
3654 nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
3655 &engine->task, joint, NULL);
3656
3657 nxt_mp_release(c->mem_pool, c);
3658}
3659
3660

--- 51 unchanged lines hidden (view full) ---

3712}
3713
3714
3715static nxt_msec_t
3716nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3717{
3718 nxt_socket_conf_joint_t *joint;
3719
3656 joint = c->listen->socket.data;
3720 joint = c->joint;
3657
3658 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3659}
3721
3722 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3723}