nxt_router.c (1547:cbcd76704c90) nxt_router.c (1549:cad592a06086)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 423 unchanged lines hidden (view full) ---

432
433static nxt_int_t
434nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
435{
436 nxt_int_t res;
437 nxt_port_t *router_port;
438 nxt_runtime_t *rt;
439
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 423 unchanged lines hidden (view full) ---

432
433static nxt_int_t
434nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
435{
436 nxt_int_t res;
437 nxt_port_t *router_port;
438 nxt_runtime_t *rt;
439
440 nxt_debug(task, "app '%V' start process", &app->name);
441
440 rt = task->thread->runtime;
441 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
442
443 nxt_router_app_use(task, app, 1);
444
445 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
446 app);
447

--- 1814 unchanged lines hidden (view full) ---

2262 app->pending_processes--;
2263 app->processes++;
2264 app->idle_processes++;
2265
2266 engine = task->thread->engine;
2267
2268 nxt_queue_insert_tail(&app->ports, &port->app_link);
2269 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
442 rt = task->thread->runtime;
443 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
444
445 nxt_router_app_use(task, app, 1);
446
447 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
448 app);
449

--- 1814 unchanged lines hidden (view full) ---

2264 app->pending_processes--;
2265 app->processes++;
2266 app->idle_processes++;
2267
2268 engine = task->thread->engine;
2269
2270 nxt_queue_insert_tail(&app->ports, &port->app_link);
2271 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2272
2273 nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2274 &app->name, port->pid, port->id);
2275
2270 nxt_port_hash_add(&app->port_hash, port);
2271 app->port_hash_count++;
2272
2273 port->idle_start = 0;
2274
2275 nxt_port_inc_use(port);
2276
2277 nxt_router_app_shared_port_send(task, port);

--- 1407 unchanged lines hidden (view full) ---

3685 return;
3686 }
3687
3688 main_app_port = app_port->main_app_port;
3689
3690 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
3691 app->idle_processes--;
3692
2276 nxt_port_hash_add(&app->port_hash, port);
2277 app->port_hash_count++;
2278
2279 port->idle_start = 0;
2280
2281 nxt_port_inc_use(port);
2282
2283 nxt_router_app_shared_port_send(task, port);

--- 1407 unchanged lines hidden (view full) ---

3691 return;
3692 }
3693
3694 main_app_port = app_port->main_app_port;
3695
3696 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
3697 app->idle_processes--;
3698
3699 nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
3700 &app->name, main_app_port->pid, main_app_port->id,
3701 (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
3702
3693 /* Check port was in 'spare_ports' using idle_start field. */
3694 if (main_app_port->idle_start == 0
3695 && app->idle_processes >= app->spare_processes)
3696 {
3697 /*
3698 * If there is a vacant space in spare ports,
3699 * move the last idle to spare_ports.
3700 */
3701 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
3702
3703 idle_lnk = nxt_queue_last(&app->idle_ports);
3704 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
3705 nxt_queue_remove(idle_lnk);
3706
3707 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
3708
3709 idle_port->idle_start = 0;
3703 /* Check port was in 'spare_ports' using idle_start field. */
3704 if (main_app_port->idle_start == 0
3705 && app->idle_processes >= app->spare_processes)
3706 {
3707 /*
3708 * If there is a vacant space in spare ports,
3709 * move the last idle to spare_ports.
3710 */
3711 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
3712
3713 idle_lnk = nxt_queue_last(&app->idle_ports);
3714 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
3715 nxt_queue_remove(idle_lnk);
3716
3717 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
3718
3719 idle_port->idle_start = 0;
3720
3721 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
3722 "to spare_ports",
3723 &app->name, idle_port->pid, idle_port->id);
3710 }
3711
3712 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
3713 app->pending_processes++;
3714 start_process = 1;
3715 }
3716 }
3717

--- 202 unchanged lines hidden (view full) ---

3920 } else {
3921 nxt_router_free_app(task, app->joint, NULL);
3922 }
3923 }
3924}
3925
3926
3927nxt_inline nxt_port_t *
3724 }
3725
3726 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
3727 app->pending_processes++;
3728 start_process = 1;
3729 }
3730 }
3731

--- 202 unchanged lines hidden (view full) ---

3934 } else {
3935 nxt_router_free_app(task, app->joint, NULL);
3936 }
3937 }
3938}
3939
3940
3941nxt_inline nxt_port_t *
3928nxt_router_app_get_port_for_quit(nxt_app_t *app)
3942nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
3929{
3930 nxt_port_t *port;
3931
3932 port = NULL;
3933
3934 nxt_thread_mutex_lock(&app->mutex);
3935
3936 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
3937
3938 /* Caller is responsible to decrease port use count. */
3939 nxt_queue_chk_remove(&port->app_link);
3940
3941 if (nxt_queue_chk_remove(&port->idle_link)) {
3942 app->idle_processes--;
3943{
3944 nxt_port_t *port;
3945
3946 port = NULL;
3947
3948 nxt_thread_mutex_lock(&app->mutex);
3949
3950 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
3951
3952 /* Caller is responsible to decrease port use count. */
3953 nxt_queue_chk_remove(&port->app_link);
3954
3955 if (nxt_queue_chk_remove(&port->idle_link)) {
3956 app->idle_processes--;
3957
3958 nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
3959 &app->name, port->pid, port->id,
3960 (port->idle_start ? "idle_ports" : "spare_ports"));
3943 }
3944
3945 nxt_port_hash_remove(&app->port_hash, port);
3946 app->port_hash_count--;
3947
3948 port->app = NULL;
3949 app->processes--;
3950

--- 118 unchanged lines hidden (view full) ---

4069 adjust_idle_timer = 1;
4070 app->adjust_idle_work.data = app;
4071 app->adjust_idle_work.next = NULL;
4072 }
4073
4074 if (app->idle_processes < app->spare_processes) {
4075 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4076
3961 }
3962
3963 nxt_port_hash_remove(&app->port_hash, port);
3964 app->port_hash_count--;
3965
3966 port->app = NULL;
3967 app->processes--;
3968

--- 118 unchanged lines hidden (view full) ---

4087 adjust_idle_timer = 1;
4088 app->adjust_idle_work.data = app;
4089 app->adjust_idle_work.next = NULL;
4090 }
4091
4092 if (app->idle_processes < app->spare_processes) {
4093 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4094
4095 nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4096 &app->name, main_app_port->pid, main_app_port->id);
4077 } else {
4078 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4079
4080 main_app_port->idle_start = task->thread->engine->timers.now;
4097 } else {
4098 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4099
4100 main_app_port->idle_start = task->thread->engine->timers.now;
4101
4102 nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4103 &app->name, main_app_port->pid, main_app_port->id);
4081 }
4082
4083 app->idle_processes++;
4084 }
4085
4086 nxt_thread_mutex_unlock(&app->mutex);
4087
4088 if (adjust_idle_timer) {

--- 57 unchanged lines hidden (view full) ---

4146 return;
4147 }
4148
4149 unchain = nxt_queue_chk_remove(&port->app_link);
4150
4151 if (nxt_queue_chk_remove(&port->idle_link)) {
4152 app->idle_processes--;
4153
4104 }
4105
4106 app->idle_processes++;
4107 }
4108
4109 nxt_thread_mutex_unlock(&app->mutex);
4110
4111 if (adjust_idle_timer) {

--- 57 unchanged lines hidden (view full) ---

4169 return;
4170 }
4171
4172 unchain = nxt_queue_chk_remove(&port->app_link);
4173
4174 if (nxt_queue_chk_remove(&port->idle_link)) {
4175 app->idle_processes--;
4176
4177 nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4178 &app->name, port->pid, port->id,
4179 (port->idle_start ? "idle_ports" : "spare_ports"));
4180
4154 if (port->idle_start == 0
4155 && app->idle_processes >= app->spare_processes)
4156 {
4157 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4158
4159 idle_lnk = nxt_queue_last(&app->idle_ports);
4160 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4161 nxt_queue_remove(idle_lnk);
4162
4163 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4164
4165 idle_port->idle_start = 0;
4181 if (port->idle_start == 0
4182 && app->idle_processes >= app->spare_processes)
4183 {
4184 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4185
4186 idle_lnk = nxt_queue_last(&app->idle_ports);
4187 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4188 nxt_queue_remove(idle_lnk);
4189
4190 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4191
4192 idle_port->idle_start = 0;
4193
4194 nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4195 "to spare_ports",
4196 &app->name, idle_port->pid, idle_port->id);
4166 }
4167 }
4168
4169 app->processes--;
4170
4171 start_process = !task->thread->engine->shutdown
4172 && nxt_router_app_can_start(app)
4173 && nxt_router_app_need_start(app);

--- 64 unchanged lines hidden (view full) ---

4238
4239 if (timeout > threshold) {
4240 break;
4241 }
4242
4243 nxt_queue_remove(lnk);
4244 lnk->next = NULL;
4245
4197 }
4198 }
4199
4200 app->processes--;
4201
4202 start_process = !task->thread->engine->shutdown
4203 && nxt_router_app_can_start(app)
4204 && nxt_router_app_need_start(app);

--- 64 unchanged lines hidden (view full) ---

4269
4270 if (timeout > threshold) {
4271 break;
4272 }
4273
4274 nxt_queue_remove(lnk);
4275 lnk->next = NULL;
4276
4277 nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4278 &app->name, port->pid, port->id);
4279
4246 nxt_queue_chk_remove(&port->app_link);
4247
4248 nxt_port_hash_remove(&app->port_hash, port);
4249 app->port_hash_count--;
4250
4251 app->idle_processes--;
4252 app->processes--;
4253 port->app = NULL;

--- 59 unchanged lines hidden (view full) ---

4313 nxt_app_t *app;
4314 nxt_port_t *port;
4315 nxt_app_joint_t *app_joint;
4316
4317 app_joint = obj;
4318 app = app_joint->app;
4319
4320 for ( ;; ) {
4280 nxt_queue_chk_remove(&port->app_link);
4281
4282 nxt_port_hash_remove(&app->port_hash, port);
4283 app->port_hash_count--;
4284
4285 app->idle_processes--;
4286 app->processes--;
4287 port->app = NULL;

--- 59 unchanged lines hidden (view full) ---

4347 nxt_app_t *app;
4348 nxt_port_t *port;
4349 nxt_app_joint_t *app_joint;
4350
4351 app_joint = obj;
4352 app = app_joint->app;
4353
4354 for ( ;; ) {
4321 port = nxt_router_app_get_port_for_quit(app);
4355 port = nxt_router_app_get_port_for_quit(task, app);
4322 if (port == NULL) {
4323 break;
4324 }
4325
4326 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4327
4328 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4329

--- 753 unchanged lines hidden ---
4356 if (port == NULL) {
4357 break;
4358 }
4359
4360 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4361
4362 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4363

--- 753 unchanged lines hidden ---