nxt_router.c (1925:b8a2ac618950) nxt_router.c (1926:6e85d6c0b8bb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 4 unchanged lines hidden (view full) ---

13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
18#include <nxt_app_queue.h>
19#include <nxt_port_queue.h>
20
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 4 unchanged lines hidden (view full) ---

13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
18#include <nxt_app_queue.h>
19#include <nxt_port_queue.h>
20
21#define NXT_SHARED_PORT_ID 0xFFFFu
22
21typedef struct {
22 nxt_str_t type;
23 uint32_t processes;
24 uint32_t max_processes;
25 uint32_t spare_processes;
26 nxt_msec_t timeout;
27 nxt_msec_t idle_timeout;
28 uint32_t requests;

--- 33 unchanged lines hidden (view full) ---

62
63
64typedef struct {
65 nxt_app_t *app;
66 nxt_router_temp_conf_t *temp_conf;
67} nxt_app_rpc_t;
68
69
23typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
30 uint32_t requests;

--- 33 unchanged lines hidden (view full) ---

64
65
66typedef struct {
67 nxt_app_t *app;
68 nxt_router_temp_conf_t *temp_conf;
69} nxt_app_rpc_t;
70
71
72typedef struct {
73 nxt_app_joint_t *app_joint;
74 uint32_t generation;
75} nxt_app_joint_rpc_t;
76
77
70static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
71 nxt_mp_t *mp);
72static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
73static void nxt_router_greet_controller(nxt_task_t *task,
74 nxt_port_t *controller_port);
75
76static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
77
78static void nxt_router_new_port_handler(nxt_task_t *task,
79 nxt_port_recv_msg_t *msg);
80static void nxt_router_conf_data_handler(nxt_task_t *task,
81 nxt_port_recv_msg_t *msg);
78static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
79 nxt_mp_t *mp);
80static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
81static void nxt_router_greet_controller(nxt_task_t *task,
82 nxt_port_t *controller_port);
83
84static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
85
86static void nxt_router_new_port_handler(nxt_task_t *task,
87 nxt_port_recv_msg_t *msg);
88static void nxt_router_conf_data_handler(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg);
90static void nxt_router_app_restart_handler(nxt_task_t *task,
91 nxt_port_recv_msg_t *msg);
82static void nxt_router_remove_pid_handler(nxt_task_t *task,
83 nxt_port_recv_msg_t *msg);
84static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
85 nxt_port_recv_msg_t *msg);
86
87static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
88static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
89static void nxt_router_conf_ready(nxt_task_t *task,

--- 186 unchanged lines hidden (view full) ---

276static const nxt_port_handlers_t nxt_router_process_port_handlers = {
277 .quit = nxt_signal_quit_handler,
278 .new_port = nxt_router_new_port_handler,
279 .get_port = nxt_router_get_port_handler,
280 .change_file = nxt_port_change_log_file_handler,
281 .mmap = nxt_port_mmap_handler,
282 .get_mmap = nxt_router_get_mmap_handler,
283 .data = nxt_router_conf_data_handler,
92static void nxt_router_remove_pid_handler(nxt_task_t *task,
93 nxt_port_recv_msg_t *msg);
94static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
95 nxt_port_recv_msg_t *msg);
96
97static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
98static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
99static void nxt_router_conf_ready(nxt_task_t *task,

--- 186 unchanged lines hidden (view full) ---

286static const nxt_port_handlers_t nxt_router_process_port_handlers = {
287 .quit = nxt_signal_quit_handler,
288 .new_port = nxt_router_new_port_handler,
289 .get_port = nxt_router_get_port_handler,
290 .change_file = nxt_port_change_log_file_handler,
291 .mmap = nxt_port_mmap_handler,
292 .get_mmap = nxt_router_get_mmap_handler,
293 .data = nxt_router_conf_data_handler,
294 .app_restart = nxt_router_app_restart_handler,
284 .remove_pid = nxt_router_remove_pid_handler,
285 .access_log = nxt_router_access_log_reopen_handler,
286 .rpc_ready = nxt_port_rpc_handler,
287 .rpc_error = nxt_port_rpc_handler,
288 .oosm = nxt_router_oosm_handler,
289};
290
291

--- 82 unchanged lines hidden (view full) ---

374 -1, 0, 0, NULL);
375}
376
377
378static void
379nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
380 void *data)
381{
295 .remove_pid = nxt_router_remove_pid_handler,
296 .access_log = nxt_router_access_log_reopen_handler,
297 .rpc_ready = nxt_port_rpc_handler,
298 .rpc_error = nxt_port_rpc_handler,
299 .oosm = nxt_router_oosm_handler,
300};
301
302

--- 82 unchanged lines hidden (view full) ---

385 -1, 0, 0, NULL);
386}
387
388
389static void
390nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
391 void *data)
392{
382 size_t size;
383 uint32_t stream;
384 nxt_mp_t *mp;
385 nxt_int_t ret;
386 nxt_app_t *app;
387 nxt_buf_t *b;
388 nxt_port_t *main_port;
389 nxt_runtime_t *rt;
393 size_t size;
394 uint32_t stream;
395 nxt_mp_t *mp;
396 nxt_int_t ret;
397 nxt_app_t *app;
398 nxt_buf_t *b;
399 nxt_port_t *main_port;
400 nxt_runtime_t *rt;
401 nxt_app_joint_rpc_t *app_joint_rpc;
390
391 app = data;
392
393 rt = task->thread->runtime;
394 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
395
396 nxt_debug(task, "app '%V' %p start process", &app->name, app);
397

--- 4 unchanged lines hidden (view full) ---

402 if (nxt_slow_path(b == NULL)) {
403 goto failed;
404 }
405
406 nxt_buf_cpystr(b, &app->name);
407 *b->mem.free++ = '\0';
408 nxt_buf_cpystr(b, &app->conf);
409
402
403 app = data;
404
405 rt = task->thread->runtime;
406 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
407
408 nxt_debug(task, "app '%V' %p start process", &app->name, app);
409

--- 4 unchanged lines hidden (view full) ---

414 if (nxt_slow_path(b == NULL)) {
415 goto failed;
416 }
417
418 nxt_buf_cpystr(b, &app->name);
419 *b->mem.free++ = '\0';
420 nxt_buf_cpystr(b, &app->conf);
421
410 nxt_router_app_joint_use(task, app->joint, 1);
411
412 stream = nxt_port_rpc_register_handler(task, port,
413 nxt_router_app_port_ready,
414 nxt_router_app_port_error,
415 -1, app->joint);
416
417 if (nxt_slow_path(stream == 0)) {
418 nxt_router_app_joint_use(task, app->joint, -1);
419
422 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
423 nxt_router_app_port_ready,
424 nxt_router_app_port_error,
425 sizeof(nxt_app_joint_rpc_t));
426 if (nxt_slow_path(app_joint_rpc == NULL)) {
420 goto failed;
421 }
422
427 goto failed;
428 }
429
430 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
431
423 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
424 -1, stream, port->id, b);
432 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
433 -1, stream, port->id, b);
425
426 if (nxt_slow_path(ret != NXT_OK)) {
427 nxt_port_rpc_cancel(task, port, stream);
428
434 if (nxt_slow_path(ret != NXT_OK)) {
435 nxt_port_rpc_cancel(task, port, stream);
436
429 nxt_router_app_joint_use(task, app->joint, -1);
430
431 goto failed;
432 }
433
437 goto failed;
438 }
439
440 app_joint_rpc->app_joint = app->joint;
441 app_joint_rpc->generation = app->generation;
442
443 nxt_router_app_joint_use(task, app->joint, 1);
444
434 nxt_router_app_use(task, app, -1);
435
436 return;
437
438failed:
439
440 if (b != NULL) {
441 mp = b->data;

--- 57 unchanged lines hidden (view full) ---

499}
500
501
502nxt_inline nxt_bool_t
503nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
504{
505 nxt_buf_t *b, *next;
506 nxt_bool_t cancelled;
445 nxt_router_app_use(task, app, -1);
446
447 return;
448
449failed:
450
451 if (b != NULL) {
452 mp = b->data;

--- 57 unchanged lines hidden (view full) ---

510}
511
512
513nxt_inline nxt_bool_t
514nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
515{
516 nxt_buf_t *b, *next;
517 nxt_bool_t cancelled;
518 nxt_port_t *app_port;
507 nxt_msg_info_t *msg_info;
508
509 msg_info = &req_rpc_data->msg_info;
510
511 if (msg_info->buf == NULL) {
512 return 0;
513 }
514
519 nxt_msg_info_t *msg_info;
520
521 msg_info = &req_rpc_data->msg_info;
522
523 if (msg_info->buf == NULL) {
524 return 0;
525 }
526
515 cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
516 msg_info->tracking_cookie,
517 req_rpc_data->stream);
527 app_port = req_rpc_data->app_port;
518
528
519 if (cancelled) {
520 nxt_debug(task, "stream #%uD: cancelled by router",
521 req_rpc_data->stream);
529 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
530 cancelled = nxt_app_queue_cancel(app_port->queue,
531 msg_info->tracking_cookie,
532 req_rpc_data->stream);
533
534 if (cancelled) {
535 nxt_debug(task, "stream #%uD: cancelled by router",
536 req_rpc_data->stream);
537 }
538
539 } else {
540 cancelled = 0;
522 }
523
524 for (b = msg_info->buf; b != NULL; b = next) {
525 next = b->next;
526 b->next = NULL;
527
528 if (b->is_port_mmap_sent) {
529 b->is_port_mmap_sent = cancelled == 0;

--- 259 unchanged lines hidden (view full) ---

789 if (msg->fd[0] != -1) {
790 nxt_fd_close(msg->fd[0]);
791 msg->fd[0] = -1;
792 }
793}
794
795
796static void
541 }
542
543 for (b = msg_info->buf; b != NULL; b = next) {
544 next = b->next;
545 b->next = NULL;
546
547 if (b->is_port_mmap_sent) {
548 b->is_port_mmap_sent = cancelled == 0;

--- 259 unchanged lines hidden (view full) ---

808 if (msg->fd[0] != -1) {
809 nxt_fd_close(msg->fd[0]);
810 msg->fd[0] = -1;
811 }
812}
813
814
815static void
816nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
817{
818 nxt_app_t *app;
819 nxt_int_t ret;
820 nxt_str_t app_name;
821 nxt_port_t *port, *reply_port, *shared_port, *old_shared_port;
822 nxt_port_msg_type_t reply;
823
824 reply_port = nxt_runtime_port_find(task->thread->runtime,
825 msg->port_msg.pid,
826 msg->port_msg.reply_port);
827 if (nxt_slow_path(reply_port == NULL)) {
828 nxt_alert(task, "app_restart_handler: reply port not found");
829 return;
830 }
831
832 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
833 app_name.start = msg->buf->mem.pos;
834
835 nxt_debug(task, "app_restart_handler: %V", &app_name);
836
837 app = nxt_router_app_find(&nxt_router->apps, &app_name);
838
839 if (nxt_fast_path(app != NULL)) {
840 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
841 NXT_PROCESS_APP);
842 if (nxt_slow_path(shared_port == NULL)) {
843 goto fail;
844 }
845
846 ret = nxt_port_socket_init(task, shared_port, 0);
847 if (nxt_slow_path(ret != NXT_OK)) {
848 nxt_port_use(task, shared_port, -1);
849 goto fail;
850 }
851
852 ret = nxt_router_app_queue_init(task, shared_port);
853 if (nxt_slow_path(ret != NXT_OK)) {
854 nxt_port_write_close(shared_port);
855 nxt_port_read_close(shared_port);
856 nxt_port_use(task, shared_port, -1);
857 goto fail;
858 }
859
860 nxt_port_write_enable(task, shared_port);
861
862 nxt_thread_mutex_lock(&app->mutex);
863
864 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
865
866 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
867 0, 0, NULL);
868
869 } nxt_queue_loop;
870
871 app->generation++;
872
873 shared_port->app = app;
874
875 old_shared_port = app->shared_port;
876 old_shared_port->app = NULL;
877
878 app->shared_port = shared_port;
879
880 nxt_thread_mutex_unlock(&app->mutex);
881
882 nxt_port_close(task, old_shared_port);
883 nxt_port_use(task, old_shared_port, -1);
884
885 reply = NXT_PORT_MSG_RPC_READY_LAST;
886
887 } else {
888
889fail:
890
891 reply = NXT_PORT_MSG_RPC_ERROR;
892 }
893
894 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
895 0, NULL);
896}
897
898
899static void
797nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
798 void *data)
799{
800 union {
801 nxt_pid_t removed_pid;
802 void *data;
803 } u;
804

--- 797 unchanged lines hidden (view full) ---

1602 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1603 app_joint->idle_timer.task = &engine->task;
1604 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1605
1606 app_joint->free_app_work.handler = nxt_router_free_app;
1607 app_joint->free_app_work.task = &engine->task;
1608 app_joint->free_app_work.obj = app_joint;
1609
900nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
901 void *data)
902{
903 union {
904 nxt_pid_t removed_pid;
905 void *data;
906 } u;
907

--- 797 unchanged lines hidden (view full) ---

1705 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1706 app_joint->idle_timer.task = &engine->task;
1707 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1708
1709 app_joint->free_app_work.handler = nxt_router_free_app;
1710 app_joint->free_app_work.task = &engine->task;
1711 app_joint->free_app_work.obj = app_joint;
1712
1610 port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1713 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1611 NXT_PROCESS_APP);
1612 if (nxt_slow_path(port == NULL)) {
1613 return NXT_ERROR;
1614 }
1615
1616 ret = nxt_port_socket_init(task, port, 0);
1617 if (nxt_slow_path(ret != NXT_OK)) {
1618 nxt_port_use(task, port, -1);

--- 2609 unchanged lines hidden (view full) ---

4228 nxt_request_rpc_data_unlink(task, req_rpc_data);
4229}
4230
4231
4232static void
4233nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4234 void *data)
4235{
1714 NXT_PROCESS_APP);
1715 if (nxt_slow_path(port == NULL)) {
1716 return NXT_ERROR;
1717 }
1718
1719 ret = nxt_port_socket_init(task, port, 0);
1720 if (nxt_slow_path(ret != NXT_OK)) {
1721 nxt_port_use(task, port, -1);

--- 2609 unchanged lines hidden (view full) ---

4331 nxt_request_rpc_data_unlink(task, req_rpc_data);
4332}
4333
4334
4335static void
4336nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4337 void *data)
4338{
4236 nxt_app_t *app;
4237 nxt_port_t *port;
4238 nxt_app_joint_t *app_joint;
4339 nxt_app_t *app;
4340 nxt_bool_t start_process;
4341 nxt_port_t *port;
4342 nxt_app_joint_t *app_joint;
4343 nxt_app_joint_rpc_t *app_joint_rpc;
4239
4344
4240 app_joint = data;
4345 nxt_assert(data != NULL);
4346
4347 app_joint_rpc = data;
4348 app_joint = app_joint_rpc->app_joint;
4241 port = msg->u.new_port;
4242
4243 nxt_assert(app_joint != NULL);
4244 nxt_assert(port != NULL);
4245 nxt_assert(port->type == NXT_PROCESS_APP);
4246 nxt_assert(port->id == 0);
4247
4248 app = app_joint->app;
4249
4250 nxt_router_app_joint_use(task, app_joint, -1);
4251
4252 if (nxt_slow_path(app == NULL)) {
4253 nxt_debug(task, "new port ready for released app, send QUIT");
4254
4255 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4256
4257 return;
4258 }
4259
4349 port = msg->u.new_port;
4350
4351 nxt_assert(app_joint != NULL);
4352 nxt_assert(port != NULL);
4353 nxt_assert(port->type == NXT_PROCESS_APP);
4354 nxt_assert(port->id == 0);
4355
4356 app = app_joint->app;
4357
4358 nxt_router_app_joint_use(task, app_joint, -1);
4359
4360 if (nxt_slow_path(app == NULL)) {
4361 nxt_debug(task, "new port ready for released app, send QUIT");
4362
4363 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4364
4365 return;
4366 }
4367
4260 port->app = app;
4261 port->main_app_port = port;
4262
4263 nxt_thread_mutex_lock(&app->mutex);
4264
4265 nxt_assert(app->pending_processes != 0);
4266
4267 app->pending_processes--;
4368 nxt_thread_mutex_lock(&app->mutex);
4369
4370 nxt_assert(app->pending_processes != 0);
4371
4372 app->pending_processes--;
4373
4374 if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
4375 nxt_debug(task, "new port ready for restarted app, send QUIT");
4376
4377 start_process = !task->thread->engine->shutdown
4378 && nxt_router_app_can_start(app)
4379 && nxt_router_app_need_start(app);
4380
4381 if (start_process) {
4382 app->pending_processes++;
4383 }
4384
4385 nxt_thread_mutex_unlock(&app->mutex);
4386
4387 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4388
4389 if (start_process) {
4390 nxt_router_start_app_process(task, app);
4391 }
4392
4393 return;
4394 }
4395
4396 port->app = app;
4397 port->main_app_port = port;
4398
4268 app->processes++;
4269 nxt_port_hash_add(&app->port_hash, port);
4270 app->port_hash_count++;
4271
4272 nxt_thread_mutex_unlock(&app->mutex);
4273
4274 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4275 &app->name, port->pid, app->processes, app->pending_processes);

--- 37 unchanged lines hidden (view full) ---

4313 0, 0, b);
4314}
4315
4316
4317static void
4318nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4319 void *data)
4320{
4399 app->processes++;
4400 nxt_port_hash_add(&app->port_hash, port);
4401 app->port_hash_count++;
4402
4403 nxt_thread_mutex_unlock(&app->mutex);
4404
4405 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4406 &app->name, port->pid, app->processes, app->pending_processes);

--- 37 unchanged lines hidden (view full) ---

4444 0, 0, b);
4445}
4446
4447
4448static void
4449nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4450 void *data)
4451{
4321 nxt_app_t *app;
4322 nxt_app_joint_t *app_joint;
4323 nxt_queue_link_t *link;
4324 nxt_http_request_t *r;
4452 nxt_app_t *app;
4453 nxt_app_joint_t *app_joint;
4454 nxt_queue_link_t *link;
4455 nxt_http_request_t *r;
4456 nxt_app_joint_rpc_t *app_joint_rpc;
4325
4457
4326 app_joint = data;
4458 nxt_assert(data != NULL);
4327
4459
4460 app_joint_rpc = data;
4461 app_joint = app_joint_rpc->app_joint;
4462
4328 nxt_assert(app_joint != NULL);
4329
4330 app = app_joint->app;
4331
4332 nxt_router_app_joint_use(task, app_joint, -1);
4333
4334 if (nxt_slow_path(app == NULL)) {
4335 nxt_debug(task, "start error for released app");

--- 149 unchanged lines hidden (view full) ---

4485 inc_use = -1;
4486 break;
4487 }
4488
4489 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4490 port->pid, port->id,
4491 (int) inc_use, (int) got_response);
4492
4463 nxt_assert(app_joint != NULL);
4464
4465 app = app_joint->app;
4466
4467 nxt_router_app_joint_use(task, app_joint, -1);
4468
4469 if (nxt_slow_path(app == NULL)) {
4470 nxt_debug(task, "start error for released app");

--- 149 unchanged lines hidden (view full) ---

4620 inc_use = -1;
4621 break;
4622 }
4623
4624 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4625 port->pid, port->id,
4626 (int) inc_use, (int) got_response);
4627
4493 if (port == app->shared_port) {
4628 if (port->id == NXT_SHARED_PORT_ID) {
4494 nxt_thread_mutex_lock(&app->mutex);
4495
4496 app->active_requests -= got_response + dec_requests;
4497
4498 nxt_thread_mutex_unlock(&app->mutex);
4499
4500 goto adjust_use;
4501 }

--- 353 unchanged lines hidden (view full) ---

4855 nxt_port_mmaps_destroy(&app->outgoing, 1);
4856
4857 nxt_thread_mutex_destroy(&app->outgoing.mutex);
4858
4859 if (app->shared_port != NULL) {
4860 app->shared_port->app = NULL;
4861 nxt_port_close(task, app->shared_port);
4862 nxt_port_use(task, app->shared_port, -1);
4629 nxt_thread_mutex_lock(&app->mutex);
4630
4631 app->active_requests -= got_response + dec_requests;
4632
4633 nxt_thread_mutex_unlock(&app->mutex);
4634
4635 goto adjust_use;
4636 }

--- 353 unchanged lines hidden (view full) ---

4990 nxt_port_mmaps_destroy(&app->outgoing, 1);
4991
4992 nxt_thread_mutex_destroy(&app->outgoing.mutex);
4993
4994 if (app->shared_port != NULL) {
4995 app->shared_port->app = NULL;
4996 nxt_port_close(task, app->shared_port);
4997 nxt_port_use(task, app->shared_port, -1);
4998
4999 app->shared_port = NULL;
4863 }
4864
4865 nxt_thread_mutex_destroy(&app->mutex);
4866 nxt_mp_destroy(app->mem_pool);
4867
4868 app_joint->app = NULL;
4869
4870 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {

--- 798 unchanged lines hidden ---
5000 }
5001
5002 nxt_thread_mutex_destroy(&app->mutex);
5003 nxt_mp_destroy(app->mem_pool);
5004
5005 app_joint->app = NULL;
5006
5007 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {

--- 798 unchanged lines hidden ---