Deleted
Added
nxt_router.c (423:449f2a9c5e62) | nxt_router.c (424:38b478d79178) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 2505 unchanged lines hidden (view full) --- 2514 nxt_assert(nxt_queue_is_empty(&app->ports) != 0); 2515 2516 nxt_thread_mutex_destroy(&app->mutex); 2517 nxt_free(app); 2518 } 2519} 2520 2521 | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 2505 unchanged lines hidden (view full) --- 2514 nxt_assert(nxt_queue_is_empty(&app->ports) != 0); 2515 2516 nxt_thread_mutex_destroy(&app->mutex); 2517 nxt_free(app); 2518 } 2519} 2520 2521 |
2522nxt_inline nxt_bool_t 2523nxt_router_app_first_port_busy(nxt_app_t *app) 2524{ 2525 nxt_port_t *port; 2526 nxt_queue_link_t *lnk; 2527 2528 lnk = nxt_queue_first(&app->ports); 2529 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2530 2531 return port->app_pending_responses > 0; 2532} 2533 2534 |
|
2522nxt_inline nxt_port_t * 2523nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) 2524{ 2525 nxt_port_t *port; 2526 nxt_queue_link_t *lnk; 2527 2528 lnk = nxt_queue_first(&app->ports); 2529 nxt_queue_remove(lnk); 2530 2531 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2532 | 2535nxt_inline nxt_port_t * 2536nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) 2537{ 2538 nxt_port_t *port; 2539 nxt_queue_link_t *lnk; 2540 2541 lnk = nxt_queue_first(&app->ports); 2542 nxt_queue_remove(lnk); 2543 2544 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2545 |
2533 port->app_requests++; | 2546 port->app_pending_responses++; |
2534 | 2547 |
2535 if (app->live && 2536 (app->max_pending_responses == 0 || 2537 (port->app_requests - port->app_responses) < 2538 app->max_pending_responses) ) | 2548 if (app->max_pending_responses == 0 2549 || port->app_pending_responses < app->max_pending_responses) |
2539 { 2540 nxt_queue_insert_tail(&app->ports, lnk); 2541 2542 } else { 2543 lnk->next = NULL; 2544 2545 (*use_delta)--; 2546 } --- 8 unchanged lines hidden (view full) --- 2555 nxt_port_t *port; 2556 2557 port = NULL; 2558 2559 nxt_thread_mutex_lock(&app->mutex); 2560 2561 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 2562 | 2550 { 2551 nxt_queue_insert_tail(&app->ports, lnk); 2552 2553 } else { 2554 lnk->next = NULL; 2555 2556 (*use_delta)--; 2557 } --- 8 unchanged lines hidden (view full) --- 2566 nxt_port_t *port; 2567 2568 port = NULL; 2569 2570 nxt_thread_mutex_lock(&app->mutex); 2571 2572 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { 2573 |
2563 if (port->app_requests > port->app_responses) { | 2574 if (port->app_pending_responses > 0) { |
2564 port = NULL; 2565 2566 continue; 2567 } 2568 2569 nxt_queue_remove(&port->app_link); 2570 port->app_link.next = NULL; 2571 --- 41 unchanged lines hidden (view full) --- 2613 nxt_assert(port->app != NULL); 2614 2615 app = port->app; 2616 2617 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; 2618 2619 nxt_thread_mutex_lock(&app->mutex); 2620 | 2575 port = NULL; 2576 2577 continue; 2578 } 2579 2580 nxt_queue_remove(&port->app_link); 2581 port->app_link.next = NULL; 2582 --- 41 unchanged lines hidden (view full) --- 2624 nxt_assert(port->app != NULL); 2625 2626 app = port->app; 2627 2628 use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; 2629 2630 nxt_thread_mutex_lock(&app->mutex); 2631 |
2621 port->app_requests -= request_failed; | 2632 port->app_pending_responses -= request_failed + got_response; |
2622 port->app_responses += got_response; 2623 2624 if (app->live != 0 && 2625 port->pair[1] != -1 && | 2633 port->app_responses += got_response; 2634 2635 if (app->live != 0 && 2636 port->pair[1] != -1 && |
2626 port->app_link.next == NULL && 2627 (app->max_pending_responses == 0 || 2628 (port->app_requests - port->app_responses) < 2629 app->max_pending_responses) ) | 2637 (app->max_pending_responses == 0 2638 || port->app_pending_responses < app->max_pending_responses)) |
2630 { | 2639 { |
2631 nxt_queue_insert_tail(&app->ports, &port->app_link); 2632 use_delta++; | 2640 if (port->app_link.next == NULL) { 2641 if (port->app_pending_responses > 0) { 2642 nxt_queue_insert_tail(&app->ports, &port->app_link); 2643 2644 } else { 2645 nxt_queue_insert_head(&app->ports, &port->app_link); 2646 } 2647 2648 use_delta++; 2649 2650 } else { 2651 if (port->app_pending_responses == 0 2652 && nxt_queue_first(&app->ports) != &port->app_link) 2653 { 2654 nxt_queue_remove(&port->app_link); 2655 nxt_queue_insert_head(&app->ports, &port->app_link); 2656 } 2657 } |
2633 } 2634 2635 if (app->live != 0 && 2636 !nxt_queue_is_empty(&app->ports) && 2637 !nxt_queue_is_empty(&app->requests)) 2638 { 2639 lnk = nxt_queue_first(&app->requests); 2640 nxt_queue_remove(lnk); --- 4 unchanged lines hidden (view full) --- 2645 ra_use_delta = 1; 2646 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); 2647 2648 } else { 2649 ra = NULL; 2650 ra_use_delta = 0; 2651 } 2652 | 2658 } 2659 2660 if (app->live != 0 && 2661 !nxt_queue_is_empty(&app->ports) && 2662 !nxt_queue_is_empty(&app->requests)) 2663 { 2664 lnk = nxt_queue_first(&app->requests); 2665 nxt_queue_remove(lnk); --- 4 unchanged lines hidden (view full) --- 2670 ra_use_delta = 1; 2671 ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); 2672 2673 } else { 2674 ra = NULL; 2675 ra_use_delta = 0; 2676 } 2677 |
2653 send_quit = app->live == 0 && port->app_requests == port->app_responses; | 2678 send_quit = app->live == 0 && port->app_pending_responses > 0; |
2654 2655 nxt_thread_mutex_unlock(&app->mutex); 2656 2657 if (ra != NULL) { 2658 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2659 nxt_router_app_process_request, 2660 &task->thread->engine->task, app, ra); 2661 --- 83 unchanged lines hidden (view full) --- 2745 nxt_int_t res; 2746 nxt_app_t *app; 2747 nxt_bool_t can_start_worker; 2748 nxt_conn_t *c; 2749 nxt_port_t *port; 2750 nxt_event_engine_t *engine; 2751 nxt_socket_conf_joint_t *joint; 2752 | 2679 2680 nxt_thread_mutex_unlock(&app->mutex); 2681 2682 if (ra != NULL) { 2683 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2684 nxt_router_app_process_request, 2685 &task->thread->engine->task, app, ra); 2686 --- 83 unchanged lines hidden (view full) --- 2770 nxt_int_t res; 2771 nxt_app_t *app; 2772 nxt_bool_t can_start_worker; 2773 nxt_conn_t *c; 2774 nxt_port_t *port; 2775 nxt_event_engine_t *engine; 2776 nxt_socket_conf_joint_t *joint; 2777 |
2753 port = NULL; | |
2754 use_delta = 1; 2755 c = ra->rc->conn; 2756 2757 joint = c->joint; 2758 app = joint->socket_conf->application; 2759 2760 if (app == NULL) { 2761 nxt_router_gen_error(task, c, 500, --- 9 unchanged lines hidden (view full) --- 2771 2772 nxt_timer_disable(engine, &c->read_timer); 2773 2774 if (app->timeout != 0) { 2775 c->read_timer.handler = nxt_router_app_timeout; 2776 nxt_timer_add(engine, &c->read_timer, app->timeout); 2777 } 2778 | 2778 use_delta = 1; 2779 c = ra->rc->conn; 2780 2781 joint = c->joint; 2782 app = joint->socket_conf->application; 2783 2784 if (app == NULL) { 2785 nxt_router_gen_error(task, c, 500, --- 9 unchanged lines hidden (view full) --- 2795 2796 nxt_timer_disable(engine, &c->read_timer); 2797 2798 if (app->timeout != 0) { 2799 c->read_timer.handler = nxt_router_app_timeout; 2800 nxt_timer_add(engine, &c->read_timer, app->timeout); 2801 } 2802 |
2779 can_start_worker = 0; 2780 | |
2781 nxt_thread_mutex_lock(&app->mutex); 2782 | 2803 nxt_thread_mutex_lock(&app->mutex); 2804 |
2783 if (!nxt_queue_is_empty(&app->ports)) { 2784 port = nxt_router_app_get_port_unsafe(app, &use_delta); | 2805 can_start_worker = (app->workers + app->pending_workers) 2806 < app->max_workers; |
2785 | 2807 |
2786 } else { | 2808 if (nxt_queue_is_empty(&app->ports) 2809 || (can_start_worker && nxt_router_app_first_port_busy(app)) ) 2810 { |
2787 ra = nxt_router_ra_create(task, ra); 2788 2789 if (nxt_fast_path(ra != NULL)) { 2790 nxt_queue_insert_tail(&app->requests, &ra->link); 2791 | 2811 ra = nxt_router_ra_create(task, ra); 2812 2813 if (nxt_fast_path(ra != NULL)) { 2814 nxt_queue_insert_tail(&app->requests, &ra->link); 2815 |
2792 can_start_worker = (app->workers + app->pending_workers) < 2793 app->max_workers; | |
2794 if (can_start_worker) { 2795 app->pending_workers++; 2796 } 2797 } 2798 2799 port = NULL; | 2816 if (can_start_worker) { 2817 app->pending_workers++; 2818 } 2819 } 2820 2821 port = NULL; |
2822 2823 } else { 2824 port = nxt_router_app_get_port_unsafe(app, &use_delta); |
|
2800 } 2801 2802 nxt_thread_mutex_unlock(&app->mutex); 2803 2804 if (nxt_slow_path(ra == NULL)) { 2805 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2806 "req<->app link"); 2807 return NXT_ERROR; --- 899 unchanged lines hidden --- | 2825 } 2826 2827 nxt_thread_mutex_unlock(&app->mutex); 2828 2829 if (nxt_slow_path(ra == NULL)) { 2830 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2831 "req<->app link"); 2832 return NXT_ERROR; --- 899 unchanged lines hidden --- |