nxt_router.c (494:7c83ddcc1c42) nxt_router.c (507:fa714d76592b)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#include <nxt_http.h>
11
12
13typedef struct {
14 nxt_str_t type;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#include <nxt_http.h>
11
12
13typedef struct {
14 nxt_str_t type;
15 uint32_t workers;
15 uint32_t processes;
16 uint32_t max_processes;
17 uint32_t spare_processes;
16 nxt_msec_t timeout;
17 nxt_msec_t res_timeout;
18 nxt_msec_t timeout;
19 nxt_msec_t res_timeout;
20 nxt_msec_t idle_timeout;
18 uint32_t requests;
19 nxt_conf_value_t *limits_value;
21 uint32_t requests;
22 nxt_conf_value_t *limits_value;
23 nxt_conf_value_t *processes_value;
20} nxt_router_app_conf_t;
21
22
23typedef struct {
24 nxt_str_t application;
25} nxt_router_listener_conf_t;
26
27

--- 43 unchanged lines hidden (view full) ---

71
72
73typedef struct {
74 nxt_socket_conf_t *socket_conf;
75 nxt_router_temp_conf_t *temp_conf;
76} nxt_socket_rpc_t;
77
78
24} nxt_router_app_conf_t;
25
26
27typedef struct {
28 nxt_str_t application;
29} nxt_router_listener_conf_t;
30
31

--- 43 unchanged lines hidden (view full) ---

75
76
77typedef struct {
78 nxt_socket_conf_t *socket_conf;
79 nxt_router_temp_conf_t *temp_conf;
80} nxt_socket_rpc_t;
81
82
83typedef struct {
84 nxt_app_t *app;
85 nxt_router_temp_conf_t *temp_conf;
86} nxt_app_rpc_t;
87
88
79struct nxt_port_select_state_s {
80 nxt_app_t *app;
81 nxt_req_app_link_t *ra;
82
83 nxt_port_t *failed_port;
84 int failed_port_use_delta;
85
89struct nxt_port_select_state_s {
90 nxt_app_t *app;
91 nxt_req_app_link_t *ra;
92
93 nxt_port_t *failed_port;
94 int failed_port_use_delta;
95
86 nxt_bool_t can_start_worker;
96 uint8_t start_process; /* 1 bit */
87 nxt_req_app_link_t *shared_ra;
88 nxt_port_t *port;
89};
90
91typedef struct nxt_port_select_state_s nxt_port_select_state_t;
92
93static void nxt_router_port_select(nxt_task_t *task,
94 nxt_port_select_state_t *state);
95
96static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
97 nxt_port_select_state_t *state);
98
97 nxt_req_app_link_t *shared_ra;
98 nxt_port_t *port;
99};
100
101typedef struct nxt_port_select_state_s nxt_port_select_state_t;
102
103static void nxt_router_port_select(nxt_task_t *task,
104 nxt_port_select_state_t *state);
105
106static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
107 nxt_port_select_state_t *state);
108
99static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
109static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
100
101nxt_inline void
102nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
103{
104 nxt_atomic_fetch_add(&ra->use_count, 1);
105}
106
107nxt_inline void

--- 23 unchanged lines hidden (view full) ---

131static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
132 nxt_str_t *name);
133static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
134 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
135static void nxt_router_listen_socket_ready(nxt_task_t *task,
136 nxt_port_recv_msg_t *msg, void *data);
137static void nxt_router_listen_socket_error(nxt_task_t *task,
138 nxt_port_recv_msg_t *msg, void *data);
110
111nxt_inline void
112nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
113{
114 nxt_atomic_fetch_add(&ra->use_count, 1);
115}
116
117nxt_inline void

--- 23 unchanged lines hidden (view full) ---

141static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
142 nxt_str_t *name);
143static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
144 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
145static void nxt_router_listen_socket_ready(nxt_task_t *task,
146 nxt_port_recv_msg_t *msg, void *data);
147static void nxt_router_listen_socket_error(nxt_task_t *task,
148 nxt_port_recv_msg_t *msg, void *data);
149static void nxt_router_app_rpc_create(nxt_task_t *task,
150 nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
151static void nxt_router_app_prefork_ready(nxt_task_t *task,
152 nxt_port_recv_msg_t *msg, void *data);
153static void nxt_router_app_prefork_error(nxt_task_t *task,
154 nxt_port_recv_msg_t *msg, void *data);
139static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
140 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
141static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
142 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
143
144static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
145 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
146 const nxt_event_interface_t *interface);

--- 41 unchanged lines hidden (view full) ---

188static void nxt_router_conf_release(nxt_task_t *task,
189 nxt_socket_conf_joint_t *joint);
190
191static void nxt_router_app_port_ready(nxt_task_t *task,
192 nxt_port_recv_msg_t *msg, void *data);
193static void nxt_router_app_port_error(nxt_task_t *task,
194 nxt_port_recv_msg_t *msg, void *data);
195
155static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
156 nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
157static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
158 nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
159
160static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
161 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
162 const nxt_event_interface_t *interface);

--- 41 unchanged lines hidden (view full) ---

204static void nxt_router_conf_release(nxt_task_t *task,
205 nxt_socket_conf_joint_t *joint);
206
207static void nxt_router_app_port_ready(nxt_task_t *task,
208 nxt_port_recv_msg_t *msg, void *data);
209static void nxt_router_app_port_error(nxt_task_t *task,
210 nxt_port_recv_msg_t *msg, void *data);
211
196static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
212static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
197static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
198 uint32_t request_failed, uint32_t got_response);
199static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
200 nxt_req_app_link_t *ra);
201
202static void nxt_router_app_prepare_request(nxt_task_t *task,
203 nxt_req_app_link_t *ra);
204static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
205 nxt_app_wmsg_t *wmsg);
206static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
207 nxt_app_wmsg_t *wmsg);
208static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
209 nxt_app_wmsg_t *wmsg);
210static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
211static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
213static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
214 uint32_t request_failed, uint32_t got_response);
215static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
216 nxt_req_app_link_t *ra);
217
218static void nxt_router_app_prepare_request(nxt_task_t *task,
219 nxt_req_app_link_t *ra);
220static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
221 nxt_app_wmsg_t *wmsg);
222static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
223 nxt_app_wmsg_t *wmsg);
224static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
225 nxt_app_wmsg_t *wmsg);
226static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
227static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
228static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
229 void *data);
230static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
231 void *data);
232static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
233 void *data);
212
213static const nxt_http_request_state_t nxt_http_request_send_state;
214static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
215
216static nxt_router_t *nxt_router;
217
218
219static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {

--- 28 unchanged lines hidden (view full) ---

248
249 nxt_router = router;
250
251 return NXT_OK;
252}
253
254
255static void
234
235static const nxt_http_request_state_t nxt_http_request_send_state;
236static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
237
238static nxt_router_t *nxt_router;
239
240
241static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {

--- 28 unchanged lines hidden (view full) ---

270
271 nxt_router = router;
272
273 return NXT_OK;
274}
275
276
277static void
256nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
278nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
279 void *data)
257{
258 size_t size;
259 uint32_t stream;
260 nxt_mp_t *mp;
261 nxt_app_t *app;
262 nxt_buf_t *b;
263 nxt_port_t *main_port;
264 nxt_runtime_t *rt;
265
266 app = data;
267
268 rt = task->thread->runtime;
269 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
270
280{
281 size_t size;
282 uint32_t stream;
283 nxt_mp_t *mp;
284 nxt_app_t *app;
285 nxt_buf_t *b;
286 nxt_port_t *main_port;
287 nxt_runtime_t *rt;
288
289 app = data;
290
291 rt = task->thread->runtime;
292 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
293
271 nxt_debug(task, "app '%V' %p start worker", &app->name, app);
294 nxt_debug(task, "app '%V' %p start process", &app->name, app);
272
273 size = app->name.length + 1 + app->conf.length;
274
275 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
276
277 if (nxt_slow_path(b == NULL)) {
278 goto failed;
279 }

--- 19 unchanged lines hidden (view full) ---

299 stream, port->id, b);
300
301 return;
302
303failed:
304
305 nxt_thread_mutex_lock(&app->mutex);
306
295
296 size = app->name.length + 1 + app->conf.length;
297
298 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
299
300 if (nxt_slow_path(b == NULL)) {
301 goto failed;
302 }

--- 19 unchanged lines hidden (view full) ---

322 stream, port->id, b);
323
324 return;
325
326failed:
327
328 nxt_thread_mutex_lock(&app->mutex);
329
307 app->pending_workers--;
330 app->pending_processes--;
308
309 nxt_thread_mutex_unlock(&app->mutex);
310
311 nxt_router_app_use(task, app, -1);
312}
313
314
315static nxt_int_t
331
332 nxt_thread_mutex_unlock(&app->mutex);
333
334 nxt_router_app_use(task, app, -1);
335}
336
337
338static nxt_int_t
316nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
339nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
317{
318 nxt_int_t res;
319 nxt_port_t *router_port;
320 nxt_runtime_t *rt;
321
322 rt = task->thread->runtime;
323 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
324
325 nxt_router_app_use(task, app, 1);
326
340{
341 nxt_int_t res;
342 nxt_port_t *router_port;
343 nxt_runtime_t *rt;
344
345 rt = task->thread->runtime;
346 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
347
348 nxt_router_app_use(task, app, 1);
349
327 res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
350 res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
328 app);
329
330 if (res == NXT_OK) {
331 return res;
332 }
333
334 nxt_thread_mutex_lock(&app->mutex);
335
351 app);
352
353 if (res == NXT_OK) {
354 return res;
355 }
356
357 nxt_thread_mutex_lock(&app->mutex);
358
336 app->pending_workers--;
359 app->pending_processes--;
337
338 nxt_thread_mutex_unlock(&app->mutex);
339
340 nxt_router_app_use(task, app, -1);
341
342 return NXT_ERROR;
343}
344

--- 387 unchanged lines hidden (view full) ---

732
733 } else {
734 nxt_router_conf_error(task, tmcf);
735 }
736}
737
738
739static void
360
361 nxt_thread_mutex_unlock(&app->mutex);
362
363 nxt_router_app_use(task, app, -1);
364
365 return NXT_ERROR;
366}
367

--- 387 unchanged lines hidden (view full) ---

755
756 } else {
757 nxt_router_conf_error(task, tmcf);
758 }
759}
760
761
762static void
740nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
763nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
764 void *data)
741{
742 union {
743 nxt_pid_t removed_pid;
744 void *data;
745 } u;
746
747 u.data = data;
748

--- 9 unchanged lines hidden (view full) ---

758 nxt_port_remove_pid_handler(task, msg);
759
760 if (msg->port_msg.stream == 0) {
761 return;
762 }
763
764 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
765 {
765{
766 union {
767 nxt_pid_t removed_pid;
768 void *data;
769 } u;
770
771 u.data = data;
772

--- 9 unchanged lines hidden (view full) ---

782 nxt_port_remove_pid_handler(task, msg);
783
784 if (msg->port_msg.stream == 0) {
785 return;
786 }
787
788 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
789 {
766 nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
790 nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
767 msg->u.data);
768 }
769 nxt_queue_loop;
770
771 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
772
773 nxt_port_rpc_handler(task, msg);
774}

--- 57 unchanged lines hidden (view full) ---

832fail:
833
834 nxt_mp_destroy(mp);
835
836 return NULL;
837}
838
839
791 msg->u.data);
792 }
793 nxt_queue_loop;
794
795 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
796
797 nxt_port_rpc_handler(task, msg);
798}

--- 57 unchanged lines hidden (view full) ---

856fail:
857
858 nxt_mp_destroy(mp);
859
860 return NULL;
861}
862
863
864nxt_inline nxt_bool_t
865nxt_router_app_can_start(nxt_app_t *app)
866{
867 return app->processes + app->pending_processes < app->max_processes
868 && app->pending_processes < app->max_pending_processes;
869}
870
871
872nxt_inline nxt_bool_t
873nxt_router_app_need_start(nxt_app_t *app)
874{
875 return app->idle_processes + app->pending_processes
876 < app->spare_processes;
877}
878
879
840static void
841nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
842{
843 nxt_int_t ret;
880static void
881nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
882{
883 nxt_int_t ret;
884 nxt_app_t *app;
844 nxt_router_t *router;
845 nxt_runtime_t *rt;
846 nxt_queue_link_t *qlk;
847 nxt_socket_conf_t *skcf;
848 nxt_router_temp_conf_t *tmcf;
849 const nxt_event_interface_t *interface;
850
851 tmcf = obj;

--- 6 unchanged lines hidden (view full) ---

858
859 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
860
861 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
862
863 return;
864 }
865
885 nxt_router_t *router;
886 nxt_runtime_t *rt;
887 nxt_queue_link_t *qlk;
888 nxt_socket_conf_t *skcf;
889 nxt_router_temp_conf_t *tmcf;
890 const nxt_event_interface_t *interface;
891
892 tmcf = obj;

--- 6 unchanged lines hidden (view full) ---

899
900 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
901
902 nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
903
904 return;
905 }
906
907 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
908
909 if (nxt_router_app_need_start(app)) {
910 nxt_router_app_rpc_create(task, tmcf, app);
911 return;
912 }
913
914 } nxt_queue_loop;
915
866 rt = task->thread->runtime;
867
868 interface = nxt_service_get(rt->services, "engine", NULL);
869
870 router = tmcf->conf->router;
871
872 ret = nxt_router_engines_create(task, router, tmcf, interface);
873 if (nxt_slow_path(ret != NXT_OK)) {

--- 44 unchanged lines hidden (view full) ---

918 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
919 }
920}
921
922
923static void
924nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
925{
916 rt = task->thread->runtime;
917
918 interface = nxt_service_get(rt->services, "engine", NULL);
919
920 router = tmcf->conf->router;
921
922 ret = nxt_router_engines_create(task, router, tmcf, interface);
923 if (nxt_slow_path(ret != NXT_OK)) {

--- 44 unchanged lines hidden (view full) ---

968 nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
969 }
970}
971
972
973static void
974nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
975{
976 nxt_app_t *app;
926 nxt_socket_t s;
927 nxt_router_t *router;
928 nxt_queue_link_t *qlk;
929 nxt_socket_conf_t *skcf;
930
931 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
932
933 for (qlk = nxt_queue_first(&tmcf->creating);

--- 5 unchanged lines hidden (view full) ---

939
940 if (s != -1) {
941 nxt_socket_close(task, s);
942 }
943
944 nxt_free(skcf->listen);
945 }
946
977 nxt_socket_t s;
978 nxt_router_t *router;
979 nxt_queue_link_t *qlk;
980 nxt_socket_conf_t *skcf;
981
982 nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
983
984 for (qlk = nxt_queue_first(&tmcf->creating);

--- 5 unchanged lines hidden (view full) ---

990
991 if (s != -1) {
992 nxt_socket_close(task, s);
993 }
994
995 nxt_free(skcf->listen);
996 }
997
998 nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
999
1000 nxt_router_app_quit(task, app);
1001
1002 } nxt_queue_loop;
1003
947 router = tmcf->conf->router;
948
949 nxt_queue_add(&router->sockets, &tmcf->keeping);
950 nxt_queue_add(&router->sockets, &tmcf->deleting);
951
952 nxt_queue_add(&router->apps, &tmcf->previous);
953
954 // TODO: new engines and threads

--- 24 unchanged lines hidden (view full) ---

979static nxt_conf_map_t nxt_router_app_conf[] = {
980 {
981 nxt_string("type"),
982 NXT_CONF_MAP_STR,
983 offsetof(nxt_router_app_conf_t, type),
984 },
985
986 {
1004 router = tmcf->conf->router;
1005
1006 nxt_queue_add(&router->sockets, &tmcf->keeping);
1007 nxt_queue_add(&router->sockets, &tmcf->deleting);
1008
1009 nxt_queue_add(&router->apps, &tmcf->previous);
1010
1011 // TODO: new engines and threads

--- 24 unchanged lines hidden (view full) ---

1036static nxt_conf_map_t nxt_router_app_conf[] = {
1037 {
1038 nxt_string("type"),
1039 NXT_CONF_MAP_STR,
1040 offsetof(nxt_router_app_conf_t, type),
1041 },
1042
1043 {
987 nxt_string("workers"),
1044 nxt_string("limits"),
1045 NXT_CONF_MAP_PTR,
1046 offsetof(nxt_router_app_conf_t, limits_value),
1047 },
1048
1049 {
1050 nxt_string("processes"),
988 NXT_CONF_MAP_INT32,
1051 NXT_CONF_MAP_INT32,
989 offsetof(nxt_router_app_conf_t, workers),
1052 offsetof(nxt_router_app_conf_t, processes),
990 },
991
992 {
1053 },
1054
1055 {
993 nxt_string("limits"),
1056 nxt_string("processes"),
994 NXT_CONF_MAP_PTR,
1057 NXT_CONF_MAP_PTR,
995 offsetof(nxt_router_app_conf_t, limits_value),
1058 offsetof(nxt_router_app_conf_t, processes_value),
996 },
997};
998
999
1000static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1001 {
1002 nxt_string("timeout"),
1003 NXT_CONF_MAP_MSEC,

--- 9 unchanged lines hidden (view full) ---

1013 {
1014 nxt_string("requests"),
1015 NXT_CONF_MAP_INT32,
1016 offsetof(nxt_router_app_conf_t, requests),
1017 },
1018};
1019
1020
1059 },
1060};
1061
1062
1063static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1064 {
1065 nxt_string("timeout"),
1066 NXT_CONF_MAP_MSEC,

--- 9 unchanged lines hidden (view full) ---

1076 {
1077 nxt_string("requests"),
1078 NXT_CONF_MAP_INT32,
1079 offsetof(nxt_router_app_conf_t, requests),
1080 },
1081};
1082
1083
1084static nxt_conf_map_t nxt_router_app_processes_conf[] = {
1085 {
1086 nxt_string("spare"),
1087 NXT_CONF_MAP_INT32,
1088 offsetof(nxt_router_app_conf_t, spare_processes),
1089 },
1090
1091 {
1092 nxt_string("max"),
1093 NXT_CONF_MAP_INT32,
1094 offsetof(nxt_router_app_conf_t, max_processes),
1095 },
1096
1097 {
1098 nxt_string("idle_timeout"),
1099 NXT_CONF_MAP_MSEC,
1100 offsetof(nxt_router_app_conf_t, idle_timeout),
1101 },
1102};
1103
1104
1021static nxt_conf_map_t nxt_router_listener_conf[] = {
1022 {
1023 nxt_string("application"),
1024 NXT_CONF_MAP_STR,
1025 offsetof(nxt_router_listener_conf_t, application),
1026 },
1027};
1028

--- 66 unchanged lines hidden (view full) ---

1095 nxt_int_t ret;
1096 nxt_str_t name;
1097 nxt_app_t *app, *prev;
1098 nxt_router_t *router;
1099 nxt_conf_value_t *conf, *http;
1100 nxt_conf_value_t *applications, *application;
1101 nxt_conf_value_t *listeners, *listener;
1102 nxt_socket_conf_t *skcf;
1105static nxt_conf_map_t nxt_router_listener_conf[] = {
1106 {
1107 nxt_string("application"),
1108 NXT_CONF_MAP_STR,
1109 offsetof(nxt_router_listener_conf_t, application),
1110 },
1111};
1112

--- 66 unchanged lines hidden (view full) ---

1179 nxt_int_t ret;
1180 nxt_str_t name;
1181 nxt_app_t *app, *prev;
1182 nxt_router_t *router;
1183 nxt_conf_value_t *conf, *http;
1184 nxt_conf_value_t *applications, *application;
1185 nxt_conf_value_t *listeners, *listener;
1186 nxt_socket_conf_t *skcf;
1187 nxt_event_engine_t *engine;
1103 nxt_app_lang_module_t *lang;
1104 nxt_router_app_conf_t apcf;
1105 nxt_router_listener_conf_t lscf;
1106
1107 static nxt_str_t http_path = nxt_string("/http");
1108 static nxt_str_t applications_path = nxt_string("/applications");
1109 static nxt_str_t listeners_path = nxt_string("/listeners");
1110

--- 58 unchanged lines hidden (view full) ---

1169 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1170 nxt_free(app);
1171
1172 nxt_queue_remove(&prev->link);
1173 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1174 continue;
1175 }
1176
1188 nxt_app_lang_module_t *lang;
1189 nxt_router_app_conf_t apcf;
1190 nxt_router_listener_conf_t lscf;
1191
1192 static nxt_str_t http_path = nxt_string("/http");
1193 static nxt_str_t applications_path = nxt_string("/applications");
1194 static nxt_str_t listeners_path = nxt_string("/listeners");
1195

--- 58 unchanged lines hidden (view full) ---

1254 if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1255 nxt_free(app);
1256
1257 nxt_queue_remove(&prev->link);
1258 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1259 continue;
1260 }
1261
1177 apcf.workers = 1;
1262 apcf.processes = 1;
1263 apcf.max_processes = 1;
1264 apcf.spare_processes = 1;
1178 apcf.timeout = 0;
1179 apcf.res_timeout = 1000;
1265 apcf.timeout = 0;
1266 apcf.res_timeout = 1000;
1267 apcf.idle_timeout = 15000;
1180 apcf.requests = 0;
1181 apcf.limits_value = NULL;
1268 apcf.requests = 0;
1269 apcf.limits_value = NULL;
1270 apcf.processes_value = NULL;
1182
1183 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1184 nxt_nitems(nxt_router_app_conf), &apcf);
1185 if (ret != NXT_OK) {
1186 nxt_log(task, NXT_LOG_CRIT, "application map error");
1187 goto app_fail;
1188 }
1189

--- 9 unchanged lines hidden (view full) ---

1199 nxt_nitems(nxt_router_app_limits_conf),
1200 &apcf);
1201 if (ret != NXT_OK) {
1202 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
1203 goto app_fail;
1204 }
1205 }
1206
1271
1272 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1273 nxt_nitems(nxt_router_app_conf), &apcf);
1274 if (ret != NXT_OK) {
1275 nxt_log(task, NXT_LOG_CRIT, "application map error");
1276 goto app_fail;
1277 }
1278

--- 9 unchanged lines hidden (view full) ---

1288 nxt_nitems(nxt_router_app_limits_conf),
1289 &apcf);
1290 if (ret != NXT_OK) {
1291 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
1292 goto app_fail;
1293 }
1294 }
1295
1296 if (apcf.processes_value != NULL
1297 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1298 {
1299 ret = nxt_conf_map_object(mp, apcf.processes_value,
1300 nxt_router_app_processes_conf,
1301 nxt_nitems(nxt_router_app_processes_conf),
1302 &apcf);
1303 if (ret != NXT_OK) {
1304 nxt_log(task, NXT_LOG_CRIT, "application processes map error");
1305 goto app_fail;
1306 }
1307
1308 } else {
1309 apcf.max_processes = apcf.processes;
1310 apcf.spare_processes = apcf.processes;
1311 }
1312
1207 nxt_debug(task, "application type: %V", &apcf.type);
1313 nxt_debug(task, "application type: %V", &apcf.type);
1208 nxt_debug(task, "application workers: %D", apcf.workers);
1209 nxt_debug(task, "application request timeout: %D", apcf.timeout);
1210 nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
1314 nxt_debug(task, "application processes: %D", apcf.processes);
1315 nxt_debug(task, "application request timeout: %M", apcf.timeout);
1316 nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1211 nxt_debug(task, "application requests: %D", apcf.requests);
1212
1213 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1214
1215 if (lang == NULL) {
1216 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
1217 &apcf.type);
1218 goto app_fail;
1219 }
1220
1221 nxt_debug(task, "application language module: \"%s\"", lang->file);
1222
1223 ret = nxt_thread_mutex_create(&app->mutex);
1224 if (ret != NXT_OK) {
1225 goto app_fail;
1226 }
1227
1228 nxt_queue_init(&app->ports);
1317 nxt_debug(task, "application requests: %D", apcf.requests);
1318
1319 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1320
1321 if (lang == NULL) {
1322 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
1323 &apcf.type);
1324 goto app_fail;
1325 }
1326
1327 nxt_debug(task, "application language module: \"%s\"", lang->file);
1328
1329 ret = nxt_thread_mutex_create(&app->mutex);
1330 if (ret != NXT_OK) {
1331 goto app_fail;
1332 }
1333
1334 nxt_queue_init(&app->ports);
1335 nxt_queue_init(&app->spare_ports);
1336 nxt_queue_init(&app->idle_ports);
1229 nxt_queue_init(&app->requests);
1230 nxt_queue_init(&app->pending);
1231
1232 app->name.length = name.length;
1233 nxt_memcpy(app->name.start, name.start, name.length);
1234
1235 app->type = lang->type;
1337 nxt_queue_init(&app->requests);
1338 nxt_queue_init(&app->pending);
1339
1340 app->name.length = name.length;
1341 nxt_memcpy(app->name.start, name.start, name.length);
1342
1343 app->type = lang->type;
1236 app->max_workers = apcf.workers;
1344 app->max_processes = apcf.max_processes;
1345 app->spare_processes = apcf.spare_processes;
1346 app->max_pending_processes = apcf.spare_processes
1347 ? apcf.spare_processes : 1;
1237 app->timeout = apcf.timeout;
1238 app->res_timeout = apcf.res_timeout * 1000000;
1348 app->timeout = apcf.timeout;
1349 app->res_timeout = apcf.res_timeout * 1000000;
1350 app->idle_timeout = apcf.idle_timeout;
1239 app->live = 1;
1240 app->max_pending_responses = 2;
1241 app->max_requests = apcf.requests;
1242 app->prepare_msg = nxt_app_prepare_msg[lang->type];
1243
1351 app->live = 1;
1352 app->max_pending_responses = 2;
1353 app->max_requests = apcf.requests;
1354 app->prepare_msg = nxt_app_prepare_msg[lang->type];
1355
1356 engine = task->thread->engine;
1357
1358 app->engine = engine;
1359
1360 app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
1361 app->idle_timer.work_queue = &engine->fast_work_queue;
1362 app->idle_timer.handler = nxt_router_app_idle_timeout;
1363 app->idle_timer.task = &engine->task;
1364 app->idle_timer.log = app->idle_timer.task->log;
1365
1366 app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1367 app->adjust_idle_work.task = &engine->task;
1368 app->adjust_idle_work.obj = app;
1369
1244 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1245
1246 nxt_router_app_use(task, app, 1);
1247 }
1248
1249 http = nxt_conf_get_path(conf, &http_path);
1250#if 0
1251 if (http == NULL) {

--- 367 unchanged lines hidden (view full) ---

1619 &socket_errors[error], in->mem.free - p, p);
1620
1621 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1622
1623 nxt_router_conf_error(task, tmcf);
1624}
1625
1626
1370 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1371
1372 nxt_router_app_use(task, app, 1);
1373 }
1374
1375 http = nxt_conf_get_path(conf, &http_path);
1376#if 0
1377 if (http == NULL) {

--- 367 unchanged lines hidden (view full) ---

1745 &socket_errors[error], in->mem.free - p, p);
1746
1747 nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1748
1749 nxt_router_conf_error(task, tmcf);
1750}
1751
1752
1753static void
1754nxt_router_app_rpc_create(nxt_task_t *task,
1755 nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
1756{
1757 size_t size;
1758 uint32_t stream;
1759 nxt_buf_t *b;
1760 nxt_port_t *main_port, *router_port;
1761 nxt_runtime_t *rt;
1762 nxt_app_rpc_t *rpc;
1763
1764 rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
1765 if (rpc == NULL) {
1766 goto fail;
1767 }
1768
1769 rpc->app = app;
1770 rpc->temp_conf = tmcf;
1771
1772 nxt_debug(task, "app '%V' prefork", &app->name);
1773
1774 size = app->name.length + 1 + app->conf.length;
1775
1776 b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1777 if (nxt_slow_path(b == NULL)) {
1778 goto fail;
1779 }
1780
1781 nxt_buf_cpystr(b, &app->name);
1782 *b->mem.free++ = '\0';
1783 nxt_buf_cpystr(b, &app->conf);
1784
1785 rt = task->thread->runtime;
1786 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1787 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1788
1789 stream = nxt_port_rpc_register_handler(task, router_port,
1790 nxt_router_app_prefork_ready,
1791 nxt_router_app_prefork_error,
1792 -1, rpc);
1793 if (nxt_slow_path(stream == 0)) {
1794 goto fail;
1795 }
1796
1797 app->pending_processes++;
1798
1799 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
1800 stream, router_port->id, b);
1801
1802 return;
1803
1804fail:
1805
1806 nxt_router_conf_error(task, tmcf);
1807}
1808
1809
1810static void
1811nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1812 void *data)
1813{
1814 nxt_app_t *app;
1815 nxt_port_t *port;
1816 nxt_app_rpc_t *rpc;
1817 nxt_event_engine_t *engine;
1818
1819 rpc = data;
1820 app = rpc->app;
1821
1822 port = msg->u.new_port;
1823 port->app = app;
1824
1825 nxt_router_app_use(task, app, 1);
1826
1827 app->pending_processes--;
1828 app->processes++;
1829 app->idle_processes++;
1830
1831 engine = task->thread->engine;
1832
1833 nxt_queue_insert_tail(&app->ports, &port->app_link);
1834 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
1835
1836 port->idle_start = 0;
1837
1838 nxt_port_inc_use(port);
1839
1840 nxt_work_queue_add(&engine->fast_work_queue,
1841 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1842}
1843
1844
1845static void
1846nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1847 void *data)
1848{
1849 nxt_app_t *app;
1850 nxt_app_rpc_t *rpc;
1851 nxt_router_temp_conf_t *tmcf;
1852
1853 rpc = data;
1854 app = rpc->app;
1855 tmcf = rpc->temp_conf;
1856
1857 nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
1858 &app->name);
1859
1860 app->pending_processes--;
1861
1862 nxt_router_conf_error(task, tmcf);
1863}
1864
1865
1627static nxt_int_t
1628nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1629 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1630{
1631 nxt_int_t ret;
1632 nxt_uint_t n, threads;
1633 nxt_queue_link_t *qlk;
1634 nxt_router_engine_conf_t *recf;

--- 293 unchanged lines hidden (view full) ---

1928 return ret;
1929}
1930
1931
1932static void
1933nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
1934 nxt_router_temp_conf_t *tmcf)
1935{
1866static nxt_int_t
1867nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1868 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1869{
1870 nxt_int_t ret;
1871 nxt_uint_t n, threads;
1872 nxt_queue_link_t *qlk;
1873 nxt_router_engine_conf_t *recf;

--- 293 unchanged lines hidden (view full) ---

2167 return ret;
2168}
2169
2170
2171static void
2172nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2173 nxt_router_temp_conf_t *tmcf)
2174{
1936 nxt_app_t *app;
1937 nxt_port_t *port;
2175 nxt_app_t *app;
1938
1939 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1940
2176
2177 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2178
1941 nxt_queue_remove(&app->link);
2179 nxt_router_app_quit(task, app);
1942
2180
1943 nxt_debug(task, "about to free app '%V' %p", &app->name, app);
1944
1945 app->live = 0;
1946
1947 do {
1948 port = nxt_router_app_get_idle_port(app);
1949 if (port == NULL) {
1950 break;
1951 }
1952
1953 nxt_debug(task, "port %p send quit", port);
1954
1955 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
1956 NULL);
1957
1958 nxt_port_use(task, port, -1);
1959 } while (1);
1960
1961 nxt_router_app_use(task, app, -1);
1962
1963 } nxt_queue_loop;
1964
1965 nxt_queue_add(&router->apps, &tmcf->previous);
1966 nxt_queue_add(&router->apps, &tmcf->apps);
1967}
1968
1969
1970static void

--- 478 unchanged lines hidden (view full) ---

2449 &nxt_response_fields_hash, r);
2450 if (nxt_slow_path(ret != NXT_OK)) {
2451 goto fail;
2452 }
2453
2454 if (nxt_buf_mem_used_size(&b->mem) == 0) {
2455 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2456 b->completion_handler, task, b, b->parent);
2181 } nxt_queue_loop;
2182
2183 nxt_queue_add(&router->apps, &tmcf->previous);
2184 nxt_queue_add(&router->apps, &tmcf->apps);
2185}
2186
2187
2188static void

--- 478 unchanged lines hidden (view full) ---

2667 &nxt_response_fields_hash, r);
2668 if (nxt_slow_path(ret != NXT_OK)) {
2669 goto fail;
2670 }
2671
2672 if (nxt_buf_mem_used_size(&b->mem) == 0) {
2673 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2674 b->completion_handler, task, b, b->parent);
2675
2457 } else {
2458 nxt_buf_chain_add(&r->out, b);
2459 }
2460
2461 r->state = &nxt_http_request_send_state;
2462
2463 nxt_http_request_header_send(task, r);
2464 }

--- 91 unchanged lines hidden (view full) ---

2556
2557 nxt_assert(app != NULL);
2558 nxt_assert(port != NULL);
2559
2560 port->app = app;
2561
2562 nxt_thread_mutex_lock(&app->mutex);
2563
2676 } else {
2677 nxt_buf_chain_add(&r->out, b);
2678 }
2679
2680 r->state = &nxt_http_request_send_state;
2681
2682 nxt_http_request_header_send(task, r);
2683 }

--- 91 unchanged lines hidden (view full) ---

2775
2776 nxt_assert(app != NULL);
2777 nxt_assert(port != NULL);
2778
2779 port->app = app;
2780
2781 nxt_thread_mutex_lock(&app->mutex);
2782
2564 nxt_assert(app->pending_workers != 0);
2783 nxt_assert(app->pending_processes != 0);
2565
2784
2566 app->pending_workers--;
2567 app->workers++;
2785 app->pending_processes--;
2786 app->processes++;
2568
2569 nxt_thread_mutex_unlock(&app->mutex);
2570
2787
2788 nxt_thread_mutex_unlock(&app->mutex);
2789
2571 nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
2790 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
2791 &app->name, port->pid, app->processes, app->pending_processes);
2572
2573 nxt_router_app_port_release(task, port, 0, 0);
2574}
2575
2576
2577static void
2578nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2579 void *data)

--- 5 unchanged lines hidden (view full) ---

2585 app = data;
2586
2587 nxt_assert(app != NULL);
2588
2589 nxt_debug(task, "app '%V' %p start error", &app->name, app);
2590
2591 nxt_thread_mutex_lock(&app->mutex);
2592
2792
2793 nxt_router_app_port_release(task, port, 0, 0);
2794}
2795
2796
2797static void
2798nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2799 void *data)

--- 5 unchanged lines hidden (view full) ---

2805 app = data;
2806
2807 nxt_assert(app != NULL);
2808
2809 nxt_debug(task, "app '%V' %p start error", &app->name, app);
2810
2811 nxt_thread_mutex_lock(&app->mutex);
2812
2593 nxt_assert(app->pending_workers != 0);
2813 nxt_assert(app->pending_processes != 0);
2594
2814
2595 app->pending_workers--;
2815 app->pending_processes--;
2596
2597 if (!nxt_queue_is_empty(&app->requests)) {
2598 lnk = nxt_queue_last(&app->requests);
2599 nxt_queue_remove(lnk);
2600 lnk->next = NULL;
2601
2602 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2603
2604 } else {
2605 ra = NULL;
2606 }
2607
2608 nxt_thread_mutex_unlock(&app->mutex);
2609
2610 if (ra != NULL) {
2611 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2612 &app->name, app, ra->stream);
2613
2816
2817 if (!nxt_queue_is_empty(&app->requests)) {
2818 lnk = nxt_queue_last(&app->requests);
2819 nxt_queue_remove(lnk);
2820 lnk->next = NULL;
2821
2822 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2823
2824 } else {
2825 ra = NULL;
2826 }
2827
2828 nxt_thread_mutex_unlock(&app->mutex);
2829
2830 if (ra != NULL) {
2831 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2832 &app->name, app, ra->stream);
2833
2614 nxt_router_ra_error(ra, 500, "Failed to start application worker");
2834 nxt_router_ra_error(ra, 500, "Failed to start application process");
2615 nxt_router_ra_use(task, ra, -1);
2616 }
2617
2618 nxt_router_app_use(task, app, -1);
2619}
2620
2621
2622void
2623nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2624{
2625 int c;
2626
2627 c = nxt_atomic_fetch_add(&app->use_count, i);
2628
2629 if (i < 0 && c == -i) {
2630
2631 nxt_assert(app->live == 0);
2835 nxt_router_ra_use(task, ra, -1);
2836 }
2837
2838 nxt_router_app_use(task, app, -1);
2839}
2840
2841
2842void
2843nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2844{
2845 int c;
2846
2847 c = nxt_atomic_fetch_add(&app->use_count, i);
2848
2849 if (i < 0 && c == -i) {
2850
2851 nxt_assert(app->live == 0);
2632 nxt_assert(app->workers == 0);
2633 nxt_assert(app->pending_workers == 0);
2852 nxt_assert(app->processes == 0);
2853 nxt_assert(app->idle_processes == 0);
2854 nxt_assert(app->pending_processes == 0);
2634 nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
2635 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2855 nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
2856 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2857 nxt_assert(nxt_queue_is_empty(&app->spare_ports) != 0);
2858 nxt_assert(nxt_queue_is_empty(&app->idle_ports) != 0);
2636
2637 nxt_thread_mutex_destroy(&app->mutex);
2638 nxt_free(app);
2639 }
2640}
2641
2642
2643nxt_inline nxt_bool_t

--- 17 unchanged lines hidden (view full) ---

2661
2662 lnk = nxt_queue_first(&app->ports);
2663 nxt_queue_remove(lnk);
2664
2665 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2666
2667 port->app_pending_responses++;
2668
2859
2860 nxt_thread_mutex_destroy(&app->mutex);
2861 nxt_free(app);
2862 }
2863}
2864
2865
2866nxt_inline nxt_bool_t

--- 17 unchanged lines hidden (view full) ---

2884
2885 lnk = nxt_queue_first(&app->ports);
2886 nxt_queue_remove(lnk);
2887
2888 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2889
2890 port->app_pending_responses++;
2891
2892 if (nxt_queue_chk_remove(&port->idle_link)) {
2893 app->idle_processes--;
2894
2895 if (port->idle_start == 0) {
2896 nxt_assert(app->idle_processes < app->spare_processes);
2897
2898 } else {
2899 nxt_assert(app->idle_processes >= app->spare_processes);
2900
2901 port->idle_start = 0;
2902 }
2903 }
2904
2669 if ((app->max_pending_responses == 0
2670 || port->app_pending_responses < app->max_pending_responses)
2671 && (app->max_requests == 0
2672 || port->app_responses + port->app_pending_responses
2673 < app->max_requests))
2674 {
2675 nxt_queue_insert_tail(&app->ports, lnk);
2676
2677 nxt_port_inc_use(port);
2678
2679 } else {
2680 lnk->next = NULL;
2681 }
2682
2683 return port;
2684}
2685
2686
2905 if ((app->max_pending_responses == 0
2906 || port->app_pending_responses < app->max_pending_responses)
2907 && (app->max_requests == 0
2908 || port->app_responses + port->app_pending_responses
2909 < app->max_requests))
2910 {
2911 nxt_queue_insert_tail(&app->ports, lnk);
2912
2913 nxt_port_inc_use(port);
2914
2915 } else {
2916 lnk->next = NULL;
2917 }
2918
2919 return port;
2920}
2921
2922
2687static nxt_port_t *
2688nxt_router_app_get_idle_port(nxt_app_t *app)
2923nxt_inline nxt_port_t *
2924nxt_router_app_get_port_for_quit(nxt_app_t *app)
2689{
2690 nxt_port_t *port;
2691
2692 port = NULL;
2693
2694 nxt_thread_mutex_lock(&app->mutex);
2695
2696 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2697
2698 if (port->app_pending_responses > 0) {
2699 port = NULL;
2700
2701 continue;
2702 }
2703
2925{
2926 nxt_port_t *port;
2927
2928 port = NULL;
2929
2930 nxt_thread_mutex_lock(&app->mutex);
2931
2932 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2933
2934 if (port->app_pending_responses > 0) {
2935 port = NULL;
2936
2937 continue;
2938 }
2939
2704 nxt_queue_remove(&port->app_link);
2705 port->app_link.next = NULL;
2940 /* Caller is responsible to decrease port use count. */
2941 nxt_queue_chk_remove(&port->app_link);
2706
2942
2943 if (nxt_queue_chk_remove(&port->idle_link)) {
2944 app->idle_processes--;
2945 }
2946
2947 /* Caller is responsible to decrease app use count. */
2948 port->app = NULL;
2949 app->processes--;
2950
2707 break;
2708
2709 } nxt_queue_loop;
2710
2711 nxt_thread_mutex_unlock(&app->mutex);
2712
2713 return port;
2714}
2715
2716
2717static void
2951 break;
2952
2953 } nxt_queue_loop;
2954
2955 nxt_thread_mutex_unlock(&app->mutex);
2956
2957 return port;
2958}
2959
2960
2961static void
2962nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
2963{
2964 nxt_port_t *port;
2965
2966 nxt_queue_remove(&app->link);
2967
2968 app->live = 0;
2969
2970 for ( ;; ) {
2971 port = nxt_router_app_get_port_for_quit(app);
2972 if (port == NULL) {
2973 break;
2974 }
2975
2976 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
2977
2978 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
2979
2980 nxt_port_use(task, port, -1);
2981 nxt_router_app_use(task, app, -1);
2982 }
2983
2984 if (nxt_timer_is_in_tree(&app->idle_timer)) {
2985 nxt_assert(app->engine == task->thread->engine);
2986
2987 app->idle_timer.handler = nxt_router_app_release_handler;
2988 nxt_timer_add(app->engine, &app->idle_timer, 0);
2989
2990 } else {
2991 nxt_router_app_use(task, app, -1);
2992 }
2993}
2994
2995
2996static void
2718nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
2719{
2720 nxt_app_t *app;
2721 nxt_req_app_link_t *ra;
2722
2723 app = obj;
2724 ra = data;
2725

--- 8 unchanged lines hidden (view full) ---

2734}
2735
2736
2737static void
2738nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2739 uint32_t request_failed, uint32_t got_response)
2740{
2741 nxt_app_t *app;
2997nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
2998{
2999 nxt_app_t *app;
3000 nxt_req_app_link_t *ra;
3001
3002 app = obj;
3003 ra = data;
3004

--- 8 unchanged lines hidden (view full) ---

3013}
3014
3015
3016static void
3017nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
3018 uint32_t request_failed, uint32_t got_response)
3019{
3020 nxt_app_t *app;
2742 nxt_bool_t send_quit, cancelled;
3021 nxt_bool_t port_unchained;
3022 nxt_bool_t send_quit, cancelled, adjust_idle_timer;
2743 nxt_queue_link_t *lnk;
2744 nxt_req_app_link_t *ra, *pending_ra, *re_ra;
2745 nxt_port_select_state_t state;
2746
2747 nxt_assert(port != NULL);
2748 nxt_assert(port->app != NULL);
2749
2750 ra = NULL;

--- 100 unchanged lines hidden (view full) ---

2851 }
2852 }
2853 }
2854
2855 re_ra = NULL;
2856
2857re_ra_cancelled:
2858
3023 nxt_queue_link_t *lnk;
3024 nxt_req_app_link_t *ra, *pending_ra, *re_ra;
3025 nxt_port_select_state_t state;
3026
3027 nxt_assert(port != NULL);
3028 nxt_assert(port->app != NULL);
3029
3030 ra = NULL;

--- 100 unchanged lines hidden (view full) ---

3131 }
3132 }
3133 }
3134
3135 re_ra = NULL;
3136
3137re_ra_cancelled:
3138
2859 send_quit = (app->live == 0 && port->app_pending_responses > 0)
3139 send_quit = (app->live == 0 && port->app_pending_responses == 0)
2860 || (app->max_requests > 0 && port->app_pending_responses == 0
2861 && port->app_responses >= app->max_requests);
2862
3140 || (app->max_requests > 0 && port->app_pending_responses == 0
3141 && port->app_responses >= app->max_requests);
3142
3143 if (send_quit) {
3144 port_unchained = nxt_queue_chk_remove(&port->app_link);
3145
3146 port->app = NULL;
3147 app->processes--;
3148
3149 } else {
3150 port_unchained = 0;
3151 }
3152
3153 adjust_idle_timer = 0;
3154
3155 if (!send_quit && port->app_pending_responses == 0) {
3156 nxt_assert(port->idle_link.next == NULL);
3157
3158 if (app->idle_processes == app->spare_processes
3159 && app->adjust_idle_work.data == NULL)
3160 {
3161 adjust_idle_timer = 1;
3162 app->adjust_idle_work.data = app;
3163 app->adjust_idle_work.next = NULL;
3164 }
3165
3166 if (app->idle_processes < app->spare_processes) {
3167 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3168
3169 } else {
3170 nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
3171
3172 port->idle_start = task->thread->engine->timers.now;
3173 }
3174
3175 app->idle_processes++;
3176 }
3177
2863 nxt_thread_mutex_unlock(&app->mutex);
2864
3178 nxt_thread_mutex_unlock(&app->mutex);
3179
3180 if (adjust_idle_timer) {
3181 nxt_router_app_use(task, app, 1);
3182 nxt_event_engine_post(app->engine, &app->adjust_idle_work);
3183 }
3184
2865 if (pending_ra != NULL) {
2866 nxt_router_ra_use(task, pending_ra, -1);
2867 }
2868
2869 if (re_ra != NULL) {
2870 if (nxt_router_port_post_select(task, &state) == NXT_OK) {
2871 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2872 nxt_router_app_process_request,

--- 15 unchanged lines hidden (view full) ---

2888 if (port->pair[1] == -1) {
2889 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
2890 &app->name, app, port, port->pid);
2891
2892 goto adjust_use;
2893 }
2894
2895 if (send_quit) {
3185 if (pending_ra != NULL) {
3186 nxt_router_ra_use(task, pending_ra, -1);
3187 }
3188
3189 if (re_ra != NULL) {
3190 if (nxt_router_port_post_select(task, &state) == NXT_OK) {
3191 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3192 nxt_router_app_process_request,

--- 15 unchanged lines hidden (view full) ---

3208 if (port->pair[1] == -1) {
3209 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
3210 &app->name, app, port, port->pid);
3211
3212 goto adjust_use;
3213 }
3214
3215 if (send_quit) {
2896 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
3216 nxt_debug(task, "app '%V' %p send QUIT to port",
2897 &app->name, app);
2898
2899 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2900 -1, 0, 0, NULL);
2901
3217 &app->name, app);
3218
3219 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
3220 -1, 0, 0, NULL);
3221
3222 if (port_unchained) {
3223 nxt_port_use(task, port, -1);
3224 }
3225
3226 nxt_router_app_use(task, app, -1);
3227
2902 goto adjust_use;
2903 }
2904
2905 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2906 &app->name, app);
2907
2908adjust_use:
2909
2910 if (request_failed > 0 || got_response > 0) {
2911 nxt_port_use(task, port, -1);
2912 }
2913}
2914
2915
2916void
2917nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
2918{
3228 goto adjust_use;
3229 }
3230
3231 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
3232 &app->name, app);
3233
3234adjust_use:
3235
3236 if (request_failed > 0 || got_response > 0) {
3237 nxt_port_use(task, port, -1);
3238 }
3239}
3240
3241
3242void
3243nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
3244{
2919 nxt_app_t *app;
2920 nxt_bool_t unchain, start_worker;
3245 nxt_app_t *app;
3246 nxt_bool_t unchain, start_process;
3247 nxt_port_t *idle_port;
3248 nxt_queue_link_t *idle_lnk;
2921
2922 app = port->app;
2923
2924 nxt_assert(app != NULL);
2925
2926 nxt_thread_mutex_lock(&app->mutex);
2927
3249
3250 app = port->app;
3251
3252 nxt_assert(app != NULL);
3253
3254 nxt_thread_mutex_lock(&app->mutex);
3255
2928 unchain = port->app_link.next != NULL;
3256 unchain = nxt_queue_chk_remove(&port->app_link);
2929
3257
2930 if (unchain) {
2931 nxt_queue_remove(&port->app_link);
2932 port->app_link.next = NULL;
3258 if (nxt_queue_chk_remove(&port->idle_link)) {
3259 app->idle_processes--;
3260
3261 if (port->idle_start == 0
3262 && app->idle_processes >= app->spare_processes)
3263 {
3264 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
3265
3266 idle_lnk = nxt_queue_last(&app->idle_ports);
3267 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
3268 nxt_queue_remove(idle_lnk);
3269
3270 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
3271
3272 idle_port->idle_start = 0;
3273 }
2933 }
2934
3274 }
3275
2935 app->workers--;
3276 app->processes--;
2936
3277
2937 start_worker = app->live != 0
2938 && nxt_queue_is_empty(&app->requests) == 0
2939 && app->workers + app->pending_workers < app->max_workers;
3278 start_process = app->live != 0
3279 && !task->thread->engine->shutdown
3280 && nxt_router_app_can_start(app)
3281 && (!nxt_queue_is_empty(&app->requests)
3282 || nxt_router_app_need_start(app));
2940
3283
2941 if (start_worker) {
2942 app->pending_workers++;
3284 if (start_process) {
3285 app->pending_processes++;
2943 }
2944
2945 nxt_thread_mutex_unlock(&app->mutex);
2946
3286 }
3287
3288 nxt_thread_mutex_unlock(&app->mutex);
3289
2947 nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
3290 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
2948
2949 if (unchain) {
2950 nxt_port_use(task, port, -1);
2951 }
2952
3291
3292 if (unchain) {
3293 nxt_port_use(task, port, -1);
3294 }
3295
2953 if (start_worker) {
2954 nxt_router_start_worker(task, app);
3296 if (start_process) {
3297 nxt_router_start_app_process(task, app);
2955 }
2956}
2957
2958
2959static void
3298 }
3299}
3300
3301
3302static void
3303nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
3304{
3305 nxt_app_t *app;
3306 nxt_bool_t queued;
3307 nxt_port_t *port;
3308 nxt_msec_t timeout, threshold;
3309 nxt_queue_link_t *lnk;
3310 nxt_event_engine_t *engine;
3311
3312 app = obj;
3313 queued = (data == app);
3314
3315 nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
3316 &app->name, queued);
3317
3318 engine = task->thread->engine;
3319
3320 nxt_assert(app->engine == engine);
3321
3322 threshold = engine->timers.now + app->idle_timer.precision;
3323 timeout = 0;
3324
3325 nxt_thread_mutex_lock(&app->mutex);
3326
3327 if (queued) {
3328 app->adjust_idle_work.data = NULL;
3329 }
3330
3331 while (app->idle_processes > app->spare_processes) {
3332
3333 nxt_assert(nxt_queue_is_empty(&app->idle_ports) == 0);
3334
3335 lnk = nxt_queue_first(&app->idle_ports);
3336 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
3337
3338 timeout = port->idle_start + app->idle_timeout;
3339
3340 if (timeout > threshold) {
3341 break;
3342 }
3343
3344 nxt_queue_remove(lnk);
3345 lnk->next = NULL;
3346
3347 nxt_queue_chk_remove(&port->app_link);
3348
3349 app->idle_processes--;
3350 app->processes--;
3351 port->app = NULL;
3352
3353 nxt_thread_mutex_unlock(&app->mutex);
3354
3355 nxt_debug(task, "app '%V' send QUIT to idle port %PI",
3356 &app->name, port->pid);
3357
3358 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
3359
3360 nxt_port_use(task, port, -1);
3361 nxt_router_app_use(task, app, -1);
3362
3363 nxt_thread_mutex_lock(&app->mutex);
3364 }
3365
3366 nxt_thread_mutex_unlock(&app->mutex);
3367
3368 if (timeout > threshold) {
3369 nxt_timer_add(engine, &app->idle_timer, timeout - threshold);
3370
3371 } else {
3372 nxt_timer_disable(engine, &app->idle_timer);
3373 }
3374
3375 if (queued) {
3376 nxt_router_app_use(task, app, -1);
3377 }
3378}
3379
3380
3381static void
3382nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
3383{
3384 nxt_app_t *app;
3385 nxt_timer_t *timer;
3386
3387 timer = obj;
3388 app = nxt_container_of(timer, nxt_app_t, idle_timer);
3389
3390 nxt_router_adjust_idle_timer(task, app, NULL);
3391}
3392
3393
3394static void
3395nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data)
3396{
3397 nxt_app_t *app;
3398 nxt_timer_t *timer;
3399
3400 timer = obj;
3401 app = nxt_container_of(timer, nxt_app_t, idle_timer);
3402
3403 nxt_router_app_use(task, app, -1);
3404}
3405
3406
3407static void
2960nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
2961{
2962 nxt_app_t *app;
3408nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
3409{
3410 nxt_app_t *app;
3411 nxt_bool_t can_start_process;
2963 nxt_req_app_link_t *ra;
2964
2965 ra = state->ra;
2966 app = state->app;
2967
2968 state->failed_port_use_delta = 0;
2969
2970 if (nxt_queue_chk_remove(&ra->link_app_requests))

--- 20 unchanged lines hidden (view full) ---

2991
2992 if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
2993 state->failed_port_use_delta--;
2994 }
2995
2996 ra->app_port = NULL;
2997 }
2998
3412 nxt_req_app_link_t *ra;
3413
3414 ra = state->ra;
3415 app = state->app;
3416
3417 state->failed_port_use_delta = 0;
3418
3419 if (nxt_queue_chk_remove(&ra->link_app_requests))

--- 20 unchanged lines hidden (view full) ---

3440
3441 if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
3442 state->failed_port_use_delta--;
3443 }
3444
3445 ra->app_port = NULL;
3446 }
3447
2999 state->can_start_worker = (app->workers + app->pending_workers)
3000 < app->max_workers;
3448 can_start_process = nxt_router_app_can_start(app);
3449
3001 state->port = NULL;
3450 state->port = NULL;
3451 state->start_process = 0;
3002
3003 if (nxt_queue_is_empty(&app->ports)
3452
3453 if (nxt_queue_is_empty(&app->ports)
3004 || (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
3454 || (can_start_process && nxt_router_app_first_port_busy(app)) )
3005 {
3006 ra = nxt_router_ra_create(task, ra);
3007
3008 if (nxt_slow_path(ra == NULL)) {
3009 goto fail;
3010 }
3011
3012 if (nxt_slow_path(state->failed_port != NULL)) {
3013 nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
3014
3015 } else {
3016 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
3017 }
3018
3019 nxt_router_ra_inc_use(ra);
3020
3021 nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
3022
3455 {
3456 ra = nxt_router_ra_create(task, ra);
3457
3458 if (nxt_slow_path(ra == NULL)) {
3459 goto fail;
3460 }
3461
3462 if (nxt_slow_path(state->failed_port != NULL)) {
3463 nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
3464
3465 } else {
3466 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
3467 }
3468
3469 nxt_router_ra_inc_use(ra);
3470
3471 nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
3472
3023 if (state->can_start_worker) {
3024 app->pending_workers++;
3473 if (can_start_process) {
3474 app->pending_processes++;
3475 state->start_process = 1;
3025 }
3026
3027 } else {
3028 state->port = nxt_router_pop_first_port(app);
3029
3030 if (state->port->app_pending_responses > 1) {
3031 ra = nxt_router_ra_create(task, ra);
3032
3033 if (nxt_slow_path(ra == NULL)) {
3034 goto fail;
3035 }
3036
3037 ra->app_port = state->port;
3038
3039 nxt_router_ra_pending(task, app, ra);
3040 }
3476 }
3477
3478 } else {
3479 state->port = nxt_router_pop_first_port(app);
3480
3481 if (state->port->app_pending_responses > 1) {
3482 ra = nxt_router_ra_create(task, ra);
3483
3484 if (nxt_slow_path(ra == NULL)) {
3485 goto fail;
3486 }
3487
3488 ra->app_port = state->port;
3489
3490 nxt_router_ra_pending(task, app, ra);
3491 }
3492
3493 if (can_start_process && nxt_router_app_need_start(app)) {
3494 app->pending_processes++;
3495 state->start_process = 1;
3496 }
3041 }
3042
3043fail:
3044
3045 state->shared_ra = ra;
3046}
3047
3048

--- 23 unchanged lines hidden (view full) ---

3072 return NXT_ERROR;
3073 }
3074
3075 if (state->port != NULL) {
3076 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
3077
3078 ra->app_port = state->port;
3079
3497 }
3498
3499fail:
3500
3501 state->shared_ra = ra;
3502}
3503
3504

--- 23 unchanged lines hidden (view full) ---

3528 return NXT_ERROR;
3529 }
3530
3531 if (state->port != NULL) {
3532 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
3533
3534 ra->app_port = state->port;
3535
3536 if (state->start_process) {
3537 nxt_router_start_app_process(task, app);
3538 }
3539
3080 return NXT_OK;
3081 }
3082
3540 return NXT_OK;
3541 }
3542
3083 if (!state->can_start_worker) {
3084 nxt_debug(task, "app '%V' %p too many running or pending workers",
3543 if (!state->start_process) {
3544 nxt_debug(task, "app '%V' %p too many running or pending processes",
3085 &app->name, app);
3086
3087 return NXT_AGAIN;
3088 }
3089
3545 &app->name, app);
3546
3547 return NXT_AGAIN;
3548 }
3549
3090 res = nxt_router_start_worker(task, app);
3550 res = nxt_router_start_app_process(task, app);
3091
3092 if (nxt_slow_path(res != NXT_OK)) {
3551
3552 if (nxt_slow_path(res != NXT_OK)) {
3093 nxt_router_ra_error(ra, 500, "Failed to start worker");
3553 nxt_router_ra_error(ra, 500, "Failed to start app process");
3094 nxt_router_ra_use(task, ra, -1);
3095
3096 return NXT_ERROR;
3097 }
3098
3099 return NXT_AGAIN;
3100}
3101

--- 120 unchanged lines hidden (view full) ---

3222 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3223
3224 if (nxt_slow_path(res != NXT_OK)) {
3225 nxt_router_ra_error(ra, 500,
3226 "Failed to prepare message for application");
3227 goto release_port;
3228 }
3229
3554 nxt_router_ra_use(task, ra, -1);
3555
3556 return NXT_ERROR;
3557 }
3558
3559 return NXT_AGAIN;
3560}
3561

--- 120 unchanged lines hidden (view full) ---

3682 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3683
3684 if (nxt_slow_path(res != NXT_OK)) {
3685 nxt_router_ra_error(ra, 500,
3686 "Failed to prepare message for application");
3687 goto release_port;
3688 }
3689
3230 nxt_debug(task, "about to send %O bytes buffer to worker port %d",
3690 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
3231 nxt_buf_used_size(wmsg.write),
3232 wmsg.port->socket.fd);
3233
3234 request_failed = 0;
3235
3236 ra->msg_info.buf = wmsg.write;
3237 ra->msg_info.completion_handler = wmsg.write->completion_handler;
3238

--- 354 unchanged lines hidden ---
3691 nxt_buf_used_size(wmsg.write),
3692 wmsg.port->socket.fd);
3693
3694 request_failed = 0;
3695
3696 ra->msg_info.buf = wmsg.write;
3697 ra->msg_info.completion_handler = wmsg.write->completion_handler;
3698

--- 354 unchanged lines hidden ---