Deleted
Added
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10 11 12typedef struct { 13 nxt_str_t type; 14 uint32_t workers; 15 nxt_msec_t timeout; |
16 nxt_msec_t res_timeout; |
17 uint32_t requests; 18 nxt_conf_value_t *limits_value; 19} nxt_router_app_conf_t; 20 21 22typedef struct { 23 nxt_str_t application; 24} nxt_router_listener_conf_t; --- 26 unchanged lines hidden (view full) --- 51 uint32_t stream; 52 nxt_atomic_t use_count; 53 nxt_port_t *app_port; 54 nxt_port_t *reply_port; 55 nxt_app_parse_ctx_t *ap; 56 nxt_msg_info_t msg_info; 57 nxt_req_conn_link_t *rc; 58 |
59 nxt_nsec_t res_time; 60 |
61 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ 62 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ |
63 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ |
64 65 nxt_mp_t *mem_pool; 66 nxt_work_t work; 67 68 int err_code; 69 const char *err_str; 70}; 71 72 73typedef struct { 74 nxt_socket_conf_t *socket_conf; 75 nxt_router_temp_conf_t *temp_conf; 76} nxt_socket_rpc_t; 77 78 |
79struct nxt_port_select_state_s { 80 nxt_app_t *app; 81 nxt_req_app_link_t *ra; 82 83 nxt_port_t *failed_port; 84 int failed_port_use_delta; 85 86 nxt_bool_t can_start_worker; 87 nxt_req_app_link_t *shared_ra; 88 nxt_port_t *port; 89}; 90 91typedef struct nxt_port_select_state_s nxt_port_select_state_t; 92 93static void nxt_router_port_select(nxt_task_t *task, 94 nxt_port_select_state_t *state); 95 96static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, 97 nxt_port_select_state_t *state); 98 |
99static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); 100 101nxt_inline void 102nxt_router_ra_inc_use(nxt_req_app_link_t *ra) 103{ 104 nxt_atomic_fetch_add(&ra->use_count, 1); 105} 106 --- 84 unchanged lines hidden (view full) --- 191static void nxt_router_app_port_ready(nxt_task_t *task, 192 nxt_port_recv_msg_t *msg, void *data); 193static void nxt_router_app_port_error(nxt_task_t *task, 194 nxt_port_recv_msg_t *msg, void *data); 195 196static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); 197static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 198 uint32_t request_failed, uint32_t got_response); |
199static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, 200 nxt_req_app_link_t *ra); |
201 202static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 203static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 204 void *data); 205static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c); 206static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, 207 void *data); 208static void nxt_router_process_http_request(nxt_task_t *task, --- 386 unchanged lines hidden (view full) --- 595nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) 596{ 597 ra->app_port = NULL; 598 ra->err_code = code; 599 ra->err_str = str; 600} 601 602 |
603nxt_inline void 604nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 605{ 606 nxt_queue_insert_tail(&ra->app_port->pending_requests, 607 &ra->link_port_pending); 608 nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); 609 610 nxt_router_ra_inc_use(ra); 611 612 ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; 613 614 nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); 615} 616 617 |
618nxt_inline nxt_bool_t 619nxt_queue_chk_remove(nxt_queue_link_t *lnk) 620{ 621 if (lnk->next != NULL) { 622 nxt_queue_remove(lnk); 623 624 lnk->next = NULL; 625 --- 24 unchanged lines hidden (view full) --- 650 rc->ra = NULL; 651 ra->rc = NULL; 652 653 ra_use_delta = 0; 654 655 nxt_thread_mutex_lock(&rc->app->mutex); 656 657 if (ra->link_app_requests.next == NULL |
658 && ra->link_port_pending.next == NULL 659 && ra->link_app_pending.next == NULL) |
660 { 661 ra = NULL; 662 663 } else { 664 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); 665 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); |
666 nxt_queue_chk_remove(&ra->link_app_pending); |
667 } 668 669 nxt_thread_mutex_unlock(&rc->app->mutex); 670 671 if (ra != NULL) { 672 nxt_router_ra_use(task, ra, ra_use_delta); 673 } 674 } --- 336 unchanged lines hidden (view full) --- 1011static nxt_conf_map_t nxt_router_app_limits_conf[] = { 1012 { 1013 nxt_string("timeout"), 1014 NXT_CONF_MAP_MSEC, 1015 offsetof(nxt_router_app_conf_t, timeout), 1016 }, 1017 1018 { |
1019 nxt_string("reschedule_timeout"), 1020 NXT_CONF_MAP_MSEC, 1021 offsetof(nxt_router_app_conf_t, res_timeout), 1022 }, 1023 1024 { |
1025 nxt_string("requests"), 1026 NXT_CONF_MAP_INT32, 1027 offsetof(nxt_router_app_conf_t, requests), 1028 }, 1029}; 1030 1031 1032static nxt_conf_map_t nxt_router_listener_conf[] = { --- 137 unchanged lines hidden (view full) --- 1170 1171 nxt_queue_remove(&prev->link); 1172 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 1173 continue; 1174 } 1175 1176 apcf.workers = 1; 1177 apcf.timeout = 0; |
1178 apcf.res_timeout = 1000; |
1179 apcf.requests = 0; 1180 apcf.limits_value = NULL; 1181 1182 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 1183 nxt_nitems(nxt_router_app_conf), &apcf); 1184 if (ret != NXT_OK) { 1185 nxt_log(task, NXT_LOG_CRIT, "application map error"); 1186 goto app_fail; --- 13 unchanged lines hidden (view full) --- 1200 if (ret != NXT_OK) { 1201 nxt_log(task, NXT_LOG_CRIT, "application limits map error"); 1202 goto app_fail; 1203 } 1204 } 1205 1206 nxt_debug(task, "application type: %V", &apcf.type); 1207 nxt_debug(task, "application workers: %D", apcf.workers); |
1208 nxt_debug(task, "application request timeout: %D", apcf.timeout); 1209 nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout); |
1210 nxt_debug(task, "application requests: %D", apcf.requests); 1211 1212 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 1213 1214 if (lang == NULL) { 1215 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 1216 &apcf.type); 1217 goto app_fail; 1218 } 1219 1220 nxt_debug(task, "application language module: \"%s\"", lang->file); 1221 1222 ret = nxt_thread_mutex_create(&app->mutex); 1223 if (ret != NXT_OK) { 1224 goto app_fail; 1225 } 1226 1227 nxt_queue_init(&app->ports); 1228 nxt_queue_init(&app->requests); |
1229 nxt_queue_init(&app->pending); |
1230 1231 app->name.length = name.length; 1232 nxt_memcpy(app->name.start, name.start, name.length); 1233 1234 app->type = lang->type; 1235 app->max_workers = apcf.workers; 1236 app->timeout = apcf.timeout; |
1237 app->res_timeout = apcf.res_timeout * 1000000; |
1238 app->live = 1; 1239 app->max_pending_responses = 2; 1240 app->prepare_msg = nxt_app_prepare_msg[lang->type]; 1241 1242 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1243 1244 nxt_router_app_use(task, app, 1); 1245 } --- 1282 unchanged lines hidden (view full) --- 2528 ra = rc->ra; 2529 2530 if (ra != NULL) { 2531 cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); 2532 2533 if (cancelled) { 2534 nxt_router_ra_inc_use(ra); 2535 |
2536 res = nxt_router_app_port(task, rc->app, ra); |
2537 2538 if (res == NXT_OK) { 2539 port = ra->app_port; 2540 2541 nxt_assert(port != NULL); 2542 2543 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, 2544 port->pid); --- 204 unchanged lines hidden (view full) --- 2749 lnk = nxt_queue_first(&app->ports); 2750 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2751 2752 return port->app_pending_responses > 0; 2753} 2754 2755 2756nxt_inline nxt_port_t * |
2757nxt_router_pop_first_port(nxt_app_t *app) |
2758{ 2759 nxt_port_t *port; 2760 nxt_queue_link_t *lnk; 2761 2762 lnk = nxt_queue_first(&app->ports); 2763 nxt_queue_remove(lnk); 2764 2765 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); --- 64 unchanged lines hidden (view full) --- 2830 nxt_router_app_prepare_request(task, ra); 2831} 2832 2833 2834static void 2835nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 2836 uint32_t request_failed, uint32_t got_response) 2837{ |
2838 nxt_app_t *app; 2839 nxt_bool_t send_quit, cancelled; 2840 nxt_queue_link_t *lnk; 2841 nxt_req_app_link_t *ra, *pending_ra, *re_ra; 2842 nxt_port_select_state_t state; |
2843 2844 nxt_assert(port != NULL); 2845 nxt_assert(port->app != NULL); 2846 |
2847 ra = NULL; 2848 |
2849 app = port->app; 2850 2851 nxt_thread_mutex_lock(&app->mutex); 2852 2853 port->app_pending_responses -= request_failed + got_response; 2854 port->app_responses += got_response; 2855 |
2856 if (nxt_slow_path(app->live == 0)) { 2857 goto app_dead; 2858 } 2859 2860 if (port->pair[1] != -1 |
2861 && (app->max_pending_responses == 0 2862 || port->app_pending_responses < app->max_pending_responses)) 2863 { 2864 if (port->app_link.next == NULL) { 2865 if (port->app_pending_responses > 0) { 2866 nxt_queue_insert_tail(&app->ports, &port->app_link); 2867 2868 } else { --- 7 unchanged lines hidden (view full) --- 2876 && nxt_queue_first(&app->ports) != &port->app_link) 2877 { 2878 nxt_queue_remove(&port->app_link); 2879 nxt_queue_insert_head(&app->ports, &port->app_link); 2880 } 2881 } 2882 } 2883 |
2884 if (!nxt_queue_is_empty(&app->ports) |
2885 && !nxt_queue_is_empty(&app->requests)) 2886 { 2887 lnk = nxt_queue_first(&app->requests); 2888 nxt_queue_remove(lnk); 2889 lnk->next = NULL; 2890 2891 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); 2892 |
2893 ra->app_port = nxt_router_pop_first_port(app); |
2894 2895 if (ra->app_port->app_pending_responses > 1) { |
2896 nxt_router_ra_pending(task, app, ra); |
2897 } |
2898 } 2899 |
2900app_dead: 2901 2902 /* Pop first pending request for this port. */ |
2903 if ((request_failed > 0 || got_response > 0) 2904 && !nxt_queue_is_empty(&port->pending_requests)) 2905 { 2906 lnk = nxt_queue_first(&port->pending_requests); 2907 nxt_queue_remove(lnk); 2908 lnk->next = NULL; 2909 |
2910 pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, 2911 link_port_pending); |
2912 |
2913 nxt_assert(pending_ra->link_app_pending.next != NULL); 2914 2915 nxt_queue_remove(&pending_ra->link_app_pending); 2916 pending_ra->link_app_pending.next = NULL; 2917 |
2918 } else { |
2919 pending_ra = NULL; |
2920 } 2921 |
2922 /* Try to cancel and re-schedule first stalled request for this app. */ 2923 if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { 2924 lnk = nxt_queue_first(&app->pending); 2925 2926 re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); 2927 2928 if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { 2929 2930 nxt_debug(task, "app '%V' stalled request #%uD detected", 2931 &app->name, re_ra->stream); 2932 2933 cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, 2934 re_ra->stream); 2935 2936 if (cancelled) { 2937 nxt_router_ra_inc_use(re_ra); 2938 2939 state.ra = re_ra; 2940 state.app = app; 2941 2942 nxt_router_port_select(task, &state); 2943 2944 goto re_ra_cancelled; 2945 } 2946 } 2947 } 2948 2949 re_ra = NULL; 2950 2951re_ra_cancelled: 2952 |
2953 send_quit = app->live == 0 && port->app_pending_responses > 0; 2954 2955 nxt_thread_mutex_unlock(&app->mutex); 2956 |
2957 if (pending_ra != NULL) { 2958 nxt_router_ra_use(task, pending_ra, -1); |
2959 } 2960 |
2961 if (re_ra != NULL) { 2962 if (nxt_router_port_post_select(task, &state) == NXT_OK) { 2963 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2964 nxt_router_app_process_request, 2965 &task->thread->engine->task, app, re_ra); 2966 } 2967 } 2968 |
2969 if (ra != NULL) { 2970 nxt_router_ra_use(task, ra, -1); 2971 2972 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2973 nxt_router_app_process_request, 2974 &task->thread->engine->task, app, ra); 2975 2976 goto adjust_use; --- 66 unchanged lines hidden (view full) --- 3043 } 3044 3045 if (start_worker) { 3046 nxt_router_start_worker(task, app); 3047 } 3048} 3049 3050 |
3051static void 3052nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) |
3053{ |
3054 nxt_app_t *app; 3055 nxt_req_app_link_t *ra; |
3056 |
3057 ra = state->ra; 3058 app = state->app; |
3059 |
3060 state->failed_port_use_delta = 0; |
3061 |
3062 if (nxt_queue_chk_remove(&ra->link_app_requests)) 3063 { 3064 nxt_router_ra_dec_use(ra); 3065 } 3066 3067 if (nxt_queue_chk_remove(&ra->link_port_pending)) 3068 { |
3069 nxt_assert(ra->link_app_pending.next != NULL); 3070 3071 nxt_queue_remove(&ra->link_app_pending); 3072 ra->link_app_pending.next = NULL; 3073 |
3074 nxt_router_ra_dec_use(ra); 3075 } 3076 |
3077 state->failed_port = ra->app_port; 3078 |
3079 if (ra->app_port != NULL) { |
3080 state->failed_port_use_delta--; |
3081 |
3082 state->failed_port->app_pending_responses--; |
3083 |
3084 if (nxt_queue_chk_remove(&state->failed_port->app_link)) { 3085 state->failed_port_use_delta--; |
3086 } 3087 |
3088 ra->app_port = NULL; |
3089 } 3090 |
3091 state->can_start_worker = (app->workers + app->pending_workers) 3092 < app->max_workers; 3093 state->port = NULL; |
3094 3095 if (nxt_queue_is_empty(&app->ports) |
3096 || (state->can_start_worker && nxt_router_app_first_port_busy(app)) ) |
3097 { 3098 ra = nxt_router_ra_create(task, ra); 3099 |
3100 if (nxt_slow_path(ra == NULL)) { 3101 goto fail; 3102 } 3103 3104 if (nxt_slow_path(state->failed_port != NULL)) { 3105 nxt_queue_insert_head(&app->requests, &ra->link_app_requests); 3106 3107 } else { |
3108 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); |
3109 } |
3110 |
3111 nxt_router_ra_inc_use(ra); |
3112 |
3113 nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); |
3114 |
3115 if (state->can_start_worker) { 3116 app->pending_workers++; |
3117 } 3118 |
3119 } else { |
3120 state->port = nxt_router_pop_first_port(app); |
3121 |
3122 if (state->port->app_pending_responses > 1) { |
3123 ra = nxt_router_ra_create(task, ra); 3124 |
3125 if (nxt_slow_path(ra == NULL)) { 3126 goto fail; 3127 } |
3128 |
3129 ra->app_port = state->port; |
3130 |
3131 nxt_router_ra_pending(task, app, ra); |
3132 } 3133 } 3134 |
3135fail: |
3136 |
3137 state->shared_ra = ra; 3138} 3139 3140 3141static nxt_int_t 3142nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) 3143{ 3144 nxt_int_t res; 3145 nxt_app_t *app; 3146 nxt_req_app_link_t *ra; 3147 3148 ra = state->shared_ra; 3149 app = state->app; 3150 3151 if (state->failed_port_use_delta != 0) { 3152 nxt_port_use(task, state->failed_port, state->failed_port_use_delta); |
3153 } 3154 3155 if (nxt_slow_path(ra == NULL)) { |
3156 if (state->port != NULL) { 3157 nxt_port_use(task, state->port, -1); |
3158 } 3159 |
3160 nxt_router_ra_error(state->ra, 500, 3161 "Failed to allocate shared req<->app link"); 3162 nxt_router_ra_use(task, state->ra, -1); 3163 |
3164 return NXT_ERROR; 3165 } 3166 |
3167 if (state->port != NULL) { |
3168 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); 3169 |
3170 ra->app_port = state->port; |
3171 3172 return NXT_OK; 3173 } 3174 |
3175 if (!state->can_start_worker) { |
3176 nxt_debug(task, "app '%V' %p too many running or pending workers", 3177 &app->name, app); 3178 3179 return NXT_AGAIN; 3180 } 3181 3182 res = nxt_router_start_worker(task, app); 3183 3184 if (nxt_slow_path(res != NXT_OK)) { |
3185 nxt_router_ra_error(ra, 500, "Failed to start worker"); 3186 nxt_router_ra_use(task, ra, -1); |
3187 3188 return NXT_ERROR; 3189 } 3190 3191 return NXT_AGAIN; 3192} 3193 3194 |
3195static nxt_int_t 3196nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) 3197{ 3198 nxt_port_select_state_t state; 3199 3200 state.ra = ra; 3201 state.app = app; 3202 3203 nxt_thread_mutex_lock(&app->mutex); 3204 3205 nxt_router_port_select(task, &state); 3206 3207 nxt_thread_mutex_unlock(&app->mutex); 3208 3209 return nxt_router_port_post_select(task, &state); 3210} 3211 3212 |
3213static void 3214nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) 3215{ 3216 size_t size; 3217 nxt_int_t ret; 3218 nxt_buf_t *buf; 3219 nxt_conn_t *c; 3220 nxt_sockaddr_t *local; --- 297 unchanged lines hidden (view full) --- 3518 rc->stream, c, engine); 3519 3520 rc->ap = ap; 3521 c->socket.data = NULL; 3522 3523 ra = &ra_local; 3524 nxt_router_ra_init(task, ra, rc); 3525 |
3526 res = nxt_router_app_port(task, app, ra); |
3527 3528 if (res != NXT_OK) { 3529 return; 3530 } 3531 3532 ra = rc->ra; 3533 port = ra->app_port; 3534 --- 557 unchanged lines hidden --- |