nxt_router.c (423:449f2a9c5e62) nxt_router.c (424:38b478d79178)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 2505 unchanged lines hidden (view full) ---

2514 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2515
2516 nxt_thread_mutex_destroy(&app->mutex);
2517 nxt_free(app);
2518 }
2519}
2520
2521
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 2505 unchanged lines hidden (view full) ---

2514 nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
2515
2516 nxt_thread_mutex_destroy(&app->mutex);
2517 nxt_free(app);
2518 }
2519}
2520
2521
2522nxt_inline nxt_bool_t
2523nxt_router_app_first_port_busy(nxt_app_t *app)
2524{
2525 nxt_port_t *port;
2526 nxt_queue_link_t *lnk;
2527
2528 lnk = nxt_queue_first(&app->ports);
2529 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2530
2531 return port->app_pending_responses > 0;
2532}
2533
2534
2522nxt_inline nxt_port_t *
2523nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
2524{
2525 nxt_port_t *port;
2526 nxt_queue_link_t *lnk;
2527
2528 lnk = nxt_queue_first(&app->ports);
2529 nxt_queue_remove(lnk);
2530
2531 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2532
2535nxt_inline nxt_port_t *
2536nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
2537{
2538 nxt_port_t *port;
2539 nxt_queue_link_t *lnk;
2540
2541 lnk = nxt_queue_first(&app->ports);
2542 nxt_queue_remove(lnk);
2543
2544 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2545
2533 port->app_requests++;
2546 port->app_pending_responses++;
2534
2547
2535 if (app->live &&
2536 (app->max_pending_responses == 0 ||
2537 (port->app_requests - port->app_responses) <
2538 app->max_pending_responses) )
2548 if (app->max_pending_responses == 0
2549 || port->app_pending_responses < app->max_pending_responses)
2539 {
2540 nxt_queue_insert_tail(&app->ports, lnk);
2541
2542 } else {
2543 lnk->next = NULL;
2544
2545 (*use_delta)--;
2546 }

--- 8 unchanged lines hidden (view full) ---

2555 nxt_port_t *port;
2556
2557 port = NULL;
2558
2559 nxt_thread_mutex_lock(&app->mutex);
2560
2561 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2562
2550 {
2551 nxt_queue_insert_tail(&app->ports, lnk);
2552
2553 } else {
2554 lnk->next = NULL;
2555
2556 (*use_delta)--;
2557 }

--- 8 unchanged lines hidden (view full) ---

2566 nxt_port_t *port;
2567
2568 port = NULL;
2569
2570 nxt_thread_mutex_lock(&app->mutex);
2571
2572 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
2573
2563 if (port->app_requests > port->app_responses) {
2574 if (port->app_pending_responses > 0) {
2564 port = NULL;
2565
2566 continue;
2567 }
2568
2569 nxt_queue_remove(&port->app_link);
2570 port->app_link.next = NULL;
2571

--- 41 unchanged lines hidden (view full) ---

2613 nxt_assert(port->app != NULL);
2614
2615 app = port->app;
2616
2617 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
2618
2619 nxt_thread_mutex_lock(&app->mutex);
2620
2575 port = NULL;
2576
2577 continue;
2578 }
2579
2580 nxt_queue_remove(&port->app_link);
2581 port->app_link.next = NULL;
2582

--- 41 unchanged lines hidden (view full) ---

2624 nxt_assert(port->app != NULL);
2625
2626 app = port->app;
2627
2628 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
2629
2630 nxt_thread_mutex_lock(&app->mutex);
2631
2621 port->app_requests -= request_failed;
2632 port->app_pending_responses -= request_failed + got_response;
2622 port->app_responses += got_response;
2623
2624 if (app->live != 0 &&
2625 port->pair[1] != -1 &&
2633 port->app_responses += got_response;
2634
2635 if (app->live != 0 &&
2636 port->pair[1] != -1 &&
2626 port->app_link.next == NULL &&
2627 (app->max_pending_responses == 0 ||
2628 (port->app_requests - port->app_responses) <
2629 app->max_pending_responses) )
2637 (app->max_pending_responses == 0
2638 || port->app_pending_responses < app->max_pending_responses))
2630 {
2639 {
2631 nxt_queue_insert_tail(&app->ports, &port->app_link);
2632 use_delta++;
2640 if (port->app_link.next == NULL) {
2641 if (port->app_pending_responses > 0) {
2642 nxt_queue_insert_tail(&app->ports, &port->app_link);
2643
2644 } else {
2645 nxt_queue_insert_head(&app->ports, &port->app_link);
2646 }
2647
2648 use_delta++;
2649
2650 } else {
2651 if (port->app_pending_responses == 0
2652 && nxt_queue_first(&app->ports) != &port->app_link)
2653 {
2654 nxt_queue_remove(&port->app_link);
2655 nxt_queue_insert_head(&app->ports, &port->app_link);
2656 }
2657 }
2633 }
2634
2635 if (app->live != 0 &&
2636 !nxt_queue_is_empty(&app->ports) &&
2637 !nxt_queue_is_empty(&app->requests))
2638 {
2639 lnk = nxt_queue_first(&app->requests);
2640 nxt_queue_remove(lnk);

--- 4 unchanged lines hidden (view full) ---

2645 ra_use_delta = 1;
2646 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
2647
2648 } else {
2649 ra = NULL;
2650 ra_use_delta = 0;
2651 }
2652
2658 }
2659
2660 if (app->live != 0 &&
2661 !nxt_queue_is_empty(&app->ports) &&
2662 !nxt_queue_is_empty(&app->requests))
2663 {
2664 lnk = nxt_queue_first(&app->requests);
2665 nxt_queue_remove(lnk);

--- 4 unchanged lines hidden (view full) ---

2670 ra_use_delta = 1;
2671 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
2672
2673 } else {
2674 ra = NULL;
2675 ra_use_delta = 0;
2676 }
2677
2653 send_quit = app->live == 0 && port->app_requests == port->app_responses;
2678 send_quit = app->live == 0 && port->app_pending_responses > 0;
2654
2655 nxt_thread_mutex_unlock(&app->mutex);
2656
2657 if (ra != NULL) {
2658 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2659 nxt_router_app_process_request,
2660 &task->thread->engine->task, app, ra);
2661

--- 83 unchanged lines hidden (view full) ---

2745 nxt_int_t res;
2746 nxt_app_t *app;
2747 nxt_bool_t can_start_worker;
2748 nxt_conn_t *c;
2749 nxt_port_t *port;
2750 nxt_event_engine_t *engine;
2751 nxt_socket_conf_joint_t *joint;
2752
2679
2680 nxt_thread_mutex_unlock(&app->mutex);
2681
2682 if (ra != NULL) {
2683 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2684 nxt_router_app_process_request,
2685 &task->thread->engine->task, app, ra);
2686

--- 83 unchanged lines hidden (view full) ---

2770 nxt_int_t res;
2771 nxt_app_t *app;
2772 nxt_bool_t can_start_worker;
2773 nxt_conn_t *c;
2774 nxt_port_t *port;
2775 nxt_event_engine_t *engine;
2776 nxt_socket_conf_joint_t *joint;
2777
2753 port = NULL;
2754 use_delta = 1;
2755 c = ra->rc->conn;
2756
2757 joint = c->joint;
2758 app = joint->socket_conf->application;
2759
2760 if (app == NULL) {
2761 nxt_router_gen_error(task, c, 500,

--- 9 unchanged lines hidden (view full) ---

2771
2772 nxt_timer_disable(engine, &c->read_timer);
2773
2774 if (app->timeout != 0) {
2775 c->read_timer.handler = nxt_router_app_timeout;
2776 nxt_timer_add(engine, &c->read_timer, app->timeout);
2777 }
2778
2778 use_delta = 1;
2779 c = ra->rc->conn;
2780
2781 joint = c->joint;
2782 app = joint->socket_conf->application;
2783
2784 if (app == NULL) {
2785 nxt_router_gen_error(task, c, 500,

--- 9 unchanged lines hidden (view full) ---

2795
2796 nxt_timer_disable(engine, &c->read_timer);
2797
2798 if (app->timeout != 0) {
2799 c->read_timer.handler = nxt_router_app_timeout;
2800 nxt_timer_add(engine, &c->read_timer, app->timeout);
2801 }
2802
2779 can_start_worker = 0;
2780
2781 nxt_thread_mutex_lock(&app->mutex);
2782
2803 nxt_thread_mutex_lock(&app->mutex);
2804
2783 if (!nxt_queue_is_empty(&app->ports)) {
2784 port = nxt_router_app_get_port_unsafe(app, &use_delta);
2805 can_start_worker = (app->workers + app->pending_workers)
2806 < app->max_workers;
2785
2807
2786 } else {
2808 if (nxt_queue_is_empty(&app->ports)
2809 || (can_start_worker && nxt_router_app_first_port_busy(app)) )
2810 {
2787 ra = nxt_router_ra_create(task, ra);
2788
2789 if (nxt_fast_path(ra != NULL)) {
2790 nxt_queue_insert_tail(&app->requests, &ra->link);
2791
2811 ra = nxt_router_ra_create(task, ra);
2812
2813 if (nxt_fast_path(ra != NULL)) {
2814 nxt_queue_insert_tail(&app->requests, &ra->link);
2815
2792 can_start_worker = (app->workers + app->pending_workers) <
2793 app->max_workers;
2794 if (can_start_worker) {
2795 app->pending_workers++;
2796 }
2797 }
2798
2799 port = NULL;
2816 if (can_start_worker) {
2817 app->pending_workers++;
2818 }
2819 }
2820
2821 port = NULL;
2822
2823 } else {
2824 port = nxt_router_app_get_port_unsafe(app, &use_delta);
2800 }
2801
2802 nxt_thread_mutex_unlock(&app->mutex);
2803
2804 if (nxt_slow_path(ra == NULL)) {
2805 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2806 "req<->app link");
2807 return NXT_ERROR;

--- 899 unchanged lines hidden ---
2825 }
2826
2827 nxt_thread_mutex_unlock(&app->mutex);
2828
2829 if (nxt_slow_path(ra == NULL)) {
2830 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2831 "req<->app link");
2832 return NXT_ERROR;

--- 899 unchanged lines hidden ---