nxt_router.c (342:82c2825a617a) nxt_router.c (343:9fa845db60fb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 10 unchanged lines hidden (view full) ---

19
20
21typedef struct {
22 nxt_str_t application;
23} nxt_router_listener_conf_t;
24
25
26typedef struct nxt_req_app_link_s nxt_req_app_link_t;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 10 unchanged lines hidden (view full) ---

19
20
21typedef struct {
22 nxt_str_t application;
23} nxt_router_listener_conf_t;
24
25
26typedef struct nxt_req_app_link_s nxt_req_app_link_t;
27typedef struct nxt_start_worker_s nxt_start_worker_t;
28
27
29struct nxt_start_worker_s {
30 nxt_app_t *app;
31 nxt_req_app_link_t *ra;
32
28
33 nxt_work_t work;
34};
35
36
37typedef struct {
38 uint32_t stream;
39 nxt_conn_t *conn;
29typedef struct {
30 uint32_t stream;
31 nxt_conn_t *conn;
32 nxt_app_t *app;
40 nxt_port_t *app_port;
41 nxt_req_app_link_t *ra;
42
43 nxt_queue_link_t link; /* for nxt_conn_t.requests */
44} nxt_req_conn_link_t;
45
46
47struct nxt_req_app_link_s {

--- 19 unchanged lines hidden (view full) ---

67
68typedef struct {
69 nxt_mp_t *mem_pool;
70 nxt_port_recv_msg_t msg;
71 nxt_work_t work;
72} nxt_remove_pid_msg_t;
73
74
33 nxt_port_t *app_port;
34 nxt_req_app_link_t *ra;
35
36 nxt_queue_link_t link; /* for nxt_conn_t.requests */
37} nxt_req_conn_link_t;
38
39
40struct nxt_req_app_link_s {

--- 19 unchanged lines hidden (view full) ---

60
61typedef struct {
62 nxt_mp_t *mem_pool;
63 nxt_port_recv_msg_t msg;
64 nxt_work_t work;
65} nxt_remove_pid_msg_t;
66
67
68static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
69
75static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
76 void *data);
77static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
78 void *data);
79
80static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
81static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
82static void nxt_router_conf_ready(nxt_task_t *task,

--- 36 unchanged lines hidden (view full) ---

119 nxt_router_engine_conf_t *recf);
120static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
121 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
122
123static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
124 nxt_router_temp_conf_t *tmcf);
125static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
126 nxt_event_engine_t *engine);
70static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
71 void *data);
72static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
73 void *data);
74
75static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
76static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
77static void nxt_router_conf_ready(nxt_task_t *task,

--- 36 unchanged lines hidden (view full) ---

114 nxt_router_engine_conf_t *recf);
115static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
116 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
117
118static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
119 nxt_router_temp_conf_t *tmcf);
120static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
121 nxt_event_engine_t *engine);
127static void nxt_router_apps_sort(nxt_router_t *router,
122static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
128 nxt_router_temp_conf_t *tmcf);
129
130static void nxt_router_engines_post(nxt_router_t *router,
131 nxt_router_temp_conf_t *tmcf);
132static void nxt_router_engine_post(nxt_event_engine_t *engine,
133 nxt_work_t *jobs);
134
135static void nxt_router_thread_start(void *data);

--- 9 unchanged lines hidden (view full) ---

145 void *data);
146static void nxt_router_listen_socket_release(nxt_task_t *task,
147 nxt_socket_conf_joint_t *joint);
148static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
149 void *data);
150static void nxt_router_conf_release(nxt_task_t *task,
151 nxt_socket_conf_joint_t *joint);
152
123 nxt_router_temp_conf_t *tmcf);
124
125static void nxt_router_engines_post(nxt_router_t *router,
126 nxt_router_temp_conf_t *tmcf);
127static void nxt_router_engine_post(nxt_event_engine_t *engine,
128 nxt_work_t *jobs);
129
130static void nxt_router_thread_start(void *data);

--- 9 unchanged lines hidden (view full) ---

140 void *data);
141static void nxt_router_listen_socket_release(nxt_task_t *task,
142 nxt_socket_conf_joint_t *joint);
143static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
144 void *data);
145static void nxt_router_conf_release(nxt_task_t *task,
146 nxt_socket_conf_joint_t *joint);
147
153static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
154 void *data);
155static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
156static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream);
157static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
158 void *data);
148static void nxt_router_app_port_ready(nxt_task_t *task,
149 nxt_port_recv_msg_t *msg, void *data);
150static void nxt_router_app_port_error(nxt_task_t *task,
151 nxt_port_recv_msg_t *msg, void *data);
159
152
153static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
154static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
155 uint32_t request_failed, uint32_t got_response);
156
160static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
161static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
162 void *data);
163static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
164 void *data);
165static void nxt_router_process_http_request(nxt_task_t *task,
166 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
167static void nxt_router_process_http_request_mp(nxt_task_t *task,
157static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
158static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
159 void *data);
160static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
161 void *data);
162static void nxt_router_process_http_request(nxt_task_t *task,
163 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
164static void nxt_router_process_http_request_mp(nxt_task_t *task,
168 nxt_req_app_link_t *ra, nxt_port_t *port);
165 nxt_req_app_link_t *ra);
169static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
170 nxt_app_wmsg_t *wmsg);
171static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
172 nxt_app_wmsg_t *wmsg);
173static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
174 nxt_app_wmsg_t *wmsg);
175static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
176static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);

--- 40 unchanged lines hidden (view full) ---

217 nxt_queue_init(&router->apps);
218
219 nxt_router = router;
220
221 return NXT_OK;
222}
223
224
166static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
167 nxt_app_wmsg_t *wmsg);
168static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
169 nxt_app_wmsg_t *wmsg);
170static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
171 nxt_app_wmsg_t *wmsg);
172static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
173static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);

--- 40 unchanged lines hidden (view full) ---

214 nxt_queue_init(&router->apps);
215
216 nxt_router = router;
217
218 return NXT_OK;
219}
220
221
225static nxt_start_worker_t *
226nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
222static void
223nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
227{
224{
228 nxt_port_t *main_port;
229 nxt_runtime_t *rt;
230 nxt_start_worker_t *sw;
225 size_t size;
226 uint32_t stream;
227 nxt_app_t *app;
228 nxt_buf_t *b;
229 nxt_port_t *main_port;
230 nxt_runtime_t *rt;
231
231
232 sw = nxt_zalloc(sizeof(nxt_start_worker_t));
232 app = data;
233
233
234 if (nxt_slow_path(sw == NULL)) {
235 return NULL;
236 }
234 rt = task->thread->runtime;
235 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
237
236
238 sw->app = app;
239 sw->ra = ra;
237 nxt_debug(task, "app '%V' %p start worker", &app->name, app);
240
238
241 nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw,
242 ra->stream, &app->name, app);
239 size = app->name.length + 1 + app->conf.length;
243
240
244 rt = task->thread->runtime;
245 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
241 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
246
242
247 sw->work.handler = nxt_router_send_sw_request;
248 sw->work.task = &main_port->engine->task;
249 sw->work.obj = sw;
250 sw->work.data = task->thread->engine;
251 sw->work.next = NULL;
243 if (nxt_slow_path(b == NULL)) {
244 goto failed;
245 }
252
246
253 if (task->thread->engine != main_port->engine) {
254 nxt_debug(task, "sw %p post send to main engine %p", sw,
255 main_port->engine);
247 nxt_buf_cpystr(b, &app->name);
248 *b->mem.free++ = '\0';
249 nxt_buf_cpystr(b, &app->conf);
256
250
257 nxt_event_engine_post(main_port->engine, &sw->work);
251 stream = nxt_port_rpc_register_handler(task, port,
252 nxt_router_app_port_ready,
253 nxt_router_app_port_error,
254 -1, app);
258
255
259 } else {
260 nxt_router_send_sw_request(task, sw, sw->work.data);
256 if (nxt_slow_path(stream == 0)) {
257 nxt_mp_release(b->data, b);
258
259 goto failed;
261 }
262
260 }
261
263 return sw;
264}
262 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
263 stream, port->id, b);
265
264
265 return;
266
266
267nxt_inline void
268nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
269{
270 nxt_debug(task, "sw %p release", sw);
267failed:
271
268
272 nxt_free(sw);
269 nxt_thread_mutex_lock(&app->mutex);
270
271 app->pending_workers--;
272
273 nxt_thread_mutex_unlock(&app->mutex);
274
275 nxt_router_app_use(task, app, -1);
273}
274
275
276}
277
278
276nxt_inline void
277nxt_router_rc_unlink(nxt_req_conn_link_t *rc)
279static nxt_int_t
280nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
278{
281{
279 nxt_queue_remove(&rc->link);
282 nxt_int_t res;
283 nxt_port_t *router_port;
284 nxt_runtime_t *rt;
280
285
281 if (rc->ra != NULL) {
282 rc->ra->rc = NULL;
283 rc->ra = NULL;
286 rt = task->thread->runtime;
287 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
288
289 nxt_router_app_use(task, app, 1);
290
291 res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
292 app);
293
294 if (res == NXT_OK) {
295 return res;
284 }
285
296 }
297
286 rc->conn = NULL;
298 nxt_thread_mutex_lock(&app->mutex);
299
300 app->pending_workers--;
301
302 nxt_thread_mutex_unlock(&app->mutex);
303
304 nxt_router_app_use(task, app, -1);
305
306 return NXT_ERROR;
287}
288
289
290static nxt_req_app_link_t *
291nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
292{
293 nxt_mp_t *mp;
294 nxt_event_engine_t *engine;

--- 27 unchanged lines hidden (view full) ---

322
323 return ra;
324}
325
326
327static void
328nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
329{
307}
308
309
310static nxt_req_app_link_t *
311nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
312{
313 nxt_mp_t *mp;
314 nxt_event_engine_t *engine;

--- 27 unchanged lines hidden (view full) ---

342
343 return ra;
344}
345
346
347static void
348nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
349{
330 nxt_port_t *app_port;
331 nxt_req_app_link_t *ra;
332 nxt_event_engine_t *engine;
350 nxt_req_app_link_t *ra;
351 nxt_event_engine_t *engine;
352 nxt_req_conn_link_t *rc;
333
334 ra = obj;
335 engine = data;
336
353
354 ra = obj;
355 engine = data;
356
337 if (ra->app_port != NULL) {
338
339 app_port = ra->app_port;
340 ra->app_port = NULL;
341
342 if (task->thread->engine != engine) {
343 ra->app_pid = app_port->pid;
357 if (task->thread->engine != engine) {
358 if (ra->app_port != NULL) {
359 ra->app_pid = ra->app_port->pid;
344 }
345
360 }
361
346 nxt_router_app_release_port(task, app_port, app_port->app);
347
348#if 0
349 /* Uncomment to hold app port until complete response received. */
350 if (ra->rc != NULL) {
351 ra->rc->app_port = ra->app_port;
352
353 } else {
354 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
355 }
356#endif
357 }
358
359 if (task->thread->engine != engine) {
360 ra->work.handler = nxt_router_ra_release;
361 ra->work.task = &engine->task;
362 ra->work.next = NULL;
363
364 nxt_debug(task, "ra stream #%uD post release to %p",
365 ra->stream, engine);
366
367 nxt_event_engine_post(engine, &ra->work);
368
369 return;
370 }
371
362 ra->work.handler = nxt_router_ra_release;
363 ra->work.task = &engine->task;
364 ra->work.next = NULL;
365
366 nxt_debug(task, "ra stream #%uD post release to %p",
367 ra->stream, engine);
368
369 nxt_event_engine_post(engine, &ra->work);
370
371 return;
372 }
373
372 if (ra->rc != NULL && ra->app_pid != -1) {
373 nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid);
374 nxt_debug(task, "ra stream #%uD release", ra->stream);
375
376 rc = ra->rc;
377
378 if (rc != NULL) {
379 if (ra->app_pid != -1) {
380 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
381 }
382
383 rc->app_port = ra->app_port;
384
385 ra->app_port = NULL;
386 rc->ra = NULL;
387 ra->rc = NULL;
374 }
375
388 }
389
376 nxt_debug(task, "ra stream #%uD release", ra->stream);
390 if (ra->app_port != NULL) {
391 nxt_router_app_port_release(task, ra->app_port, 0, 1);
377
392
393 ra->app_port = NULL;
394 }
395
378 nxt_mp_release(ra->mem_pool, ra);
379}
380
381
382static void
383nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
384{
396 nxt_mp_release(ra->mem_pool, ra);
397}
398
399
400static void
401nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
402{
385 nxt_conn_t *c;
386 nxt_req_app_link_t *ra;
387 nxt_event_engine_t *engine;
403 nxt_conn_t *c;
404 nxt_req_app_link_t *ra;
405 nxt_req_conn_link_t *rc;
406 nxt_event_engine_t *engine;
388
389 ra = obj;
390 engine = data;
391
392 if (task->thread->engine != engine) {
393 ra->work.handler = nxt_router_ra_abort;
394 ra->work.task = &engine->task;
395 ra->work.next = NULL;
396
397 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
398
399 nxt_event_engine_post(engine, &ra->work);
400
401 return;
402 }
403
404 nxt_debug(task, "ra stream #%uD abort", ra->stream);
405
407
408 ra = obj;
409 engine = data;
410
411 if (task->thread->engine != engine) {
412 ra->work.handler = nxt_router_ra_abort;
413 ra->work.task = &engine->task;
414 ra->work.next = NULL;
415
416 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
417
418 nxt_event_engine_post(engine, &ra->work);
419
420 return;
421 }
422
423 nxt_debug(task, "ra stream #%uD abort", ra->stream);
424
406 if (ra->rc != NULL) {
407 c = ra->rc->conn;
425 rc = ra->rc;
408
426
427 if (rc != NULL) {
428 c = rc->conn;
429
409 nxt_router_gen_error(task, c, 500,
410 "Failed to start application worker");
430 nxt_router_gen_error(task, c, 500,
431 "Failed to start application worker");
432
433 rc->ra = NULL;
434 ra->rc = NULL;
411 }
412
435 }
436
437 if (ra->app_port != NULL) {
438 nxt_router_app_port_release(task, ra->app_port, 0, 1);
439
440 ra->app_port = NULL;
441 }
442
413 nxt_mp_release(ra->mem_pool, ra);
414}
415
416
443 nxt_mp_release(ra->mem_pool, ra);
444}
445
446
447nxt_inline void
448nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
449{
450 nxt_req_app_link_t *ra;
451
452 if (rc->app_port != NULL) {
453 nxt_router_app_port_release(task, rc->app_port, 0, 1);
454
455 rc->app_port = NULL;
456 }
457
458 ra = rc->ra;
459
460 if (ra != NULL) {
461 rc->ra = NULL;
462 ra->rc = NULL;
463
464 nxt_thread_mutex_lock(&rc->app->mutex);
465
466 if (ra->link.next != NULL) {
467 nxt_queue_remove(&ra->link);
468
469 ra->link.next = NULL;
470
471 } else {
472 ra = NULL;
473 }
474
475 nxt_thread_mutex_unlock(&rc->app->mutex);
476 }
477
478 if (ra != NULL) {
479 nxt_router_ra_release(task, ra, ra->work.data);
480 }
481
482 if (rc->app != NULL) {
483 nxt_router_app_use(task, rc->app, -1);
484
485 rc->app = NULL;
486 }
487
488 nxt_queue_remove(&rc->link);
489
490 rc->conn = NULL;
491}
492
493
417void
418nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
419{
420 nxt_port_new_port_handler(task, msg);
421
422 if (msg->port_msg.stream == 0) {
423 return;
424 }

--- 96 unchanged lines hidden (view full) ---

521
522 nxt_port_rpc_handler(task, msg);
523}
524
525
526static void
527nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
528{
494void
495nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
496{
497 nxt_port_new_port_handler(task, msg);
498
499 if (msg->port_msg.stream == 0) {
500 return;
501 }

--- 96 unchanged lines hidden (view full) ---

598
599 nxt_port_rpc_handler(task, msg);
600}
601
602
603static void
604nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
605{
606 nxt_pid_t pid;
607 nxt_buf_t *buf;
529 nxt_event_engine_t *engine;
530 nxt_remove_pid_msg_t *rp;
531
532 rp = obj;
533
608 nxt_event_engine_t *engine;
609 nxt_remove_pid_msg_t *rp;
610
611 rp = obj;
612
534 nxt_port_remove_pid_handler(task, &rp->msg);
613 buf = rp->msg.buf;
535
614
615 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
616
617 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
618
619 nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
620
536 engine = rp->work.data;
537
538 rp->work.handler = nxt_router_worker_remove_pid_done;
539 rp->work.task = &engine->task;
540 rp->work.next = NULL;
541
542 nxt_event_engine_post(engine, &rp->work);
543}

--- 109 unchanged lines hidden (view full) ---

653 goto fail;
654 }
655
656 ret = nxt_router_threads_create(task, rt, tmcf);
657 if (nxt_slow_path(ret != NXT_OK)) {
658 goto fail;
659 }
660
621 engine = rp->work.data;
622
623 rp->work.handler = nxt_router_worker_remove_pid_done;
624 rp->work.task = &engine->task;
625 rp->work.next = NULL;
626
627 nxt_event_engine_post(engine, &rp->work);
628}

--- 109 unchanged lines hidden (view full) ---

738 goto fail;
739 }
740
741 ret = nxt_router_threads_create(task, rt, tmcf);
742 if (nxt_slow_path(ret != NXT_OK)) {
743 goto fail;
744 }
745
661 nxt_router_apps_sort(router, tmcf);
746 nxt_router_apps_sort(task, router, tmcf);
662
663 nxt_router_engines_post(router, tmcf);
664
665 nxt_queue_add(&router->sockets, &tmcf->updating);
666 nxt_queue_add(&router->sockets, &tmcf->creating);
667
668 nxt_router_conf_ready(task, tmcf);
669

--- 330 unchanged lines hidden (view full) ---

1000
1001 app->name.length = name.length;
1002 nxt_memcpy(app->name.start, name.start, name.length);
1003
1004 app->type = type;
1005 app->max_workers = apcf.workers;
1006 app->timeout = apcf.timeout;
1007 app->live = 1;
747
748 nxt_router_engines_post(router, tmcf);
749
750 nxt_queue_add(&router->sockets, &tmcf->updating);
751 nxt_queue_add(&router->sockets, &tmcf->creating);
752
753 nxt_router_conf_ready(task, tmcf);
754

--- 330 unchanged lines hidden (view full) ---

1085
1086 app->name.length = name.length;
1087 nxt_memcpy(app->name.start, name.start, name.length);
1088
1089 app->type = type;
1090 app->max_workers = apcf.workers;
1091 app->timeout = apcf.timeout;
1092 app->live = 1;
1093 app->max_pending_responses = 2;
1008 app->prepare_msg = nxt_app_prepare_msg[type];
1009
1010 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1094 app->prepare_msg = nxt_app_prepare_msg[type];
1095
1096 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1097
1098 nxt_router_app_use(task, app, 1);
1011 }
1012
1013 http = nxt_conf_get_path(conf, &http_path);
1014#if 0
1015 if (http == NULL) {
1016 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
1017 return NXT_ERROR;
1018 }

--- 671 unchanged lines hidden (view full) ---

1690 nxt_queue_remove(&engine->link);
1691 }
1692
1693 return ret;
1694}
1695
1696
1697static void
1099 }
1100
1101 http = nxt_conf_get_path(conf, &http_path);
1102#if 0
1103 if (http == NULL) {
1104 nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
1105 return NXT_ERROR;
1106 }

--- 671 unchanged lines hidden (view full) ---

1778 nxt_queue_remove(&engine->link);
1779 }
1780
1781 return ret;
1782}
1783
1784
1785static void
1698nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1786nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
1787 nxt_router_temp_conf_t *tmcf)
1699{
1788{
1700 nxt_app_t *app;
1701 nxt_port_t *port;
1789 nxt_app_t *app;
1790 nxt_port_t *port;
1702
1703 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1704
1705 nxt_queue_remove(&app->link);
1706
1791
1792 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1793
1794 nxt_queue_remove(&app->link);
1795
1707 nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
1796 nxt_debug(task, "about to free app '%V' %p", &app->name, app);
1708
1709 app->live = 0;
1710
1797
1798 app->live = 0;
1799
1711 if (nxt_router_app_free(NULL, app) != 0) {
1712 continue;
1713 }
1714
1715 if (!nxt_queue_is_empty(&app->requests)) {
1716
1717 nxt_thread_log_debug("app '%V' %p pending requests found",
1718 &app->name, app);
1719 continue;
1720 }
1721
1722 do {
1800 do {
1723 port = nxt_router_app_get_port(app, 0);
1801 port = nxt_router_app_get_idle_port(app);
1724 if (port == NULL) {
1725 break;
1726 }
1727
1802 if (port == NULL) {
1803 break;
1804 }
1805
1728 nxt_thread_log_debug("port %p send quit", port);
1806 nxt_debug(task, "port %p send quit", port);
1729
1807
1730 nxt_port_socket_write(&port->engine->task, port,
1731 NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
1808 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
1809 NULL);
1810
1811 nxt_port_use(task, port, -1);
1732 } while (1);
1733
1812 } while (1);
1813
1814 nxt_router_app_use(task, app, -1);
1815
1734 } nxt_queue_loop;
1735
1736 nxt_queue_add(&router->apps, &tmcf->previous);
1737 nxt_queue_add(&router->apps, &tmcf->apps);
1738}
1739
1740
1741static void

--- 86 unchanged lines hidden (view full) ---

1828 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
1829 NXT_PROCESS_ROUTER);
1830 if (nxt_slow_path(port == NULL)) {
1831 return;
1832 }
1833
1834 ret = nxt_port_socket_init(task, port, 0);
1835 if (nxt_slow_path(ret != NXT_OK)) {
1816 } nxt_queue_loop;
1817
1818 nxt_queue_add(&router->apps, &tmcf->previous);
1819 nxt_queue_add(&router->apps, &tmcf->apps);
1820}
1821
1822
1823static void

--- 86 unchanged lines hidden (view full) ---

1910 port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
1911 NXT_PROCESS_ROUTER);
1912 if (nxt_slow_path(port == NULL)) {
1913 return;
1914 }
1915
1916 ret = nxt_port_socket_init(task, port, 0);
1917 if (nxt_slow_path(ret != NXT_OK)) {
1836 nxt_mp_release(port->mem_pool, port);
1918 nxt_port_use(task, port, -1);
1837 return;
1838 }
1839
1840 engine->port = port;
1841
1842 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
1843
1844 nxt_event_engine_start(engine);

--- 274 unchanged lines hidden (view full) ---

2119 engine = link->engine;
2120
2121 nxt_queue_remove(&engine->link);
2122
2123 port = engine->port;
2124
2125 // TODO notify all apps
2126
1919 return;
1920 }
1921
1922 engine->port = port;
1923
1924 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
1925
1926 nxt_event_engine_start(engine);

--- 274 unchanged lines hidden (view full) ---

2201 engine = link->engine;
2202
2203 nxt_queue_remove(&engine->link);
2204
2205 port = engine->port;
2206
2207 // TODO notify all apps
2208
2209 port->engine = task->thread->engine;
2127 nxt_mp_thread_adopt(port->mem_pool);
2210 nxt_mp_thread_adopt(port->mem_pool);
2128 nxt_port_release(port);
2211 nxt_port_use(task, port, -1);
2129
2130 nxt_mp_thread_adopt(engine->mem_pool);
2131 nxt_mp_destroy(engine->mem_pool);
2132
2133 nxt_event_engine_free(engine);
2134
2135 nxt_free(link);
2136}

--- 98 unchanged lines hidden (view full) ---

2235
2236 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2237 if (nxt_slow_path(last == NULL)) {
2238 /* TODO pogorevaTb */
2239 }
2240
2241 nxt_buf_chain_add(&b, last);
2242
2212
2213 nxt_mp_thread_adopt(engine->mem_pool);
2214 nxt_mp_destroy(engine->mem_pool);
2215
2216 nxt_event_engine_free(engine);
2217
2218 nxt_free(link);
2219}

--- 98 unchanged lines hidden (view full) ---

2318
2319 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2320 if (nxt_slow_path(last == NULL)) {
2321 /* TODO pogorevaTb */
2322 }
2323
2324 nxt_buf_chain_add(&b, last);
2325
2243 if (rc->app_port != NULL) {
2244 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
2245
2246 rc->app_port = NULL;
2247 }
2248
2249 nxt_router_rc_unlink(rc);
2326 nxt_router_rc_unlink(task, rc);
2250 }
2251
2252 if (b == NULL) {
2253 return;
2254 }
2255
2256 if (msg->buf == b) {
2257 /* Disable instant buffer completion/re-using by port. */

--- 20 unchanged lines hidden (view full) ---

2278{
2279 nxt_req_conn_link_t *rc;
2280
2281 rc = data;
2282
2283 nxt_router_gen_error(task, rc->conn, 500,
2284 "Application terminated unexpectedly");
2285
2327 }
2328
2329 if (b == NULL) {
2330 return;
2331 }
2332
2333 if (msg->buf == b) {
2334 /* Disable instant buffer completion/re-using by port. */

--- 20 unchanged lines hidden (view full) ---

2355{
2356 nxt_req_conn_link_t *rc;
2357
2358 rc = data;
2359
2360 nxt_router_gen_error(task, rc->conn, 500,
2361 "Application terminated unexpectedly");
2362
2286 nxt_router_rc_unlink(rc);
2363 nxt_router_rc_unlink(task, rc);
2287}
2288
2289
2290nxt_inline const char *
2291nxt_router_text_by_code(int code)
2292{
2293 switch (code) {
2294 case 400: return "Bad request";

--- 83 unchanged lines hidden (view full) ---

2378 nxt_debug(task, "router data attach out bufs to existing chain");
2379
2380 nxt_buf_chain_add(&c->write, b);
2381 }
2382}
2383
2384
2385static void
2364}
2365
2366
2367nxt_inline const char *
2368nxt_router_text_by_code(int code)
2369{
2370 switch (code) {
2371 case 400: return "Bad request";

--- 83 unchanged lines hidden (view full) ---

2455 nxt_debug(task, "router data attach out bufs to existing chain");
2456
2457 nxt_buf_chain_add(&c->write, b);
2458 }
2459}
2460
2461
2462static void
2386nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2463nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2464 void *data)
2387{
2465{
2388 nxt_start_worker_t *sw;
2466 nxt_app_t *app;
2467 nxt_port_t *port;
2389
2468
2390 sw = data;
2469 app = data;
2470 port = msg->new_port;
2391
2471
2392 nxt_assert(sw != NULL);
2393 nxt_assert(sw->app->pending_workers != 0);
2472 nxt_assert(app != NULL);
2473 nxt_assert(port != NULL);
2394
2474
2395 msg->new_port->app = sw->app;
2475 port->app = app;
2396
2476
2397 sw->app->pending_workers--;
2398 sw->app->workers++;
2477 nxt_thread_mutex_lock(&app->mutex);
2399
2478
2400 nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
2479 nxt_assert(app->pending_workers != 0);
2401
2480
2402 nxt_router_app_release_port(task, msg->new_port, sw->app);
2481 app->pending_workers--;
2482 app->workers++;
2403
2483
2404 nxt_router_sw_release(task, sw);
2484 nxt_thread_mutex_unlock(&app->mutex);
2485
2486 nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
2487
2488 nxt_router_app_port_release(task, port, 0, 0);
2405}
2406
2407
2408static void
2489}
2490
2491
2492static void
2409nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2493nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2494 void *data)
2410{
2411 nxt_app_t *app;
2412 nxt_queue_link_t *lnk;
2413 nxt_req_app_link_t *ra;
2495{
2496 nxt_app_t *app;
2497 nxt_queue_link_t *lnk;
2498 nxt_req_app_link_t *ra;
2414 nxt_start_worker_t *sw;
2415
2499
2416 sw = data;
2500 app = data;
2417
2501
2418 nxt_assert(sw != NULL);
2419 nxt_assert(sw->app != NULL);
2420 nxt_assert(sw->app->pending_workers != 0);
2502 nxt_assert(app != NULL);
2421
2503
2422 app = sw->app;
2504 nxt_debug(task, "app '%V' %p start error", &app->name, app);
2423
2505
2424 sw->app->pending_workers--;
2506 nxt_thread_mutex_lock(&app->mutex);
2425
2507
2426 nxt_debug(task, "sw %p error, failed to start app '%V'",
2427 sw, &app->name);
2508 nxt_assert(app->pending_workers != 0);
2428
2509
2510 app->pending_workers--;
2511
2429 if (!nxt_queue_is_empty(&app->requests)) {
2430 lnk = nxt_queue_last(&app->requests);
2431 nxt_queue_remove(lnk);
2512 if (!nxt_queue_is_empty(&app->requests)) {
2513 lnk = nxt_queue_last(&app->requests);
2514 nxt_queue_remove(lnk);
2515 lnk->next = NULL;
2432
2433 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2434
2516
2517 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2518
2519 } else {
2520 ra = NULL;
2521 }
2522
2523 nxt_thread_mutex_unlock(&app->mutex);
2524
2525 if (ra != NULL) {
2435 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2436 &app->name, app, ra->stream);
2437
2438 nxt_router_ra_abort(task, ra, ra->work.data);
2439 }
2440
2526 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2527 &app->name, app, ra->stream);
2528
2529 nxt_router_ra_abort(task, ra, ra->work.data);
2530 }
2531
2441 nxt_router_sw_release(task, sw);
2532 nxt_router_app_use(task, app, -1);
2442}
2443
2444
2533}
2534
2535
2445static void
2446nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2536void
2537nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
2447{
2538{
2448 size_t size;
2449 uint32_t stream;
2450 nxt_buf_t *b;
2451 nxt_app_t *app;
2452 nxt_port_t *main_port, *router_port, *app_port;
2453 nxt_runtime_t *rt;
2454 nxt_start_worker_t *sw;
2455 nxt_req_app_link_t *ra;
2539 int c;
2456
2540
2457 sw = obj;
2458 app = sw->app;
2541 c = nxt_atomic_fetch_add(&app->use_count, i);
2459
2542
2460 if (nxt_queue_is_empty(&app->requests)) {
2461 ra = sw->ra;
2462 app_port = nxt_router_app_get_port(app, ra->stream);
2543 if (i < 0 && c == -i) {
2463
2544
2464 if (app_port != NULL) {
2465 nxt_debug(task, "app '%V' %p process stream #%uD",
2466 &app->name, app, ra->stream);
2545 nxt_assert(app->live == 0);
2546 nxt_assert(app->workers == 0);
2547 nxt_assert(app->pending_workers == 0);
2548 nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
2549 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2467
2550
2468 ra->app_port = app_port;
2469
2470 nxt_router_process_http_request_mp(task, ra, app_port);
2471
2472 nxt_router_ra_release(task, ra, ra->work.data);
2473 nxt_router_sw_release(task, sw);
2474
2475 return;
2476 }
2551 nxt_thread_mutex_destroy(&app->mutex);
2552 nxt_free(app);
2477 }
2553 }
2478
2479 nxt_queue_insert_tail(&app->requests, &sw->ra->link);
2480
2481 if (app->workers + app->pending_workers >= app->max_workers) {
2482 nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, "
2483 "max_workers (%uD) reached", &app->name, app,
2484 app->workers, app->pending_workers, app->max_workers);
2485
2486 nxt_router_sw_release(task, sw);
2487
2488 return;
2489 }
2490
2491 app->pending_workers++;
2492
2493 nxt_debug(task, "sw %p send", sw);
2494
2495 rt = task->thread->runtime;
2496 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2497 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2498
2499 size = app->name.length + 1 + app->conf.length;
2500
2501 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
2502
2503 nxt_buf_cpystr(b, &app->name);
2504 *b->mem.free++ = '\0';
2505 nxt_buf_cpystr(b, &app->conf);
2506
2507 stream = nxt_port_rpc_register_handler(task, router_port,
2508 nxt_router_sw_ready,
2509 nxt_router_sw_error,
2510 main_port->pid, sw);
2511
2512 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2513 stream, router_port->id, b);
2514}
2515
2516
2554}
2555
2556
2517static nxt_bool_t
2518nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
2557nxt_inline nxt_port_t *
2558nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
2519{
2559{
2520 nxt_queue_link_t *lnk;
2521 nxt_req_app_link_t *ra;
2560 nxt_port_t *port;
2561 nxt_queue_link_t *lnk;
2522
2562
2523 nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
2524 app->live, app->workers, app->pending_workers,
2525 nxt_queue_is_empty(&app->requests));
2563 lnk = nxt_queue_first(&app->ports);
2564 nxt_queue_remove(lnk);
2526
2565
2527 if (app->live == 0
2528 && app->workers == 0
2529 && app->pending_workers == 0
2530 && nxt_queue_is_empty(&app->requests))
2531 {
2532 nxt_thread_mutex_destroy(&app->mutex);
2533 nxt_free(app);
2566 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2534
2567
2535 return 1;
2536 }
2568 port->app_requests++;
2537
2569
2538 if (app->live == 1
2539 && nxt_queue_is_empty(&app->requests) == 0
2540 && app->workers + app->pending_workers < app->max_workers)
2570 if (app->live &&
2571 (app->max_pending_responses == 0 ||
2572 (port->app_requests - port->app_responses) <
2573 app->max_pending_responses) )
2541 {
2574 {
2542 lnk = nxt_queue_first(&app->requests);
2543 nxt_queue_remove(lnk);
2575 nxt_queue_insert_tail(&app->ports, lnk);
2544
2576
2545 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2577 } else {
2578 lnk->next = NULL;
2546
2579
2547 nxt_router_sw_create(task, app, ra);
2580 (*use_delta)--;
2548 }
2549
2581 }
2582
2550 return 0;
2583 return port;
2551}
2552
2553
2554static nxt_port_t *
2584}
2585
2586
2587static nxt_port_t *
2555nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
2588nxt_router_app_get_idle_port(nxt_app_t *app)
2556{
2589{
2557 nxt_port_t *port;
2558 nxt_queue_link_t *lnk;
2590 nxt_port_t *port;
2559
2560 port = NULL;
2561
2562 nxt_thread_mutex_lock(&app->mutex);
2563
2591
2592 port = NULL;
2593
2594 nxt_thread_mutex_lock(&app->mutex);
2595
2564 if (!nxt_queue_is_empty(&app->ports)) {
2565 lnk = nxt_queue_first(&app->ports);
2566 nxt_queue_remove(lnk);
2596 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2567
2597
2568 lnk->next = NULL;
2598 if (port->app_requests > port->app_responses) {
2599 port = NULL;
2569
2600
2570 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2601 continue;
2602 }
2571
2603
2572 port->app_stream = stream;
2573 }
2604 nxt_queue_remove(&port->app_link);
2605 port->app_link.next = NULL;
2574
2606
2607 break;
2608
2609 } nxt_queue_loop;
2610
2575 nxt_thread_mutex_unlock(&app->mutex);
2576
2577 return port;
2578}
2579
2580
2581static void
2611 nxt_thread_mutex_unlock(&app->mutex);
2612
2613 return port;
2614}
2615
2616
2617static void
2582nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
2618nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
2583{
2619{
2584 nxt_app_t *app;
2585 nxt_port_t *port;
2586 nxt_work_t *work;
2587 nxt_queue_link_t *lnk;
2588 nxt_req_app_link_t *ra;
2620 nxt_app_t *app;
2621 nxt_req_app_link_t *ra;
2589
2622
2590 port = obj;
2591 app = data;
2623 app = obj;
2624 ra = data;
2592
2593 nxt_assert(app != NULL);
2625
2626 nxt_assert(app != NULL);
2594 nxt_assert(app == port->app);
2595 nxt_assert(port->app_link.next == NULL);
2627 nxt_assert(ra != NULL);
2628 nxt_assert(ra->app_port != NULL);
2596
2629
2630 nxt_debug(task, "app '%V' %p process next stream #%uD",
2631 &app->name, app, ra->stream);
2597
2632
2598 if (task->thread->engine != port->engine) {
2599 work = &port->work;
2633 nxt_router_process_http_request_mp(task, ra);
2634}
2600
2635
2601 nxt_debug(task, "post release port to engine %p", port->engine);
2602
2636
2603 work->next = NULL;
2604 work->handler = nxt_router_app_release_port;
2605 work->task = &port->engine->task;
2606 work->obj = port;
2607 work->data = app;
2637static void
2638nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2639 uint32_t request_failed, uint32_t got_response)
2640{
2641 int use_delta, ra_use_delta;
2642 nxt_app_t *app;
2643 nxt_queue_link_t *lnk;
2644 nxt_req_app_link_t *ra;
2608
2645
2609 nxt_event_engine_post(port->engine, work);
2646 nxt_assert(port != NULL);
2647 nxt_assert(port->app != NULL);
2610
2648
2611 return;
2649 app = port->app;
2650
2651 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
2652
2653 nxt_thread_mutex_lock(&app->mutex);
2654
2655 port->app_requests -= request_failed;
2656 port->app_responses += got_response;
2657
2658 if (app->live != 0 &&
2659 port->pair[1] != -1 &&
2660 port->app_link.next == NULL &&
2661 (app->max_pending_responses == 0 ||
2662 (port->app_requests - port->app_responses) <
2663 app->max_pending_responses) )
2664 {
2665 nxt_queue_insert_tail(&app->ports, &port->app_link);
2666 use_delta++;
2612 }
2613
2667 }
2668
2614 if (!nxt_queue_is_empty(&app->requests)) {
2669 if (app->live != 0 &&
2670 !nxt_queue_is_empty(&app->ports) &&
2671 !nxt_queue_is_empty(&app->requests))
2672 {
2615 lnk = nxt_queue_first(&app->requests);
2616 nxt_queue_remove(lnk);
2673 lnk = nxt_queue_first(&app->requests);
2674 nxt_queue_remove(lnk);
2675 lnk->next = NULL;
2617
2618 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2619
2676
2677 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2678
2620 nxt_debug(task, "app '%V' %p process next stream #%uD",
2621 &app->name, app, ra->stream);
2679 ra_use_delta = 1;
2680 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
2622
2681
2623 ra->app_port = port;
2624 port->app_stream = ra->stream;
2682 } else {
2683 ra = NULL;
2684 ra_use_delta = 0;
2685 }
2625
2686
2626 nxt_router_process_http_request_mp(task, ra, port);
2687 nxt_thread_mutex_unlock(&app->mutex);
2627
2688
2628 nxt_router_ra_release(task, ra, ra->work.data);
2689 if (ra != NULL) {
2690 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2691 nxt_router_app_process_request,
2692 &task->thread->engine->task, app, ra);
2629
2693
2630 return;
2694 goto adjust_use;
2631 }
2632
2695 }
2696
2633 port->app_stream = 0;
2634
2697 /* ? */
2635 if (port->pair[1] == -1) {
2698 if (port->pair[1] == -1) {
2636 nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
2637 &app->name, app, port->pid);
2699 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
2700 &app->name, app, port, port->pid);
2638
2701
2639 app->workers--;
2640 nxt_router_app_free(task, app);
2641
2642 port->app = NULL;
2643
2644 nxt_port_release(port);
2645
2646 return;
2702 goto adjust_use;
2647 }
2648
2703 }
2704
2649 if (!app->live) {
2705 if (app->live == 0) {
2650 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
2651 &app->name, app);
2652
2653 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2654 -1, 0, 0, NULL);
2655
2706 nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
2707 &app->name, app);
2708
2709 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2710 -1, 0, 0, NULL);
2711
2656 return;
2712 goto adjust_use;
2657 }
2658
2659 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2660 &app->name, app);
2661
2713 }
2714
2715 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2716 &app->name, app);
2717
2662 nxt_thread_mutex_lock(&app->mutex);
2718adjust_use:
2663
2719
2664 nxt_queue_insert_head(&app->ports, &port->app_link);
2720 if (use_delta != 0) {
2721 nxt_port_use(task, port, use_delta);
2722 }
2665
2723
2666 nxt_thread_mutex_unlock(&app->mutex);
2724 if (ra_use_delta != 0) {
2725 nxt_port_use(task, ra->app_port, ra_use_delta);
2726 }
2667}
2668
2669
2727}
2728
2729
2670nxt_bool_t
2671nxt_router_app_remove_port(nxt_port_t *port)
2730void
2731nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
2672{
2673 nxt_app_t *app;
2732{
2733 nxt_app_t *app;
2674 nxt_bool_t busy;
2734 nxt_bool_t unchain, start_worker;
2675
2676 app = port->app;
2735
2736 app = port->app;
2677 busy = port->app_stream != 0;
2678
2737
2679 if (app == NULL) {
2680 nxt_thread_log_debug("port %p app remove, no app", port);
2738 nxt_assert(app != NULL);
2681
2739
2682 nxt_assert(port->app_link.next == NULL);
2683
2684 return 1;
2685 }
2686
2687 nxt_thread_mutex_lock(&app->mutex);
2688
2740 nxt_thread_mutex_lock(&app->mutex);
2741
2689 if (port->app_link.next != NULL) {
2742 unchain = port->app_link.next != NULL;
2690
2743
2744 if (unchain) {
2691 nxt_queue_remove(&port->app_link);
2692 port->app_link.next = NULL;
2745 nxt_queue_remove(&port->app_link);
2746 port->app_link.next = NULL;
2747 }
2693
2748
2749 app->workers--;
2750
2751 start_worker = app->live != 0 &&
2752 nxt_queue_is_empty(&app->requests) == 0 &&
2753 app->workers + app->pending_workers < app->max_workers;
2754
2755 if (start_worker) {
2756 app->pending_workers++;
2694 }
2695
2696 nxt_thread_mutex_unlock(&app->mutex);
2697
2757 }
2758
2759 nxt_thread_mutex_unlock(&app->mutex);
2760
2698 if (busy == 0) {
2699 nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
2700 &app->name, app);
2761 nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
2701
2762
2702 app->workers--;
2703 nxt_router_app_free(&port->engine->task, app);
2704
2705 return 1;
2763 if (unchain) {
2764 nxt_port_use(task, port, -1);
2706 }
2707
2765 }
2766
2708 nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, "
2709 "app stream #%uD", port, &app->name, app,
2710 port->app_stream);
2711
2712 return 0;
2767 if (start_worker) {
2768 nxt_router_start_worker(task, app);
2769 }
2713}
2714
2715
2716static nxt_int_t
2717nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2718{
2770}
2771
2772
2773static nxt_int_t
2774nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2775{
2776 int use_delta;
2777 nxt_int_t res;
2719 nxt_app_t *app;
2778 nxt_app_t *app;
2779 nxt_bool_t can_start_worker;
2720 nxt_conn_t *c;
2721 nxt_port_t *port;
2722 nxt_event_engine_t *engine;
2780 nxt_conn_t *c;
2781 nxt_port_t *port;
2782 nxt_event_engine_t *engine;
2723 nxt_start_worker_t *sw;
2724 nxt_socket_conf_joint_t *joint;
2725
2726 port = NULL;
2783 nxt_socket_conf_joint_t *joint;
2784
2785 port = NULL;
2786 use_delta = 1;
2727 c = ra->rc->conn;
2728
2729 joint = c->listen->socket.data;
2730 app = joint->socket_conf->application;
2731
2732 if (app == NULL) {
2733 nxt_router_gen_error(task, c, 500,
2734 "Application is NULL in socket_conf");
2735 return NXT_ERROR;
2736 }
2737
2787 c = ra->rc->conn;
2788
2789 joint = c->listen->socket.data;
2790 app = joint->socket_conf->application;
2791
2792 if (app == NULL) {
2793 nxt_router_gen_error(task, c, 500,
2794 "Application is NULL in socket_conf");
2795 return NXT_ERROR;
2796 }
2797
2798 ra->rc->app = app;
2799
2800 nxt_router_app_use(task, app, 1);
2801
2738 engine = task->thread->engine;
2739
2740 nxt_timer_disable(engine, &c->read_timer);
2741
2742 if (app->timeout != 0) {
2743 c->read_timer.handler = nxt_router_app_timeout;
2744 nxt_timer_add(engine, &c->read_timer, app->timeout);
2745 }
2746
2802 engine = task->thread->engine;
2803
2804 nxt_timer_disable(engine, &c->read_timer);
2805
2806 if (app->timeout != 0) {
2807 c->read_timer.handler = nxt_router_app_timeout;
2808 nxt_timer_add(engine, &c->read_timer, app->timeout);
2809 }
2810
2747 port = nxt_router_app_get_port(app, ra->stream);
2811 nxt_thread_mutex_lock(&app->mutex);
2748
2812
2813 if (!nxt_queue_is_empty(&app->ports)) {
2814 port = nxt_router_app_get_port_unsafe(app, &use_delta);
2815
2816 can_start_worker = 0;
2817
2818 } else {
2819 nxt_queue_insert_tail(&app->requests, &ra->link);
2820
2821 can_start_worker = (app->workers + app->pending_workers) <
2822 app->max_workers;
2823 if (can_start_worker) {
2824 app->pending_workers++;
2825 }
2826
2827 port = NULL;
2828 }
2829
2830 nxt_thread_mutex_unlock(&app->mutex);
2831
2749 if (port != NULL) {
2832 if (port != NULL) {
2750 nxt_debug(task, "already have port for app '%V'", &app->name);
2833 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
2751
2752 ra->app_port = port;
2834
2835 ra->app_port = port;
2836
2837 if (use_delta != 0) {
2838 nxt_port_use(task, port, use_delta);
2839 }
2753 return NXT_OK;
2754 }
2755
2840 return NXT_OK;
2841 }
2842
2756 sw = nxt_router_sw_create(task, app, ra);
2843 if (!can_start_worker) {
2844 nxt_debug(task, "app '%V' %p too many running or pending workers",
2845 &app->name, app);
2757
2846
2758 if (nxt_slow_path(sw == NULL)) {
2759 nxt_router_gen_error(task, c, 500,
2760 "Failed to allocate start worker struct");
2847 return NXT_AGAIN;
2848 }
2849
2850 res = nxt_router_start_worker(task, app);
2851
2852 if (nxt_slow_path(res != NXT_OK)) {
2853 nxt_router_gen_error(task, c, 500, "Failed to start worker");
2854
2761 return NXT_ERROR;
2762 }
2763
2764 return NXT_AGAIN;
2765}
2766
2767
2768static void

--- 237 unchanged lines hidden (view full) ---

3006
3007 if (nxt_slow_path(port == NULL)) {
3008 nxt_router_gen_error(task, c, 500, "Application port not found");
3009 return;
3010 }
3011
3012 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3013
2855 return NXT_ERROR;
2856 }
2857
2858 return NXT_AGAIN;
2859}
2860
2861
2862static void

--- 237 unchanged lines hidden (view full) ---

3100
3101 if (nxt_slow_path(port == NULL)) {
3102 nxt_router_gen_error(task, c, 500, "Application port not found");
3103 return;
3104 }
3105
3106 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3107
3014 nxt_router_process_http_request_mp(task, ra, port);
3015
3016 nxt_router_ra_release(task, ra, ra->work.data);
3108 nxt_router_process_http_request_mp(task, ra);
3017}
3018
3019
3020static void
3109}
3110
3111
3112static void
3021nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
3022 nxt_port_t *port)
3113nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
3023{
3114{
3115 uint32_t request_failed;
3024 nxt_int_t res;
3116 nxt_int_t res;
3025 nxt_port_t *c_port, *reply_port;
3117 nxt_port_t *port, *c_port, *reply_port;
3026 nxt_conn_t *c;
3027 nxt_app_wmsg_t wmsg;
3028 nxt_app_parse_ctx_t *ap;
3029
3030 /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
3031
3032 nxt_assert(ra->rc != NULL);
3118 nxt_conn_t *c;
3119 nxt_app_wmsg_t wmsg;
3120 nxt_app_parse_ctx_t *ap;
3121
3122 /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
3123
3124 nxt_assert(ra->rc != NULL);
3125 nxt_assert(ra->app_port != NULL);
3033
3126
3127 port = ra->app_port;
3034 reply_port = ra->reply_port;
3035 ap = ra->ap;
3036 c = ra->rc->conn;
3037
3128 reply_port = ra->reply_port;
3129 ap = ra->ap;
3130 c = ra->rc->conn;
3131
3132 request_failed = 1;
3133
3038 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
3039 reply_port->id);
3040 if (nxt_slow_path(c_port != reply_port)) {
3041 res = nxt_port_send_port(task, port, reply_port, 0);
3042
3043 if (nxt_slow_path(res != NXT_OK)) {
3044 nxt_router_gen_error(task, c, 500,
3045 "Failed to send reply port to application");
3134 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
3135 reply_port->id);
3136 if (nxt_slow_path(c_port != reply_port)) {
3137 res = nxt_port_send_port(task, port, reply_port, 0);
3138
3139 if (nxt_slow_path(res != NXT_OK)) {
3140 nxt_router_gen_error(task, c, 500,
3141 "Failed to send reply port to application");
3046 return;
3142 goto release_port;
3047 }
3048
3049 nxt_process_connected_port_add(port->process, reply_port);
3050 }
3051
3052 wmsg.port = port;
3053 wmsg.write = NULL;
3054 wmsg.buf = &wmsg.write;
3055 wmsg.stream = ra->stream;
3056
3057 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3058
3059 if (nxt_slow_path(res != NXT_OK)) {
3060 nxt_router_gen_error(task, c, 500,
3061 "Failed to prepare message for application");
3143 }
3144
3145 nxt_process_connected_port_add(port->process, reply_port);
3146 }
3147
3148 wmsg.port = port;
3149 wmsg.write = NULL;
3150 wmsg.buf = &wmsg.write;
3151 wmsg.stream = ra->stream;
3152
3153 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3154
3155 if (nxt_slow_path(res != NXT_OK)) {
3156 nxt_router_gen_error(task, c, 500,
3157 "Failed to prepare message for application");
3062 return;
3158 goto release_port;
3063 }
3064
3065 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
3066 nxt_buf_used_size(wmsg.write),
3067 wmsg.port->socket.fd);
3068
3159 }
3160
3161 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
3162 nxt_buf_used_size(wmsg.write),
3163 wmsg.port->socket.fd);
3164
3165 request_failed = 0;
3166
3069 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
3070 -1, ra->stream, reply_port->id, wmsg.write);
3071
3072 if (nxt_slow_path(res != NXT_OK)) {
3073 nxt_router_gen_error(task, c, 500,
3074 "Failed to send message to application");
3167 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
3168 -1, ra->stream, reply_port->id, wmsg.write);
3169
3170 if (nxt_slow_path(res != NXT_OK)) {
3171 nxt_router_gen_error(task, c, 500,
3172 "Failed to send message to application");
3075 return;
3173 goto release_port;
3076 }
3174 }
3175
3176release_port:
3177
3178 if (request_failed != 0) {
3179 ra->app_port = 0;
3180 }
3181
3182 nxt_router_app_port_release(task, port, request_failed, 0);
3183
3184 nxt_router_ra_release(task, ra, ra->work.data);
3077}
3078
3079
3080static nxt_int_t
3081nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3082 nxt_app_wmsg_t *wmsg)
3083{
3084 nxt_int_t rc;

--- 362 unchanged lines hidden (view full) ---

3447
3448 c->socket.data = NULL;
3449 }
3450
3451 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3452
3453 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
3454
3185}
3186
3187
3188static nxt_int_t
3189nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3190 nxt_app_wmsg_t *wmsg)
3191{
3192 nxt_int_t rc;

--- 362 unchanged lines hidden (view full) ---

3555
3556 c->socket.data = NULL;
3557 }
3558
3559 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3560
3561 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
3562
3455 if (rc->app_port != NULL) {
3456 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
3563 nxt_router_rc_unlink(task, rc);
3457
3564
3458 rc->app_port = NULL;
3459 }
3460
3461 nxt_router_rc_unlink(rc);
3462
3463 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
3464
3465 } nxt_queue_loop;
3466
3467 nxt_queue_remove(&c->link);
3468
3469 engine = task->thread->engine;
3470

--- 74 unchanged lines hidden ---
3565 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
3566
3567 } nxt_queue_loop;
3568
3569 nxt_queue_remove(&c->link);
3570
3571 engine = task->thread->engine;
3572

--- 74 unchanged lines hidden ---