nxt_router.c (424:38b478d79178) nxt_router.c (425:1da949cf0a34)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 34 unchanged lines hidden (view full) ---

43 nxt_req_app_link_t *ra;
44
45 nxt_queue_link_t link; /* for nxt_conn_t.requests */
46} nxt_req_conn_link_t;
47
48
49struct nxt_req_app_link_s {
50 uint32_t stream;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 34 unchanged lines hidden (view full) ---

43 nxt_req_app_link_t *ra;
44
45 nxt_queue_link_t link; /* for nxt_conn_t.requests */
46} nxt_req_conn_link_t;
47
48
49struct nxt_req_app_link_s {
50 uint32_t stream;
51 nxt_atomic_t use_count;
51 nxt_port_t *app_port;
52 nxt_port_t *app_port;
52 nxt_pid_t app_pid;
53 nxt_port_t *reply_port;
54 nxt_app_parse_ctx_t *ap;
55 nxt_msg_info_t msg_info;
56 nxt_req_conn_link_t *rc;
57
53 nxt_port_t *reply_port;
54 nxt_app_parse_ctx_t *ap;
55 nxt_msg_info_t msg_info;
56 nxt_req_conn_link_t *rc;
57
58 nxt_queue_link_t link; /* for nxt_app_t.requests */
58 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
59 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
59
60 nxt_mp_t *mem_pool;
61 nxt_work_t work;
62
63 int err_code;
64 const char *err_str;
65};
66
67
68typedef struct {
69 nxt_socket_conf_t *socket_conf;
70 nxt_router_temp_conf_t *temp_conf;
71} nxt_socket_rpc_t;
72
73
74static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
75
60
61 nxt_mp_t *mem_pool;
62 nxt_work_t work;
63
64 int err_code;
65 const char *err_str;
66};
67
68
69typedef struct {
70 nxt_socket_conf_t *socket_conf;
71 nxt_router_temp_conf_t *temp_conf;
72} nxt_socket_rpc_t;
73
74
75static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
76
77nxt_inline void
78nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
79{
80 nxt_atomic_fetch_add(&ra->use_count, 1);
81}
82
83nxt_inline void
84nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
85{
86 int c;
87
88 c = nxt_atomic_fetch_add(&ra->use_count, -1);
89
90 nxt_assert(c > 1);
91}
92
93static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
94
76static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
77static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
78static void nxt_router_conf_ready(nxt_task_t *task,
79 nxt_router_temp_conf_t *tmcf);
80static void nxt_router_conf_error(nxt_task_t *task,
81 nxt_router_temp_conf_t *tmcf);
82static void nxt_router_conf_send(nxt_task_t *task,
83 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 64 unchanged lines hidden (view full) ---

148static void nxt_router_app_port_ready(nxt_task_t *task,
149 nxt_port_recv_msg_t *msg, void *data);
150static void nxt_router_app_port_error(nxt_task_t *task,
151 nxt_port_recv_msg_t *msg, void *data);
152
153static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
154static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
155 uint32_t request_failed, uint32_t got_response);
95static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
96static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
97static void nxt_router_conf_ready(nxt_task_t *task,
98 nxt_router_temp_conf_t *tmcf);
99static void nxt_router_conf_error(nxt_task_t *task,
100 nxt_router_temp_conf_t *tmcf);
101static void nxt_router_conf_send(nxt_task_t *task,
102 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 64 unchanged lines hidden (view full) ---

167static void nxt_router_app_port_ready(nxt_task_t *task,
168 nxt_port_recv_msg_t *msg, void *data);
169static void nxt_router_app_port_error(nxt_task_t *task,
170 nxt_port_recv_msg_t *msg, void *data);
171
172static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
173static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
174 uint32_t request_failed, uint32_t got_response);
175static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
156
157static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
158static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
159 void *data);
160static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
161static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
162 void *data);
163static void nxt_router_process_http_request(nxt_task_t *task,
164 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
176
177static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
178static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
179 void *data);
180static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
181static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
182 void *data);
183static void nxt_router_process_http_request(nxt_task_t *task,
184 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
165static void nxt_router_process_http_request_mp(nxt_task_t *task,
185static void nxt_router_app_prepare_request(nxt_task_t *task,
166 nxt_req_app_link_t *ra);
167static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
168 nxt_app_wmsg_t *wmsg);
169static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
170 nxt_app_wmsg_t *wmsg);
171static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
172 nxt_app_wmsg_t *wmsg);
173static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);

--- 140 unchanged lines hidden (view full) ---

314{
315 nxt_event_engine_t *engine;
316
317 engine = task->thread->engine;
318
319 nxt_memzero(ra, sizeof(nxt_req_app_link_t));
320
321 ra->stream = rc->stream;
186 nxt_req_app_link_t *ra);
187static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
188 nxt_app_wmsg_t *wmsg);
189static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
190 nxt_app_wmsg_t *wmsg);
191static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
192 nxt_app_wmsg_t *wmsg);
193static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);

--- 140 unchanged lines hidden (view full) ---

334{
335 nxt_event_engine_t *engine;
336
337 engine = task->thread->engine;
338
339 nxt_memzero(ra, sizeof(nxt_req_app_link_t));
340
341 ra->stream = rc->stream;
322 ra->app_pid = -1;
342 ra->use_count = 1;
323 ra->rc = rc;
324 rc->ra = ra;
325 ra->reply_port = engine->port;
326 ra->ap = rc->ap;
327
328 ra->work.handler = NULL;
329 ra->work.task = &engine->task;
330 ra->work.obj = ra;
331 ra->work.data = engine;
332}
333
334
335nxt_inline nxt_req_app_link_t *
336nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
337{
338 nxt_mp_t *mp;
339 nxt_req_app_link_t *ra;
340
343 ra->rc = rc;
344 rc->ra = ra;
345 ra->reply_port = engine->port;
346 ra->ap = rc->ap;
347
348 ra->work.handler = NULL;
349 ra->work.task = &engine->task;
350 ra->work.obj = ra;
351 ra->work.data = engine;
352}
353
354
355nxt_inline nxt_req_app_link_t *
356nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
357{
358 nxt_mp_t *mp;
359 nxt_req_app_link_t *ra;
360
361 if (ra_src->mem_pool != NULL) {
362 return ra_src;
363 }
364
341 mp = ra_src->ap->mem_pool;
342
343 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
344
345 if (nxt_slow_path(ra == NULL)) {
346
347 ra_src->rc->ra = NULL;
348 ra_src->rc = NULL;

--- 40 unchanged lines hidden (view full) ---

389
390 msg_info->buf = NULL;
391
392 return cancelled;
393}
394
395
396static void
365 mp = ra_src->ap->mem_pool;
366
367 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
368
369 if (nxt_slow_path(ra == NULL)) {
370
371 ra_src->rc->ra = NULL;
372 ra_src->rc = NULL;

--- 40 unchanged lines hidden (view full) ---

413
414 msg_info->buf = NULL;
415
416 return cancelled;
417}
418
419
420static void
397nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
421nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
422
423
424static void
425nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
398{
426{
399 nxt_req_app_link_t *ra;
427 nxt_req_app_link_t *ra;
428
429 ra = obj;
430
431 nxt_router_ra_update_peer(task, ra);
432
433 nxt_router_ra_use(task, ra, -1);
434}
435
436
437static void
438nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
439{
400 nxt_event_engine_t *engine;
401 nxt_req_conn_link_t *rc;
402
440 nxt_event_engine_t *engine;
441 nxt_req_conn_link_t *rc;
442
403 ra = obj;
404 engine = data;
443 engine = ra->work.data;
405
406 if (task->thread->engine != engine) {
444
445 if (task->thread->engine != engine) {
407 if (ra->app_port != NULL) {
408 ra->app_pid = ra->app_port->pid;
409 }
446 nxt_router_ra_inc_use(ra);
410
447
411 ra->work.handler = nxt_router_ra_release;
448 ra->work.handler = nxt_router_ra_update_peer_handler;
412 ra->work.task = &engine->task;
413 ra->work.next = NULL;
414
449 ra->work.task = &engine->task;
450 ra->work.next = NULL;
451
415 nxt_debug(task, "ra stream #%uD post release to %p",
452 nxt_debug(task, "ra stream #%uD post update peer to %p",
416 ra->stream, engine);
417
418 nxt_event_engine_post(engine, &ra->work);
419
420 return;
421 }
422
453 ra->stream, engine);
454
455 nxt_event_engine_post(engine, &ra->work);
456
457 return;
458 }
459
460 nxt_debug(task, "ra stream #%uD update peer", ra->stream);
461
462 rc = ra->rc;
463
464 if (rc != NULL && ra->app_port != NULL) {
465 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
466 }
467
468 nxt_router_ra_use(task, ra, -1);
469}
470
471
472static void
473nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
474{
475 nxt_conn_t *c;
476 nxt_req_conn_link_t *rc;
477
478 nxt_assert(task->thread->engine == ra->work.data);
479 nxt_assert(ra->use_count == 0);
480
423 nxt_debug(task, "ra stream #%uD release", ra->stream);
424
425 rc = ra->rc;
426
427 if (rc != NULL) {
481 nxt_debug(task, "ra stream #%uD release", ra->stream);
482
483 rc = ra->rc;
484
485 if (rc != NULL) {
428 if (ra->app_pid != -1) {
429 nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
430 }
486 c = rc->conn;
431
432 if (nxt_slow_path(ra->err_code != 0)) {
487
488 if (nxt_slow_path(ra->err_code != 0)) {
433 nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str);
489 nxt_router_gen_error(task, c, ra->err_code, ra->err_str);
434
435 } else {
436 rc->app_port = ra->app_port;
437 rc->msg_info = ra->msg_info;
438
490
491 } else {
492 rc->app_port = ra->app_port;
493 rc->msg_info = ra->msg_info;
494
495 if (rc->app->timeout != 0) {
496 c->read_timer.handler = nxt_router_app_timeout;
497 nxt_timer_add(task->thread->engine, &c->read_timer,
498 rc->app->timeout);
499 }
500
439 ra->app_port = NULL;
440 ra->msg_info.buf = NULL;
441 }
442
443 rc->ra = NULL;
444 ra->rc = NULL;
445 }
446

--- 6 unchanged lines hidden (view full) ---

453 nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
454
455 if (ra->mem_pool != NULL) {
456 nxt_mp_release(ra->mem_pool, ra);
457 }
458}
459
460
501 ra->app_port = NULL;
502 ra->msg_info.buf = NULL;
503 }
504
505 rc->ra = NULL;
506 ra->rc = NULL;
507 }
508

--- 6 unchanged lines hidden (view full) ---

515 nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
516
517 if (ra->mem_pool != NULL) {
518 nxt_mp_release(ra->mem_pool, ra);
519 }
520}
521
522
523static void
524nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
525{
526 nxt_req_app_link_t *ra;
527
528 ra = obj;
529
530 nxt_assert(ra->work.data == data);
531
532 nxt_atomic_fetch_add(&ra->use_count, -1);
533
534 nxt_router_ra_release(task, ra);
535}
536
537
538static void
539nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
540{
541 int c;
542 nxt_event_engine_t *engine;
543
544 c = nxt_atomic_fetch_add(&ra->use_count, i);
545
546 if (i < 0 && c == -i) {
547 engine = ra->work.data;
548
549 if (task->thread->engine == engine) {
550 nxt_router_ra_release(task, ra);
551
552 return;
553 }
554
555 nxt_router_ra_inc_use(ra);
556
557 ra->work.handler = nxt_router_ra_release_handler;
558 ra->work.task = &engine->task;
559 ra->work.next = NULL;
560
561 nxt_debug(task, "ra stream #%uD post release to %p",
562 ra->stream, engine);
563
564 nxt_event_engine_post(engine, &ra->work);
565 }
566}
567
568
461nxt_inline void
462nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
463{
464 ra->app_port = NULL;
465 ra->err_code = code;
466 ra->err_str = str;
467}
468
469
569nxt_inline void
570nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
571{
572 ra->app_port = NULL;
573 ra->err_code = code;
574 ra->err_str = str;
575}
576
577
578nxt_inline nxt_bool_t
579nxt_queue_chk_remove(nxt_queue_link_t *lnk)
580{
581 if (lnk->next != NULL) {
582 nxt_queue_remove(lnk);
583
584 lnk->next = NULL;
585
586 return 1;
587 }
588
589 return 0;
590}
591
592
470nxt_inline void
471nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
472{
593nxt_inline void
594nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
595{
596 int ra_use_delta;
473 nxt_req_app_link_t *ra;
474
475 if (rc->app_port != NULL) {
476 nxt_router_app_port_release(task, rc->app_port, 0, 1);
477
478 rc->app_port = NULL;
479 }
480
481 nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
482
483 ra = rc->ra;
484
485 if (ra != NULL) {
486 rc->ra = NULL;
487 ra->rc = NULL;
488
597 nxt_req_app_link_t *ra;
598
599 if (rc->app_port != NULL) {
600 nxt_router_app_port_release(task, rc->app_port, 0, 1);
601
602 rc->app_port = NULL;
603 }
604
605 nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
606
607 ra = rc->ra;
608
609 if (ra != NULL) {
610 rc->ra = NULL;
611 ra->rc = NULL;
612
613 ra_use_delta = 0;
614
489 nxt_thread_mutex_lock(&rc->app->mutex);
490
615 nxt_thread_mutex_lock(&rc->app->mutex);
616
491 if (ra->link.next != NULL) {
492 nxt_queue_remove(&ra->link);
617 if (ra->link_app_requests.next == NULL
618 && ra->link_port_pending.next == NULL)
619 {
620 ra = NULL;
493
621
494 ra->link.next = NULL;
495
496 } else {
622 } else {
497 ra = NULL;
623 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
624 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
498 }
499
500 nxt_thread_mutex_unlock(&rc->app->mutex);
625 }
626
627 nxt_thread_mutex_unlock(&rc->app->mutex);
501 }
502
628
503 if (ra != NULL) {
504 nxt_router_ra_release(task, ra, ra->work.data);
629 if (ra != NULL) {
630 nxt_router_ra_use(task, ra, ra_use_delta);
631 }
505 }
506
507 if (rc->app != NULL) {
508 nxt_router_app_use(task, rc->app, -1);
509
510 rc->app = NULL;
511 }
512

--- 1748 unchanged lines hidden (view full) ---

2261
2262static void
2263nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2264 void *data)
2265{
2266 size_t dump_size;
2267 nxt_buf_t *b, *last;
2268 nxt_conn_t *c;
632 }
633
634 if (rc->app != NULL) {
635 nxt_router_app_use(task, rc->app, -1);
636
637 rc->app = NULL;
638 }
639

--- 1748 unchanged lines hidden (view full) ---

2388
2389static void
2390nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2391 void *data)
2392{
2393 size_t dump_size;
2394 nxt_buf_t *b, *last;
2395 nxt_conn_t *c;
2396 nxt_event_engine_t *engine;
2269 nxt_req_conn_link_t *rc;
2270
2271 b = msg->buf;
2272 rc = data;
2273
2274 c = rc->conn;
2275
2276 dump_size = nxt_buf_used_size(b);

--- 5 unchanged lines hidden (view full) ---

2282 nxt_debug(task, "%srouter app data (%z): %*s",
2283 msg->port_msg.last ? "last " : "", msg->size, dump_size,
2284 b->mem.pos);
2285
2286 if (msg->size == 0) {
2287 b = NULL;
2288 }
2289
2397 nxt_req_conn_link_t *rc;
2398
2399 b = msg->buf;
2400 rc = data;
2401
2402 c = rc->conn;
2403
2404 dump_size = nxt_buf_used_size(b);

--- 5 unchanged lines hidden (view full) ---

2410 nxt_debug(task, "%srouter app data (%z): %*s",
2411 msg->port_msg.last ? "last " : "", msg->size, dump_size,
2412 b->mem.pos);
2413
2414 if (msg->size == 0) {
2415 b = NULL;
2416 }
2417
2418 engine = task->thread->engine;
2419
2420 nxt_timer_disable(engine, &c->read_timer);
2421
2290 if (msg->port_msg.last != 0) {
2291 nxt_debug(task, "router data create last buf");
2292
2293 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2294 if (nxt_slow_path(last == NULL)) {
2295 /* TODO pogorevaTb */
2296 }
2297
2298 nxt_buf_chain_add(&b, last);
2299
2300 nxt_router_rc_unlink(task, rc);
2422 if (msg->port_msg.last != 0) {
2423 nxt_debug(task, "router data create last buf");
2424
2425 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
2426 if (nxt_slow_path(last == NULL)) {
2427 /* TODO pogorevaTb */
2428 }
2429
2430 nxt_buf_chain_add(&b, last);
2431
2432 nxt_router_rc_unlink(task, rc);
2433
2434 } else {
2435 if (rc->app->timeout != 0) {
2436 c->read_timer.handler = nxt_router_app_timeout;
2437 nxt_timer_add(engine, &c->read_timer, rc->app->timeout);
2438 }
2301 }
2302
2303 if (b == NULL) {
2304 return;
2305 }
2306
2307 if (msg->buf == b) {
2308 /* Disable instant buffer completion/re-using by port. */

--- 13 unchanged lines hidden (view full) ---

2322 }
2323}
2324
2325
2326static void
2327nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2328 void *data)
2329{
2439 }
2440
2441 if (b == NULL) {
2442 return;
2443 }
2444
2445 if (msg->buf == b) {
2446 /* Disable instant buffer completion/re-using by port. */

--- 13 unchanged lines hidden (view full) ---

2460 }
2461}
2462
2463
2464static void
2465nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2466 void *data)
2467{
2468 nxt_int_t res;
2469 nxt_port_t *port;
2470 nxt_bool_t cancelled;
2471 nxt_req_app_link_t *ra;
2330 nxt_req_conn_link_t *rc;
2331
2332 rc = data;
2333
2472 nxt_req_conn_link_t *rc;
2473
2474 rc = data;
2475
2476 ra = rc->ra;
2477
2478 if (ra != NULL) {
2479 cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
2480
2481 if (cancelled) {
2482 nxt_router_ra_inc_use(ra);
2483
2484 res = nxt_router_app_port(task, ra);
2485
2486 if (res == NXT_OK) {
2487 port = ra->app_port;
2488
2489 nxt_assert(port != NULL);
2490
2491 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
2492 port->pid);
2493
2494 nxt_router_app_prepare_request(task, ra);
2495 }
2496
2497 msg->port_msg.last = 0;
2498
2499 return;
2500 }
2501 }
2502
2334 nxt_router_gen_error(task, rc->conn, 500,
2335 "Application terminated unexpectedly");
2336
2337 nxt_router_rc_unlink(task, rc);
2338}
2339
2340
2341nxt_inline const char *

--- 131 unchanged lines hidden (view full) ---

2473
2474 app->pending_workers--;
2475
2476 if (!nxt_queue_is_empty(&app->requests)) {
2477 lnk = nxt_queue_last(&app->requests);
2478 nxt_queue_remove(lnk);
2479 lnk->next = NULL;
2480
2503 nxt_router_gen_error(task, rc->conn, 500,
2504 "Application terminated unexpectedly");
2505
2506 nxt_router_rc_unlink(task, rc);
2507}
2508
2509
2510nxt_inline const char *

--- 131 unchanged lines hidden (view full) ---

2642
2643 app->pending_workers--;
2644
2645 if (!nxt_queue_is_empty(&app->requests)) {
2646 lnk = nxt_queue_last(&app->requests);
2647 nxt_queue_remove(lnk);
2648 lnk->next = NULL;
2649
2481 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2650 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2482
2483 } else {
2484 ra = NULL;
2485 }
2486
2487 nxt_thread_mutex_unlock(&app->mutex);
2488
2489 if (ra != NULL) {
2490 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2491 &app->name, app, ra->stream);
2492
2493 nxt_router_ra_error(ra, 500, "Failed to start application worker");
2651
2652 } else {
2653 ra = NULL;
2654 }
2655
2656 nxt_thread_mutex_unlock(&app->mutex);
2657
2658 if (ra != NULL) {
2659 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2660 &app->name, app, ra->stream);
2661
2662 nxt_router_ra_error(ra, 500, "Failed to start application worker");
2494 nxt_router_ra_release(task, ra, ra->work.data);
2663 nxt_router_ra_use(task, ra, -1);
2495 }
2496
2497 nxt_router_app_use(task, app, -1);
2498}
2499
2500
2501void
2502nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)

--- 25 unchanged lines hidden (view full) ---

2528 lnk = nxt_queue_first(&app->ports);
2529 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2530
2531 return port->app_pending_responses > 0;
2532}
2533
2534
2535nxt_inline nxt_port_t *
2664 }
2665
2666 nxt_router_app_use(task, app, -1);
2667}
2668
2669
2670void
2671nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)

--- 25 unchanged lines hidden (view full) ---

2697 lnk = nxt_queue_first(&app->ports);
2698 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2699
2700 return port->app_pending_responses > 0;
2701}
2702
2703
2704nxt_inline nxt_port_t *
2536nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
2705nxt_router_app_get_port_unsafe(nxt_app_t *app)
2537{
2538 nxt_port_t *port;
2539 nxt_queue_link_t *lnk;
2540
2541 lnk = nxt_queue_first(&app->ports);
2542 nxt_queue_remove(lnk);
2543
2544 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2545
2546 port->app_pending_responses++;
2547
2548 if (app->max_pending_responses == 0
2549 || port->app_pending_responses < app->max_pending_responses)
2550 {
2551 nxt_queue_insert_tail(&app->ports, lnk);
2552
2706{
2707 nxt_port_t *port;
2708 nxt_queue_link_t *lnk;
2709
2710 lnk = nxt_queue_first(&app->ports);
2711 nxt_queue_remove(lnk);
2712
2713 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2714
2715 port->app_pending_responses++;
2716
2717 if (app->max_pending_responses == 0
2718 || port->app_pending_responses < app->max_pending_responses)
2719 {
2720 nxt_queue_insert_tail(&app->ports, lnk);
2721
2722 nxt_port_inc_use(port);
2723
2553 } else {
2554 lnk->next = NULL;
2724 } else {
2725 lnk->next = NULL;
2555
2556 (*use_delta)--;
2557 }
2558
2559 return port;
2560}
2561
2562
2563static nxt_port_t *
2564nxt_router_app_get_idle_port(nxt_app_t *app)

--- 36 unchanged lines hidden (view full) ---

2601
2602 nxt_assert(app != NULL);
2603 nxt_assert(ra != NULL);
2604 nxt_assert(ra->app_port != NULL);
2605
2606 nxt_debug(task, "app '%V' %p process next stream #%uD",
2607 &app->name, app, ra->stream);
2608
2726 }
2727
2728 return port;
2729}
2730
2731
2732static nxt_port_t *
2733nxt_router_app_get_idle_port(nxt_app_t *app)

--- 36 unchanged lines hidden (view full) ---

2770
2771 nxt_assert(app != NULL);
2772 nxt_assert(ra != NULL);
2773 nxt_assert(ra->app_port != NULL);
2774
2775 nxt_debug(task, "app '%V' %p process next stream #%uD",
2776 &app->name, app, ra->stream);
2777
2609 nxt_router_process_http_request_mp(task, ra);
2778 nxt_router_app_prepare_request(task, ra);
2610}
2611
2612
2613static void
2614nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2615 uint32_t request_failed, uint32_t got_response)
2616{
2779}
2780
2781
2782static void
2783nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2784 uint32_t request_failed, uint32_t got_response)
2785{
2617 int use_delta, ra_use_delta;
2618 nxt_app_t *app;
2619 nxt_bool_t send_quit;
2620 nxt_queue_link_t *lnk;
2786 nxt_app_t *app;
2787 nxt_bool_t send_quit;
2788 nxt_queue_link_t *lnk;
2621 nxt_req_app_link_t *ra;
2789 nxt_req_app_link_t *ra, *next_ra;
2622
2623 nxt_assert(port != NULL);
2624 nxt_assert(port->app != NULL);
2625
2626 app = port->app;
2627
2790
2791 nxt_assert(port != NULL);
2792 nxt_assert(port->app != NULL);
2793
2794 app = port->app;
2795
2628 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
2629
2630 nxt_thread_mutex_lock(&app->mutex);
2631
2632 port->app_pending_responses -= request_failed + got_response;
2633 port->app_responses += got_response;
2634
2635 if (app->live != 0 &&
2636 port->pair[1] != -1 &&
2637 (app->max_pending_responses == 0
2638 || port->app_pending_responses < app->max_pending_responses))
2639 {
2640 if (port->app_link.next == NULL) {
2641 if (port->app_pending_responses > 0) {
2642 nxt_queue_insert_tail(&app->ports, &port->app_link);
2643
2644 } else {
2645 nxt_queue_insert_head(&app->ports, &port->app_link);
2646 }
2647
2796 nxt_thread_mutex_lock(&app->mutex);
2797
2798 port->app_pending_responses -= request_failed + got_response;
2799 port->app_responses += got_response;
2800
2801 if (app->live != 0 &&
2802 port->pair[1] != -1 &&
2803 (app->max_pending_responses == 0
2804 || port->app_pending_responses < app->max_pending_responses))
2805 {
2806 if (port->app_link.next == NULL) {
2807 if (port->app_pending_responses > 0) {
2808 nxt_queue_insert_tail(&app->ports, &port->app_link);
2809
2810 } else {
2811 nxt_queue_insert_head(&app->ports, &port->app_link);
2812 }
2813
2648 use_delta++;
2814 nxt_port_inc_use(port);
2649
2650 } else {
2651 if (port->app_pending_responses == 0
2652 && nxt_queue_first(&app->ports) != &port->app_link)
2653 {
2654 nxt_queue_remove(&port->app_link);
2655 nxt_queue_insert_head(&app->ports, &port->app_link);
2656 }
2657 }
2658 }
2659
2660 if (app->live != 0 &&
2661 !nxt_queue_is_empty(&app->ports) &&
2662 !nxt_queue_is_empty(&app->requests))
2663 {
2664 lnk = nxt_queue_first(&app->requests);
2665 nxt_queue_remove(lnk);
2666 lnk->next = NULL;
2667
2815
2816 } else {
2817 if (port->app_pending_responses == 0
2818 && nxt_queue_first(&app->ports) != &port->app_link)
2819 {
2820 nxt_queue_remove(&port->app_link);
2821 nxt_queue_insert_head(&app->ports, &port->app_link);
2822 }
2823 }
2824 }
2825
2826 if (app->live != 0 &&
2827 !nxt_queue_is_empty(&app->ports) &&
2828 !nxt_queue_is_empty(&app->requests))
2829 {
2830 lnk = nxt_queue_first(&app->requests);
2831 nxt_queue_remove(lnk);
2832 lnk->next = NULL;
2833
2668 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2834 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2669
2835
2670 ra_use_delta = 1;
2671 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
2836 ra->app_port = nxt_router_app_get_port_unsafe(app);
2672
2837
2838 if (ra->app_port->app_pending_responses > 1) {
2839 nxt_queue_insert_tail(&ra->app_port->pending_requests,
2840 &ra->link_port_pending);
2841
2842 nxt_router_ra_inc_use(ra);
2843 }
2844
2673 } else {
2674 ra = NULL;
2845 } else {
2846 ra = NULL;
2675 ra_use_delta = 0;
2676 }
2677
2847 }
2848
2849 if ((request_failed > 0 || got_response > 0)
2850 && !nxt_queue_is_empty(&port->pending_requests))
2851 {
2852 lnk = nxt_queue_first(&port->pending_requests);
2853 nxt_queue_remove(lnk);
2854 lnk->next = NULL;
2855
2856 next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
2857 link_port_pending);
2858
2859 } else {
2860 next_ra = NULL;
2861 }
2862
2678 send_quit = app->live == 0 && port->app_pending_responses > 0;
2679
2680 nxt_thread_mutex_unlock(&app->mutex);
2681
2863 send_quit = app->live == 0 && port->app_pending_responses > 0;
2864
2865 nxt_thread_mutex_unlock(&app->mutex);
2866
2867 if (next_ra != NULL) {
2868 nxt_router_ra_use(task, next_ra, -1);
2869 }
2870
2682 if (ra != NULL) {
2871 if (ra != NULL) {
2872 nxt_router_ra_use(task, ra, -1);
2873
2683 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2684 nxt_router_app_process_request,
2685 &task->thread->engine->task, app, ra);
2686
2687 goto adjust_use;
2688 }
2689
2690 /* ? */

--- 14 unchanged lines hidden (view full) ---

2705 goto adjust_use;
2706 }
2707
2708 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2709 &app->name, app);
2710
2711adjust_use:
2712
2874 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2875 nxt_router_app_process_request,
2876 &task->thread->engine->task, app, ra);
2877
2878 goto adjust_use;
2879 }
2880
2881 /* ? */

--- 14 unchanged lines hidden (view full) ---

2896 goto adjust_use;
2897 }
2898
2899 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2900 &app->name, app);
2901
2902adjust_use:
2903
2713 if (use_delta != 0) {
2714 nxt_port_use(task, port, use_delta);
2904 if (request_failed > 0 || got_response > 0) {
2905 nxt_port_use(task, port, -1);
2715 }
2906 }
2716
2717 if (ra_use_delta != 0) {
2718 nxt_port_use(task, ra->app_port, ra_use_delta);
2719 }
2720}
2721
2722
2723void
2724nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
2725{
2726 nxt_app_t *app;
2727 nxt_bool_t unchain, start_worker;

--- 33 unchanged lines hidden (view full) ---

2761 nxt_router_start_worker(task, app);
2762 }
2763}
2764
2765
2766static nxt_int_t
2767nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2768{
2907}
2908
2909
2910void
2911nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
2912{
2913 nxt_app_t *app;
2914 nxt_bool_t unchain, start_worker;

--- 33 unchanged lines hidden (view full) ---

2948 nxt_router_start_worker(task, app);
2949 }
2950}
2951
2952
2953static nxt_int_t
2954nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2955{
2769 int use_delta;
2770 nxt_int_t res;
2771 nxt_app_t *app;
2772 nxt_bool_t can_start_worker;
2773 nxt_conn_t *c;
2774 nxt_port_t *port;
2775 nxt_event_engine_t *engine;
2776 nxt_socket_conf_joint_t *joint;
2956 int failed_port_use_delta;
2957 nxt_int_t res;
2958 nxt_app_t *app;
2959 nxt_bool_t can_start_worker;
2960 nxt_conn_t *c;
2961 nxt_port_t *port, *failed_port;
2777
2962
2778 use_delta = 1;
2779 c = ra->rc->conn;
2963 c = ra->rc->conn;
2964 app = ra->rc->app;
2780
2965
2781 joint = c->joint;
2782 app = joint->socket_conf->application;
2966 failed_port_use_delta = 0;
2783
2967
2784 if (app == NULL) {
2785 nxt_router_gen_error(task, c, 500,
2786 "Application is NULL in socket_conf");
2787 return NXT_ERROR;
2968 nxt_thread_mutex_lock(&app->mutex);
2969
2970 if (nxt_queue_chk_remove(&ra->link_app_requests))
2971 {
2972 nxt_router_ra_dec_use(ra);
2788 }
2789
2973 }
2974
2790 ra->rc->app = app;
2975 if (nxt_queue_chk_remove(&ra->link_port_pending))
2976 {
2977 nxt_router_ra_dec_use(ra);
2978 }
2791
2979
2792 nxt_router_app_use(task, app, 1);
2980 if (ra->app_port != NULL) {
2981 failed_port = ra->app_port;
2982 failed_port_use_delta--;
2793
2983
2794 engine = task->thread->engine;
2984 failed_port->app_pending_responses--;
2795
2985
2796 nxt_timer_disable(engine, &c->read_timer);
2986 if (failed_port->app_link.next != NULL) {
2987 nxt_queue_remove(&failed_port->app_link);
2988 failed_port->app_link.next = NULL;
2797
2989
2798 if (app->timeout != 0) {
2799 c->read_timer.handler = nxt_router_app_timeout;
2800 nxt_timer_add(engine, &c->read_timer, app->timeout);
2990 failed_port_use_delta--;
2991 }
2992
2993 } else {
2994 failed_port = NULL;
2801 }
2802
2995 }
2996
2803 nxt_thread_mutex_lock(&app->mutex);
2804
2805 can_start_worker = (app->workers + app->pending_workers)
2806 < app->max_workers;
2807
2808 if (nxt_queue_is_empty(&app->ports)
2809 || (can_start_worker && nxt_router_app_first_port_busy(app)) )
2810 {
2811 ra = nxt_router_ra_create(task, ra);
2812
2813 if (nxt_fast_path(ra != NULL)) {
2997 can_start_worker = (app->workers + app->pending_workers)
2998 < app->max_workers;
2999
3000 if (nxt_queue_is_empty(&app->ports)
3001 || (can_start_worker && nxt_router_app_first_port_busy(app)) )
3002 {
3003 ra = nxt_router_ra_create(task, ra);
3004
3005 if (nxt_fast_path(ra != NULL)) {
2814 nxt_queue_insert_tail(&app->requests, &ra->link);
3006 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
2815
3007
3008 nxt_router_ra_inc_use(ra);
3009
3010 nxt_debug(task, "ra stream #%uD enqueue to app->requests",
3011 ra->stream);
3012
2816 if (can_start_worker) {
2817 app->pending_workers++;
2818 }
2819 }
2820
2821 port = NULL;
2822
2823 } else {
3013 if (can_start_worker) {
3014 app->pending_workers++;
3015 }
3016 }
3017
3018 port = NULL;
3019
3020 } else {
2824 port = nxt_router_app_get_port_unsafe(app, &use_delta);
3021 port = nxt_router_app_get_port_unsafe(app);
3022
3023 if (port->app_pending_responses > 1) {
3024 ra = nxt_router_ra_create(task, ra);
3025
3026 if (nxt_fast_path(ra != NULL)) {
3027 nxt_queue_insert_tail(&port->pending_requests,
3028 &ra->link_port_pending);
3029
3030 nxt_router_ra_inc_use(ra);
3031
3032 nxt_debug(task, "ra stream #%uD enqueue to "
3033 "port->pending_requests", ra->stream);
3034 }
3035 }
2825 }
2826
2827 nxt_thread_mutex_unlock(&app->mutex);
2828
3036 }
3037
3038 nxt_thread_mutex_unlock(&app->mutex);
3039
3040 if (failed_port_use_delta != 0) {
3041 nxt_port_use(task, failed_port, failed_port_use_delta);
3042 }
3043
2829 if (nxt_slow_path(ra == NULL)) {
2830 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2831 "req<->app link");
3044 if (nxt_slow_path(ra == NULL)) {
3045 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3046 "req<->app link");
3047
3048 if (port != NULL) {
3049 nxt_port_use(task, port, -1);
3050 }
3051
2832 return NXT_ERROR;
2833 }
2834
2835 if (port != NULL) {
2836 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
2837
2838 ra->app_port = port;
2839
3052 return NXT_ERROR;
3053 }
3054
3055 if (port != NULL) {
3056 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
3057
3058 ra->app_port = port;
3059
2840 if (use_delta != 0) {
2841 nxt_port_use(task, port, use_delta);
2842 }
2843 return NXT_OK;
2844 }
2845
3060 return NXT_OK;
3061 }
3062
2846 nxt_debug(task, "ra stream #%uD allocated", ra->stream);
2847
2848 if (!can_start_worker) {
2849 nxt_debug(task, "app '%V' %p too many running or pending workers",
2850 &app->name, app);
2851
2852 return NXT_AGAIN;
2853 }
2854
2855 res = nxt_router_start_worker(task, app);

--- 266 unchanged lines hidden (view full) ---

3122 nxt_conn_read(task->thread->engine, c);
3123}
3124
3125
3126static void
3127nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
3128 nxt_app_parse_ctx_t *ap)
3129{
3063 if (!can_start_worker) {
3064 nxt_debug(task, "app '%V' %p too many running or pending workers",
3065 &app->name, app);
3066
3067 return NXT_AGAIN;
3068 }
3069
3070 res = nxt_router_start_worker(task, app);

--- 266 unchanged lines hidden (view full) ---

3337 nxt_conn_read(task->thread->engine, c);
3338}
3339
3340
3341static void
3342nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
3343 nxt_app_parse_ctx_t *ap)
3344{
3130 nxt_int_t res;
3131 nxt_port_t *port;
3132 nxt_event_engine_t *engine;
3133 nxt_req_app_link_t ra_local, *ra;
3134 nxt_req_conn_link_t *rc;
3345 nxt_int_t res;
3346 nxt_app_t *app;
3347 nxt_port_t *port;
3348 nxt_event_engine_t *engine;
3349 nxt_req_app_link_t ra_local, *ra;
3350 nxt_req_conn_link_t *rc;
3351 nxt_socket_conf_joint_t *joint;
3135
3352
3353 joint = c->joint;
3354 app = joint->socket_conf->application;
3355
3356 if (app == NULL) {
3357 nxt_router_gen_error(task, c, 500,
3358 "Application is NULL in socket_conf");
3359 return;
3360 }
3361
3136 engine = task->thread->engine;
3137
3138 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3139 nxt_router_response_ready_handler,
3140 nxt_router_response_error_handler,
3141 sizeof(nxt_req_conn_link_t));
3142
3143 if (nxt_slow_path(rc == NULL)) {
3144 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3145 "req<->conn link");
3146
3147 return;
3148 }
3149
3150 rc->stream = nxt_port_rpc_ex_stream(rc);
3151 rc->conn = c;
3362 engine = task->thread->engine;
3363
3364 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
3365 nxt_router_response_ready_handler,
3366 nxt_router_response_error_handler,
3367 sizeof(nxt_req_conn_link_t));
3368
3369 if (nxt_slow_path(rc == NULL)) {
3370 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3371 "req<->conn link");
3372
3373 return;
3374 }
3375
3376 rc->stream = nxt_port_rpc_ex_stream(rc);
3377 rc->conn = c;
3378 rc->app = app;
3152
3379
3380 nxt_router_app_use(task, app, 1);
3381
3382 nxt_timer_disable(engine, &c->read_timer);
3383
3153 nxt_queue_insert_tail(&c->requests, &rc->link);
3154
3155 nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
3156 rc->stream, c, engine);
3157
3158 rc->ap = ap;
3159 c->socket.data = NULL;
3160
3161 ra = &ra_local;
3162 nxt_router_ra_init(task, ra, rc);
3163
3164 res = nxt_router_app_port(task, ra);
3165
3166 if (res != NXT_OK) {
3167 return;
3168 }
3169
3384 nxt_queue_insert_tail(&c->requests, &rc->link);
3385
3386 nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
3387 rc->stream, c, engine);
3388
3389 rc->ap = ap;
3390 c->socket.data = NULL;
3391
3392 ra = &ra_local;
3393 nxt_router_ra_init(task, ra, rc);
3394
3395 res = nxt_router_app_port(task, ra);
3396
3397 if (res != NXT_OK) {
3398 return;
3399 }
3400
3401 ra = rc->ra;
3170 port = ra->app_port;
3171
3402 port = ra->app_port;
3403
3172 if (nxt_slow_path(port == NULL)) {
3173 nxt_router_gen_error(task, c, 500, "Application port not found");
3174 return;
3175 }
3404 nxt_assert(port != NULL);
3176
3177 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3178
3405
3406 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3407
3179 nxt_router_process_http_request_mp(task, ra);
3408 nxt_router_app_prepare_request(task, ra);
3180}
3181
3182
3183static void
3184nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
3185{
3186}
3187
3188
3189static void
3409}
3410
3411
3412static void
3413nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
3414{
3415}
3416
3417
3418static void
3190nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
3419nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
3191{
3192 uint32_t request_failed;
3193 nxt_buf_t *b;
3194 nxt_int_t res;
3195 nxt_port_t *port, *c_port, *reply_port;
3196 nxt_app_wmsg_t wmsg;
3197 nxt_app_parse_ctx_t *ap;
3198

--- 62 unchanged lines hidden (view full) ---

3261 "Failed to send message to application");
3262 goto release_port;
3263 }
3264
3265release_port:
3266
3267 nxt_router_app_port_release(task, port, request_failed, 0);
3268
3420{
3421 uint32_t request_failed;
3422 nxt_buf_t *b;
3423 nxt_int_t res;
3424 nxt_port_t *port, *c_port, *reply_port;
3425 nxt_app_wmsg_t wmsg;
3426 nxt_app_parse_ctx_t *ap;
3427

--- 62 unchanged lines hidden (view full) ---

3490 "Failed to send message to application");
3491 goto release_port;
3492 }
3493
3494release_port:
3495
3496 nxt_router_app_port_release(task, port, request_failed, 0);
3497
3269 nxt_router_ra_release(task, ra, ra->work.data);
3498 nxt_router_ra_update_peer(task, ra);
3270}
3271
3272
3273static nxt_int_t
3274nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3275 nxt_app_wmsg_t *wmsg)
3276{
3277 nxt_int_t rc;

--- 454 unchanged lines hidden ---
3499}
3500
3501
3502static nxt_int_t
3503nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
3504 nxt_app_wmsg_t *wmsg)
3505{
3506 nxt_int_t rc;

--- 454 unchanged lines hidden ---