nxt_router.c (1488:6976d36be926) nxt_router.c (1509:2a3a06bc2684)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 283 unchanged lines hidden (view full) ---

292 .restart = 1,
293 .setup = nxt_process_core_setup,
294 .start = nxt_router_start,
295 .port_handlers = &nxt_router_process_port_handlers,
296 .signals = nxt_process_signals,
297};
298
299
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 283 unchanged lines hidden (view full) ---

292 .restart = 1,
293 .setup = nxt_process_core_setup,
294 .start = nxt_router_start,
295 .port_handlers = &nxt_router_process_port_handlers,
296 .signals = nxt_process_signals,
297};
298
299
300/* Queues of nxt_socket_conf_t */
301nxt_queue_t creating_sockets;
302nxt_queue_t pending_sockets;
303nxt_queue_t updating_sockets;
304nxt_queue_t keeping_sockets;
305nxt_queue_t deleting_sockets;
306
307
300static nxt_int_t
301nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
302{
303 nxt_runtime_stop_app_processes(task, task->thread->runtime);
304
305 return NXT_OK;
306}
307

--- 714 unchanged lines hidden (view full) ---

1022 tmcf->engine = task->thread->engine;
1023
1024 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1025 sizeof(nxt_router_engine_conf_t));
1026 if (nxt_slow_path(tmcf->engines == NULL)) {
1027 goto temp_fail;
1028 }
1029
308static nxt_int_t
309nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
310{
311 nxt_runtime_stop_app_processes(task, task->thread->runtime);
312
313 return NXT_OK;
314}
315

--- 714 unchanged lines hidden (view full) ---

1030 tmcf->engine = task->thread->engine;
1031
1032 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1033 sizeof(nxt_router_engine_conf_t));
1034 if (nxt_slow_path(tmcf->engines == NULL)) {
1035 goto temp_fail;
1036 }
1037
1030 nxt_queue_init(&tmcf->deleting);
1031 nxt_queue_init(&tmcf->keeping);
1032 nxt_queue_init(&tmcf->updating);
1033 nxt_queue_init(&tmcf->pending);
1034 nxt_queue_init(&tmcf->creating);
1038 nxt_queue_init(&creating_sockets);
1039 nxt_queue_init(&pending_sockets);
1040 nxt_queue_init(&updating_sockets);
1041 nxt_queue_init(&keeping_sockets);
1042 nxt_queue_init(&deleting_sockets);
1035
1036#if (NXT_TLS)
1037 nxt_queue_init(&tmcf->tls);
1038#endif
1039
1040 nxt_queue_init(&tmcf->apps);
1041 nxt_queue_init(&tmcf->previous);
1042

--- 40 unchanged lines hidden (view full) ---

1083 nxt_router_temp_conf_t *tmcf;
1084 const nxt_event_interface_t *interface;
1085#if (NXT_TLS)
1086 nxt_router_tlssock_t *tls;
1087#endif
1088
1089 tmcf = obj;
1090
1043
1044#if (NXT_TLS)
1045 nxt_queue_init(&tmcf->tls);
1046#endif
1047
1048 nxt_queue_init(&tmcf->apps);
1049 nxt_queue_init(&tmcf->previous);
1050

--- 40 unchanged lines hidden (view full) ---

1091 nxt_router_temp_conf_t *tmcf;
1092 const nxt_event_interface_t *interface;
1093#if (NXT_TLS)
1094 nxt_router_tlssock_t *tls;
1095#endif
1096
1097 tmcf = obj;
1098
1091 qlk = nxt_queue_first(&tmcf->pending);
1099 qlk = nxt_queue_first(&pending_sockets);
1092
1100
1093 if (qlk != nxt_queue_tail(&tmcf->pending)) {
1101 if (qlk != nxt_queue_tail(&pending_sockets)) {
1094 nxt_queue_remove(qlk);
1102 nxt_queue_remove(qlk);
1095 nxt_queue_insert_tail(&tmcf->creating, qlk);
1103 nxt_queue_insert_tail(&creating_sockets, qlk);
1096
1097 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1098
1099 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1100
1101 return;
1102 }
1103

--- 41 unchanged lines hidden (view full) ---

1145 if (nxt_slow_path(ret != NXT_OK)) {
1146 goto fail;
1147 }
1148
1149 nxt_router_apps_sort(task, router, tmcf);
1150
1151 nxt_router_engines_post(router, tmcf);
1152
1104
1105 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1106
1107 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1108
1109 return;
1110 }
1111

--- 41 unchanged lines hidden (view full) ---

1153 if (nxt_slow_path(ret != NXT_OK)) {
1154 goto fail;
1155 }
1156
1157 nxt_router_apps_sort(task, router, tmcf);
1158
1159 nxt_router_engines_post(router, tmcf);
1160
1153 nxt_queue_add(&router->sockets, &tmcf->updating);
1154 nxt_queue_add(&router->sockets, &tmcf->creating);
1161 nxt_queue_add(&router->sockets, &updating_sockets);
1162 nxt_queue_add(&router->sockets, &creating_sockets);
1155
1156 router->access_log = rtcf->access_log;
1157
1158 nxt_router_conf_ready(task, tmcf);
1159
1160 return;
1161
1162fail:

--- 17 unchanged lines hidden (view full) ---

1180
1181static void
1182nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1183{
1184 nxt_debug(task, "temp conf count:%D", tmcf->count);
1185
1186 if (--tmcf->count == 0) {
1187 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1163
1164 router->access_log = rtcf->access_log;
1165
1166 nxt_router_conf_ready(task, tmcf);
1167
1168 return;
1169
1170fail:

--- 17 unchanged lines hidden (view full) ---

1188
1189static void
1190nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191{
1192 nxt_debug(task, "temp conf count:%D", tmcf->count);
1193
1194 if (--tmcf->count == 0) {
1195 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1196
1197 nxt_mp_destroy(tmcf->mem_pool);
1188 }
1189}
1190
1191
1192static void
1193nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1194{
1195 nxt_app_t *app;
1196 nxt_queue_t new_socket_confs;
1197 nxt_socket_t s;
1198 nxt_router_t *router;
1199 nxt_queue_link_t *qlk;
1200 nxt_socket_conf_t *skcf;
1201 nxt_router_conf_t *rtcf;
1202
1203 nxt_alert(task, "failed to apply new conf");
1204
1198 }
1199}
1200
1201
1202static void
1203nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1204{
1205 nxt_app_t *app;
1206 nxt_queue_t new_socket_confs;
1207 nxt_socket_t s;
1208 nxt_router_t *router;
1209 nxt_queue_link_t *qlk;
1210 nxt_socket_conf_t *skcf;
1211 nxt_router_conf_t *rtcf;
1212
1213 nxt_alert(task, "failed to apply new conf");
1214
1205 for (qlk = nxt_queue_first(&tmcf->creating);
1206 qlk != nxt_queue_tail(&tmcf->creating);
1215 for (qlk = nxt_queue_first(&creating_sockets);
1216 qlk != nxt_queue_tail(&creating_sockets);
1207 qlk = nxt_queue_next(qlk))
1208 {
1209 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1210 s = skcf->listen->socket;
1211
1212 if (s != -1) {
1213 nxt_socket_close(task, s);
1214 }
1215
1216 nxt_free(skcf->listen);
1217 }
1218
1219 nxt_queue_init(&new_socket_confs);
1217 qlk = nxt_queue_next(qlk))
1218 {
1219 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1220 s = skcf->listen->socket;
1221
1222 if (s != -1) {
1223 nxt_socket_close(task, s);
1224 }
1225
1226 nxt_free(skcf->listen);
1227 }
1228
1229 nxt_queue_init(&new_socket_confs);
1220 nxt_queue_add(&new_socket_confs, &tmcf->updating);
1221 nxt_queue_add(&new_socket_confs, &tmcf->pending);
1222 nxt_queue_add(&new_socket_confs, &tmcf->creating);
1230 nxt_queue_add(&new_socket_confs, &updating_sockets);
1231 nxt_queue_add(&new_socket_confs, &pending_sockets);
1232 nxt_queue_add(&new_socket_confs, &creating_sockets);
1223
1224 rtcf = tmcf->router_conf;
1225
1226 nxt_http_routes_cleanup(task, rtcf->routes);
1227
1228 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1229
1230 if (skcf->action != NULL) {

--- 5 unchanged lines hidden (view full) ---

1236 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1237
1238 nxt_router_app_unlink(task, app);
1239
1240 } nxt_queue_loop;
1241
1242 router = rtcf->router;
1243
1233
1234 rtcf = tmcf->router_conf;
1235
1236 nxt_http_routes_cleanup(task, rtcf->routes);
1237
1238 nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1239
1240 if (skcf->action != NULL) {

--- 5 unchanged lines hidden (view full) ---

1246 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1247
1248 nxt_router_app_unlink(task, app);
1249
1250 } nxt_queue_loop;
1251
1252 router = rtcf->router;
1253
1244 nxt_queue_add(&router->sockets, &tmcf->keeping);
1245 nxt_queue_add(&router->sockets, &tmcf->deleting);
1254 nxt_queue_add(&router->sockets, &keeping_sockets);
1255 nxt_queue_add(&router->sockets, &deleting_sockets);
1246
1247 nxt_queue_add(&router->apps, &tmcf->previous);
1248
1249 // TODO: new engines and threads
1250
1251 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1252
1253 nxt_mp_destroy(rtcf->mem_pool);
1254
1255 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1256
1257 nxt_queue_add(&router->apps, &tmcf->previous);
1258
1259 // TODO: new engines and threads
1260
1261 nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1262
1263 nxt_mp_destroy(rtcf->mem_pool);
1264
1265 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1266
1267 nxt_mp_destroy(tmcf->mem_pool);
1256}
1257
1258
1259static void
1260nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1261 nxt_port_msg_type_t type)
1262{
1263 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);

--- 633 unchanged lines hidden (view full) ---

1897 + sizeof(nxt_router_access_log_t);
1898
1899 nxt_memcpy(access_log->path.start, path.start, path.length);
1900 }
1901
1902 tmcf->router_conf->access_log = access_log;
1903 }
1904
1268}
1269
1270
1271static void
1272nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1273 nxt_port_msg_type_t type)
1274{
1275 nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);

--- 633 unchanged lines hidden (view full) ---

1909 + sizeof(nxt_router_access_log_t);
1910
1911 nxt_memcpy(access_log->path.start, path.start, path.length);
1912 }
1913
1914 tmcf->router_conf->access_log = access_log;
1915 }
1916
1905 nxt_queue_add(&tmcf->deleting, &router->sockets);
1917 nxt_queue_add(&deleting_sockets, &router->sockets);
1906 nxt_queue_init(&router->sockets);
1907
1908 return NXT_OK;
1909
1910app_fail:
1911
1912 nxt_mp_destroy(app_mp);
1913

--- 222 unchanged lines hidden (view full) ---

2136 qlk = nxt_queue_next(qlk))
2137 {
2138 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2139
2140 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2141 nskcf->listen = skcf->listen;
2142
2143 nxt_queue_remove(qlk);
1918 nxt_queue_init(&router->sockets);
1919
1920 return NXT_OK;
1921
1922app_fail:
1923
1924 nxt_mp_destroy(app_mp);
1925

--- 222 unchanged lines hidden (view full) ---

2148 qlk = nxt_queue_next(qlk))
2149 {
2150 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2151
2152 if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2153 nskcf->listen = skcf->listen;
2154
2155 nxt_queue_remove(qlk);
2144 nxt_queue_insert_tail(&tmcf->keeping, qlk);
2156 nxt_queue_insert_tail(&keeping_sockets, qlk);
2145
2157
2146 nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
2158 nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2147
2148 return NXT_OK;
2149 }
2150 }
2151
2159
2160 return NXT_OK;
2161 }
2162 }
2163
2152 nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
2164 nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2153
2154 return NXT_DECLINED;
2155}
2156
2157
2158static void
2159nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2160 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)

--- 411 unchanged lines hidden (view full) ---

2572
2573
2574static nxt_int_t
2575nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2576 nxt_router_engine_conf_t *recf)
2577{
2578 nxt_int_t ret;
2579
2165
2166 return NXT_DECLINED;
2167}
2168
2169
2170static void
2171nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2172 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)

--- 411 unchanged lines hidden (view full) ---

2584
2585
2586static nxt_int_t
2587nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2588 nxt_router_engine_conf_t *recf)
2589{
2590 nxt_int_t ret;
2591
2580 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2592 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2581 nxt_router_listen_socket_create);
2582 if (nxt_slow_path(ret != NXT_OK)) {
2583 return ret;
2584 }
2585
2593 nxt_router_listen_socket_create);
2594 if (nxt_slow_path(ret != NXT_OK)) {
2595 return ret;
2596 }
2597
2586 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2598 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2587 nxt_router_listen_socket_create);
2588 if (nxt_slow_path(ret != NXT_OK)) {
2589 return ret;
2590 }
2591
2592 return ret;
2593}
2594
2595
2596static nxt_int_t
2597nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2598 nxt_router_engine_conf_t *recf)
2599{
2600 nxt_int_t ret;
2601
2599 nxt_router_listen_socket_create);
2600 if (nxt_slow_path(ret != NXT_OK)) {
2601 return ret;
2602 }
2603
2604 return ret;
2605}
2606
2607
2608static nxt_int_t
2609nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2610 nxt_router_engine_conf_t *recf)
2611{
2612 nxt_int_t ret;
2613
2602 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2614 ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2603 nxt_router_listen_socket_create);
2604 if (nxt_slow_path(ret != NXT_OK)) {
2605 return ret;
2606 }
2607
2615 nxt_router_listen_socket_create);
2616 if (nxt_slow_path(ret != NXT_OK)) {
2617 return ret;
2618 }
2619
2608 ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2620 ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2609 nxt_router_listen_socket_update);
2610 if (nxt_slow_path(ret != NXT_OK)) {
2611 return ret;
2612 }
2613
2621 nxt_router_listen_socket_update);
2622 if (nxt_slow_path(ret != NXT_OK)) {
2623 return ret;
2624 }
2625
2614 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2626 ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2615 if (nxt_slow_path(ret != NXT_OK)) {
2616 return ret;
2617 }
2618
2619 return ret;
2620}
2621
2622
2623static nxt_int_t
2624nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2625 nxt_router_engine_conf_t *recf)
2626{
2627 nxt_int_t ret;
2628
2629 ret = nxt_router_engine_quit(tmcf, recf);
2630 if (nxt_slow_path(ret != NXT_OK)) {
2631 return ret;
2632 }
2633
2627 if (nxt_slow_path(ret != NXT_OK)) {
2628 return ret;
2629 }
2630
2631 return ret;
2632}
2633
2634
2635static nxt_int_t
2636nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2637 nxt_router_engine_conf_t *recf)
2638{
2639 nxt_int_t ret;
2640
2641 ret = nxt_router_engine_quit(tmcf, recf);
2642 if (nxt_slow_path(ret != NXT_OK)) {
2643 return ret;
2644 }
2645
2634 ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2646 ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2635 if (nxt_slow_path(ret != NXT_OK)) {
2636 return ret;
2637 }
2638
2647 if (nxt_slow_path(ret != NXT_OK)) {
2648 return ret;
2649 }
2650
2639 return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2651 return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2640}
2641
2642
2643static nxt_int_t
2644nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2645 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2646 nxt_work_handler_t handler)
2647{

--- 2823 unchanged lines hidden ---
2652}
2653
2654
2655static nxt_int_t
2656nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2657 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2658 nxt_work_handler_t handler)
2659{

--- 2823 unchanged lines hidden ---