nxt_router.c (1978:13e1e2651f08) nxt_router.c (1980:43553aa72111)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 13 unchanged lines hidden (view full) ---

22
23typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 13 unchanged lines hidden (view full) ---

22
23typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
30 uint32_t requests;
31 nxt_conf_value_t *limits_value;
32 nxt_conf_value_t *processes_value;
33 nxt_conf_value_t *targets_value;
34} nxt_router_app_conf_t;
35
36
37typedef struct {
38 nxt_str_t pass;

--- 1249 unchanged lines hidden (view full) ---

1288
1289
1290static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1291 {
1292 nxt_string("timeout"),
1293 NXT_CONF_MAP_MSEC,
1294 offsetof(nxt_router_app_conf_t, timeout),
1295 },
30 nxt_conf_value_t *limits_value;
31 nxt_conf_value_t *processes_value;
32 nxt_conf_value_t *targets_value;
33} nxt_router_app_conf_t;
34
35
36typedef struct {
37 nxt_str_t pass;

--- 1249 unchanged lines hidden (view full) ---

1287
1288
1289static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1290 {
1291 nxt_string("timeout"),
1292 NXT_CONF_MAP_MSEC,
1293 offsetof(nxt_router_app_conf_t, timeout),
1294 },
1296
1297 {
1298 nxt_string("requests"),
1299 NXT_CONF_MAP_INT32,
1300 offsetof(nxt_router_app_conf_t, requests),
1301 },
1302};
1303
1304
1305static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1306 {
1307 nxt_string("spare"),
1308 NXT_CONF_MAP_INT32,
1309 offsetof(nxt_router_app_conf_t, spare_processes),

--- 252 unchanged lines hidden (view full) ---

1562 continue;
1563 }
1564
1565 apcf.processes = 1;
1566 apcf.max_processes = 1;
1567 apcf.spare_processes = 0;
1568 apcf.timeout = 0;
1569 apcf.idle_timeout = 15000;
1295};
1296
1297
1298static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1299 {
1300 nxt_string("spare"),
1301 NXT_CONF_MAP_INT32,
1302 offsetof(nxt_router_app_conf_t, spare_processes),

--- 252 unchanged lines hidden (view full) ---

1555 continue;
1556 }
1557
1558 apcf.processes = 1;
1559 apcf.max_processes = 1;
1560 apcf.spare_processes = 0;
1561 apcf.timeout = 0;
1562 apcf.idle_timeout = 15000;
1570 apcf.requests = 0;
1571 apcf.limits_value = NULL;
1572 apcf.processes_value = NULL;
1573 apcf.targets_value = NULL;
1574
1575 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1576 if (nxt_slow_path(app_joint == NULL)) {
1577 goto app_fail;
1578 }

--- 63 unchanged lines hidden (view full) ---

1642
1643 } else {
1644 targets = NULL;
1645 }
1646
1647 nxt_debug(task, "application type: %V", &apcf.type);
1648 nxt_debug(task, "application processes: %D", apcf.processes);
1649 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1563 apcf.limits_value = NULL;
1564 apcf.processes_value = NULL;
1565 apcf.targets_value = NULL;
1566
1567 app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1568 if (nxt_slow_path(app_joint == NULL)) {
1569 goto app_fail;
1570 }

--- 63 unchanged lines hidden (view full) ---

1634
1635 } else {
1636 targets = NULL;
1637 }
1638
1639 nxt_debug(task, "application type: %V", &apcf.type);
1640 nxt_debug(task, "application processes: %D", apcf.processes);
1641 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1650 nxt_debug(task, "application requests: %D", apcf.requests);
1651
1652 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1653
1654 if (lang == NULL) {
1655 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1656 goto app_fail;
1657 }
1658

--- 14 unchanged lines hidden (view full) ---

1673
1674 app->type = lang->type;
1675 app->max_processes = apcf.max_processes;
1676 app->spare_processes = apcf.spare_processes;
1677 app->max_pending_processes = apcf.spare_processes
1678 ? apcf.spare_processes : 1;
1679 app->timeout = apcf.timeout;
1680 app->idle_timeout = apcf.idle_timeout;
1642
1643 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1644
1645 if (lang == NULL) {
1646 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1647 goto app_fail;
1648 }
1649

--- 14 unchanged lines hidden (view full) ---

1664
1665 app->type = lang->type;
1666 app->max_processes = apcf.max_processes;
1667 app->spare_processes = apcf.spare_processes;
1668 app->max_pending_processes = apcf.spare_processes
1669 ? apcf.spare_processes : 1;
1670 app->timeout = apcf.timeout;
1671 app->idle_timeout = apcf.idle_timeout;
1681 app->max_requests = apcf.requests;
1682
1683 app->targets = targets;
1684
1685 engine = task->thread->engine;
1686
1687 app->engine = engine;
1688
1689 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;

--- 2981 unchanged lines hidden (view full) ---

4671
4672
4673static void
4674nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4675 nxt_apr_action_t action)
4676{
4677 int inc_use;
4678 uint32_t got_response, dec_requests;
1672
1673 app->targets = targets;
1674
1675 engine = task->thread->engine;
1676
1677 app->engine = engine;
1678
1679 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;

--- 2981 unchanged lines hidden (view full) ---

4661
4662
4663static void
4664nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4665 nxt_apr_action_t action)
4666{
4667 int inc_use;
4668 uint32_t got_response, dec_requests;
4679 nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
4669 nxt_bool_t adjust_idle_timer;
4680 nxt_port_t *main_app_port;
4681
4682 nxt_assert(port != NULL);
4683
4684 inc_use = 0;
4685 got_response = 0;
4686 dec_requests = 0;
4687

--- 29 unchanged lines hidden (view full) ---

4717
4718 goto adjust_use;
4719 }
4720
4721 main_app_port = port->main_app_port;
4722
4723 nxt_thread_mutex_lock(&app->mutex);
4724
4670 nxt_port_t *main_app_port;
4671
4672 nxt_assert(port != NULL);
4673
4674 inc_use = 0;
4675 got_response = 0;
4676 dec_requests = 0;
4677

--- 29 unchanged lines hidden (view full) ---

4707
4708 goto adjust_use;
4709 }
4710
4711 main_app_port = port->main_app_port;
4712
4713 nxt_thread_mutex_lock(&app->mutex);
4714
4725 main_app_port->app_responses += got_response;
4726 main_app_port->active_requests -= got_response + dec_requests;
4727 app->active_requests -= got_response + dec_requests;
4728
4715 main_app_port->active_requests -= got_response + dec_requests;
4716 app->active_requests -= got_response + dec_requests;
4717
4729 if (main_app_port->pair[1] != -1
4730 && (app->max_requests == 0
4731 || main_app_port->app_responses < app->max_requests))
4732 {
4733 if (main_app_port->app_link.next == NULL) {
4734 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4718 if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4719 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4735
4720
4736 nxt_port_inc_use(main_app_port);
4737 }
4721 nxt_port_inc_use(main_app_port);
4738 }
4739
4722 }
4723
4740 send_quit = (app->max_requests > 0
4741 && main_app_port->app_responses >= app->max_requests);
4742
4743 if (send_quit) {
4744 port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4745
4746 nxt_port_hash_remove(&app->port_hash, main_app_port);
4747 app->port_hash_count--;
4748
4749 main_app_port->app = NULL;
4750 app->processes--;
4751
4752 } else {
4753 port_unchained = 0;
4754 }
4755
4756 adjust_idle_timer = 0;
4757
4724 adjust_idle_timer = 0;
4725
4758 if (main_app_port->pair[1] != -1 && !send_quit
4726 if (main_app_port->pair[1] != -1
4759 && main_app_port->active_requests == 0
4760 && main_app_port->active_websockets == 0
4761 && main_app_port->idle_link.next == NULL)
4762 {
4763 if (app->idle_processes == app->spare_processes
4764 && app->adjust_idle_work.data == NULL)
4765 {
4766 adjust_idle_timer = 1;

--- 28 unchanged lines hidden (view full) ---

4795 /* ? */
4796 if (main_app_port->pair[1] == -1) {
4797 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4798 &app->name, app, main_app_port, main_app_port->pid);
4799
4800 goto adjust_use;
4801 }
4802
4727 && main_app_port->active_requests == 0
4728 && main_app_port->active_websockets == 0
4729 && main_app_port->idle_link.next == NULL)
4730 {
4731 if (app->idle_processes == app->spare_processes
4732 && app->adjust_idle_work.data == NULL)
4733 {
4734 adjust_idle_timer = 1;

--- 28 unchanged lines hidden (view full) ---

4763 /* ? */
4764 if (main_app_port->pair[1] == -1) {
4765 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4766 &app->name, app, main_app_port, main_app_port->pid);
4767
4768 goto adjust_use;
4769 }
4770
4803 if (send_quit) {
4804 nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4805
4806 nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4807 NULL);
4808
4809 if (port_unchained) {
4810 nxt_port_use(task, main_app_port, -1);
4811 }
4812
4813 goto adjust_use;
4814 }
4815
4816 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4817 &app->name, app);
4818
4819adjust_use:
4820
4821 nxt_port_use(task, port, inc_use);
4822}
4823

--- 1059 unchanged lines hidden ---
4771 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4772 &app->name, app);
4773
4774adjust_use:
4775
4776 nxt_port_use(task, port, inc_use);
4777}
4778

--- 1059 unchanged lines hidden ---