1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10
11
12typedef struct {
13 nxt_str_t type;
14 uint32_t workers;
15 nxt_msec_t timeout;
16 nxt_msec_t res_timeout;
17 uint32_t requests;
18 nxt_conf_value_t *limits_value;
19} nxt_router_app_conf_t;
20
21
22typedef struct {
23 nxt_str_t application;
24} nxt_router_listener_conf_t;

--- 26 unchanged lines hidden (view full) ---

51 uint32_t stream;
52 nxt_atomic_t use_count;
53 nxt_port_t *app_port;
54 nxt_port_t *reply_port;
55 nxt_app_parse_ctx_t *ap;
56 nxt_msg_info_t msg_info;
57 nxt_req_conn_link_t *rc;
58
59 nxt_nsec_t res_time;
60
61 nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
62 nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
63 nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
64
65 nxt_mp_t *mem_pool;
66 nxt_work_t work;
67
68 int err_code;
69 const char *err_str;
70};
71
72
73typedef struct {
74 nxt_socket_conf_t *socket_conf;
75 nxt_router_temp_conf_t *temp_conf;
76} nxt_socket_rpc_t;
77
78
79struct nxt_port_select_state_s {
80 nxt_app_t *app;
81 nxt_req_app_link_t *ra;
82
83 nxt_port_t *failed_port;
84 int failed_port_use_delta;
85
86 nxt_bool_t can_start_worker;
87 nxt_req_app_link_t *shared_ra;
88 nxt_port_t *port;
89};
90
91typedef struct nxt_port_select_state_s nxt_port_select_state_t;
92
93static void nxt_router_port_select(nxt_task_t *task,
94 nxt_port_select_state_t *state);
95
96static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
97 nxt_port_select_state_t *state);
98
99static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
100
101nxt_inline void
102nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
103{
104 nxt_atomic_fetch_add(&ra->use_count, 1);
105}
106

--- 84 unchanged lines hidden (view full) ---

191static void nxt_router_app_port_ready(nxt_task_t *task,
192 nxt_port_recv_msg_t *msg, void *data);
193static void nxt_router_app_port_error(nxt_task_t *task,
194 nxt_port_recv_msg_t *msg, void *data);
195
196static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
197static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
198 uint32_t request_failed, uint32_t got_response);
175static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
199static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
200 nxt_req_app_link_t *ra);
201
202static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
203static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
204 void *data);
205static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
206static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
207 void *data);
208static void nxt_router_process_http_request(nxt_task_t *task,

--- 386 unchanged lines hidden (view full) ---

595nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
596{
597 ra->app_port = NULL;
598 ra->err_code = code;
599 ra->err_str = str;
600}
601
602
603nxt_inline void
604nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
605{
606 nxt_queue_insert_tail(&ra->app_port->pending_requests,
607 &ra->link_port_pending);
608 nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
609
610 nxt_router_ra_inc_use(ra);
611
612 ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
613
614 nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
615}
616
617
618nxt_inline nxt_bool_t
619nxt_queue_chk_remove(nxt_queue_link_t *lnk)
620{
621 if (lnk->next != NULL) {
622 nxt_queue_remove(lnk);
623
624 lnk->next = NULL;
625

--- 24 unchanged lines hidden (view full) ---

650 rc->ra = NULL;
651 ra->rc = NULL;
652
653 ra_use_delta = 0;
654
655 nxt_thread_mutex_lock(&rc->app->mutex);
656
657 if (ra->link_app_requests.next == NULL
618 && ra->link_port_pending.next == NULL)
658 && ra->link_port_pending.next == NULL
659 && ra->link_app_pending.next == NULL)
660 {
661 ra = NULL;
662
663 } else {
664 ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
665 ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
666 nxt_queue_chk_remove(&ra->link_app_pending);
667 }
668
669 nxt_thread_mutex_unlock(&rc->app->mutex);
670
671 if (ra != NULL) {
672 nxt_router_ra_use(task, ra, ra_use_delta);
673 }
674 }

--- 336 unchanged lines hidden (view full) ---

1011static nxt_conf_map_t nxt_router_app_limits_conf[] = {
1012 {
1013 nxt_string("timeout"),
1014 NXT_CONF_MAP_MSEC,
1015 offsetof(nxt_router_app_conf_t, timeout),
1016 },
1017
1018 {
1019 nxt_string("reschedule_timeout"),
1020 NXT_CONF_MAP_MSEC,
1021 offsetof(nxt_router_app_conf_t, res_timeout),
1022 },
1023
1024 {
1025 nxt_string("requests"),
1026 NXT_CONF_MAP_INT32,
1027 offsetof(nxt_router_app_conf_t, requests),
1028 },
1029};
1030
1031
1032static nxt_conf_map_t nxt_router_listener_conf[] = {

--- 137 unchanged lines hidden (view full) ---

1170
1171 nxt_queue_remove(&prev->link);
1172 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1173 continue;
1174 }
1175
1176 apcf.workers = 1;
1177 apcf.timeout = 0;
1178 apcf.res_timeout = 1000;
1179 apcf.requests = 0;
1180 apcf.limits_value = NULL;
1181
1182 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1183 nxt_nitems(nxt_router_app_conf), &apcf);
1184 if (ret != NXT_OK) {
1185 nxt_log(task, NXT_LOG_CRIT, "application map error");
1186 goto app_fail;

--- 13 unchanged lines hidden (view full) ---

1200 if (ret != NXT_OK) {
1201 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
1202 goto app_fail;
1203 }
1204 }
1205
1206 nxt_debug(task, "application type: %V", &apcf.type);
1207 nxt_debug(task, "application workers: %D", apcf.workers);
1159 nxt_debug(task, "application timeout: %D", apcf.timeout);
1208 nxt_debug(task, "application request timeout: %D", apcf.timeout);
1209 nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
1210 nxt_debug(task, "application requests: %D", apcf.requests);
1211
1212 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1213
1214 if (lang == NULL) {
1215 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
1216 &apcf.type);
1217 goto app_fail;
1218 }
1219
1220 nxt_debug(task, "application language module: \"%s\"", lang->file);
1221
1222 ret = nxt_thread_mutex_create(&app->mutex);
1223 if (ret != NXT_OK) {
1224 goto app_fail;
1225 }
1226
1227 nxt_queue_init(&app->ports);
1228 nxt_queue_init(&app->requests);
1229 nxt_queue_init(&app->pending);
1230
1231 app->name.length = name.length;
1232 nxt_memcpy(app->name.start, name.start, name.length);
1233
1234 app->type = lang->type;
1235 app->max_workers = apcf.workers;
1236 app->timeout = apcf.timeout;
1237 app->res_timeout = apcf.res_timeout * 1000000;
1238 app->live = 1;
1239 app->max_pending_responses = 2;
1240 app->prepare_msg = nxt_app_prepare_msg[lang->type];
1241
1242 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1243
1244 nxt_router_app_use(task, app, 1);
1245 }

--- 1282 unchanged lines hidden (view full) ---

2528 ra = rc->ra;
2529
2530 if (ra != NULL) {
2531 cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
2532
2533 if (cancelled) {
2534 nxt_router_ra_inc_use(ra);
2535
2484 res = nxt_router_app_port(task, ra);
2536 res = nxt_router_app_port(task, rc->app, ra);
2537
2538 if (res == NXT_OK) {
2539 port = ra->app_port;
2540
2541 nxt_assert(port != NULL);
2542
2543 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
2544 port->pid);

--- 204 unchanged lines hidden (view full) ---

2749 lnk = nxt_queue_first(&app->ports);
2750 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2751
2752 return port->app_pending_responses > 0;
2753}
2754
2755
2756nxt_inline nxt_port_t *
2705nxt_router_app_get_port_unsafe(nxt_app_t *app)
2757nxt_router_pop_first_port(nxt_app_t *app)
2758{
2759 nxt_port_t *port;
2760 nxt_queue_link_t *lnk;
2761
2762 lnk = nxt_queue_first(&app->ports);
2763 nxt_queue_remove(lnk);
2764
2765 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);

--- 64 unchanged lines hidden (view full) ---

2830 nxt_router_app_prepare_request(task, ra);
2831}
2832
2833
2834static void
2835nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
2836 uint32_t request_failed, uint32_t got_response)
2837{
2786 nxt_app_t *app;
2787 nxt_bool_t send_quit;
2788 nxt_queue_link_t *lnk;
2789 nxt_req_app_link_t *ra, *next_ra;
2838 nxt_app_t *app;
2839 nxt_bool_t send_quit, cancelled;
2840 nxt_queue_link_t *lnk;
2841 nxt_req_app_link_t *ra, *pending_ra, *re_ra;
2842 nxt_port_select_state_t state;
2843
2844 nxt_assert(port != NULL);
2845 nxt_assert(port->app != NULL);
2846
2847 ra = NULL;
2848
2849 app = port->app;
2850
2851 nxt_thread_mutex_lock(&app->mutex);
2852
2853 port->app_pending_responses -= request_failed + got_response;
2854 port->app_responses += got_response;
2855
2801 if (app->live != 0
2802 && port->pair[1] != -1
2856 if (nxt_slow_path(app->live == 0)) {
2857 goto app_dead;
2858 }
2859
2860 if (port->pair[1] != -1
2861 && (app->max_pending_responses == 0
2862 || port->app_pending_responses < app->max_pending_responses))
2863 {
2864 if (port->app_link.next == NULL) {
2865 if (port->app_pending_responses > 0) {
2866 nxt_queue_insert_tail(&app->ports, &port->app_link);
2867
2868 } else {

--- 7 unchanged lines hidden (view full) ---

2876 && nxt_queue_first(&app->ports) != &port->app_link)
2877 {
2878 nxt_queue_remove(&port->app_link);
2879 nxt_queue_insert_head(&app->ports, &port->app_link);
2880 }
2881 }
2882 }
2883
2826 if (app->live != 0
2827 && !nxt_queue_is_empty(&app->ports)
2884 if (!nxt_queue_is_empty(&app->ports)
2885 && !nxt_queue_is_empty(&app->requests))
2886 {
2887 lnk = nxt_queue_first(&app->requests);
2888 nxt_queue_remove(lnk);
2889 lnk->next = NULL;
2890
2891 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
2892
2836 ra->app_port = nxt_router_app_get_port_unsafe(app);
2893 ra->app_port = nxt_router_pop_first_port(app);
2894
2895 if (ra->app_port->app_pending_responses > 1) {
2839 nxt_queue_insert_tail(&ra->app_port->pending_requests,
2840 &ra->link_port_pending);
2841
2842 nxt_router_ra_inc_use(ra);
2896 nxt_router_ra_pending(task, app, ra);
2897 }
2844
2845 } else {
2846 ra = NULL;
2898 }
2899
2900app_dead:
2901
2902 /* Pop first pending request for this port. */
2903 if ((request_failed > 0 || got_response > 0)
2904 && !nxt_queue_is_empty(&port->pending_requests))
2905 {
2906 lnk = nxt_queue_first(&port->pending_requests);
2907 nxt_queue_remove(lnk);
2908 lnk->next = NULL;
2909
2856 next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
2857 link_port_pending);
2910 pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
2911 link_port_pending);
2912
2913 nxt_assert(pending_ra->link_app_pending.next != NULL);
2914
2915 nxt_queue_remove(&pending_ra->link_app_pending);
2916 pending_ra->link_app_pending.next = NULL;
2917
2918 } else {
2860 next_ra = NULL;
2919 pending_ra = NULL;
2920 }
2921
2922 /* Try to cancel and re-schedule first stalled request for this app. */
2923 if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
2924 lnk = nxt_queue_first(&app->pending);
2925
2926 re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
2927
2928 if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
2929
2930 nxt_debug(task, "app '%V' stalled request #%uD detected",
2931 &app->name, re_ra->stream);
2932
2933 cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
2934 re_ra->stream);
2935
2936 if (cancelled) {
2937 nxt_router_ra_inc_use(re_ra);
2938
2939 state.ra = re_ra;
2940 state.app = app;
2941
2942 nxt_router_port_select(task, &state);
2943
2944 goto re_ra_cancelled;
2945 }
2946 }
2947 }
2948
2949 re_ra = NULL;
2950
2951re_ra_cancelled:
2952
2953 send_quit = app->live == 0 && port->app_pending_responses > 0;
2954
2955 nxt_thread_mutex_unlock(&app->mutex);
2956
2867 if (next_ra != NULL) {
2868 nxt_router_ra_use(task, next_ra, -1);
2957 if (pending_ra != NULL) {
2958 nxt_router_ra_use(task, pending_ra, -1);
2959 }
2960
2961 if (re_ra != NULL) {
2962 if (nxt_router_port_post_select(task, &state) == NXT_OK) {
2963 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2964 nxt_router_app_process_request,
2965 &task->thread->engine->task, app, re_ra);
2966 }
2967 }
2968
2969 if (ra != NULL) {
2970 nxt_router_ra_use(task, ra, -1);
2971
2972 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2973 nxt_router_app_process_request,
2974 &task->thread->engine->task, app, ra);
2975
2976 goto adjust_use;

--- 66 unchanged lines hidden (view full) ---

3043 }
3044
3045 if (start_worker) {
3046 nxt_router_start_worker(task, app);
3047 }
3048}
3049
3050
2953static nxt_int_t
2954nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
3051static void
3052nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
3053{
2956 int failed_port_use_delta;
2957 nxt_int_t res;
2958 nxt_app_t *app;
2959 nxt_bool_t can_start_worker;
2960 nxt_conn_t *c;
2961 nxt_port_t *port, *failed_port;
3054 nxt_app_t *app;
3055 nxt_req_app_link_t *ra;
3056
2963 c = ra->rc->conn;
2964 app = ra->rc->app;
3057 ra = state->ra;
3058 app = state->app;
3059
2966 failed_port_use_delta = 0;
3060 state->failed_port_use_delta = 0;
3061
2968 nxt_thread_mutex_lock(&app->mutex);
2969
3062 if (nxt_queue_chk_remove(&ra->link_app_requests))
3063 {
3064 nxt_router_ra_dec_use(ra);
3065 }
3066
3067 if (nxt_queue_chk_remove(&ra->link_port_pending))
3068 {
3069 nxt_assert(ra->link_app_pending.next != NULL);
3070
3071 nxt_queue_remove(&ra->link_app_pending);
3072 ra->link_app_pending.next = NULL;
3073
3074 nxt_router_ra_dec_use(ra);
3075 }
3076
3077 state->failed_port = ra->app_port;
3078
3079 if (ra->app_port != NULL) {
2981 failed_port = ra->app_port;
2982 failed_port_use_delta--;
3080 state->failed_port_use_delta--;
3081
2984 failed_port->app_pending_responses--;
3082 state->failed_port->app_pending_responses--;
3083
2986 if (failed_port->app_link.next != NULL) {
2987 nxt_queue_remove(&failed_port->app_link);
2988 failed_port->app_link.next = NULL;
2989
2990 failed_port_use_delta--;
3084 if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
3085 state->failed_port_use_delta--;
3086 }
3087
2993 } else {
2994 failed_port = NULL;
3088 ra->app_port = NULL;
3089 }
3090
2997 can_start_worker = (app->workers + app->pending_workers)
2998 < app->max_workers;
3091 state->can_start_worker = (app->workers + app->pending_workers)
3092 < app->max_workers;
3093 state->port = NULL;
3094
3095 if (nxt_queue_is_empty(&app->ports)
3001 || (can_start_worker && nxt_router_app_first_port_busy(app)) )
3096 || (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
3097 {
3098 ra = nxt_router_ra_create(task, ra);
3099
3005 if (nxt_fast_path(ra != NULL)) {
3100 if (nxt_slow_path(ra == NULL)) {
3101 goto fail;
3102 }
3103
3104 if (nxt_slow_path(state->failed_port != NULL)) {
3105 nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
3106
3107 } else {
3108 nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
3109 }
3110
3008 nxt_router_ra_inc_use(ra);
3111 nxt_router_ra_inc_use(ra);
3112
3010 nxt_debug(task, "ra stream #%uD enqueue to app->requests",
3011 ra->stream);
3113 nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
3114
3013 if (can_start_worker) {
3014 app->pending_workers++;
3015 }
3115 if (state->can_start_worker) {
3116 app->pending_workers++;
3117 }
3118
3018 port = NULL;
3019
3119 } else {
3021 port = nxt_router_app_get_port_unsafe(app);
3120 state->port = nxt_router_pop_first_port(app);
3121
3023 if (port->app_pending_responses > 1) {
3122 if (state->port->app_pending_responses > 1) {
3123 ra = nxt_router_ra_create(task, ra);
3124
3026 if (nxt_fast_path(ra != NULL)) {
3027 nxt_queue_insert_tail(&port->pending_requests,
3028 &ra->link_port_pending);
3125 if (nxt_slow_path(ra == NULL)) {
3126 goto fail;
3127 }
3128
3030 nxt_router_ra_inc_use(ra);
3129 ra->app_port = state->port;
3130
3032 nxt_debug(task, "ra stream #%uD enqueue to "
3033 "port->pending_requests", ra->stream);
3034 }
3131 nxt_router_ra_pending(task, app, ra);
3132 }
3133 }
3134
3038 nxt_thread_mutex_unlock(&app->mutex);
3135fail:
3136
3040 if (failed_port_use_delta != 0) {
3041 nxt_port_use(task, failed_port, failed_port_use_delta);
3137 state->shared_ra = ra;
3138}
3139
3140
3141static nxt_int_t
3142nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
3143{
3144 nxt_int_t res;
3145 nxt_app_t *app;
3146 nxt_req_app_link_t *ra;
3147
3148 ra = state->shared_ra;
3149 app = state->app;
3150
3151 if (state->failed_port_use_delta != 0) {
3152 nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
3153 }
3154
3155 if (nxt_slow_path(ra == NULL)) {
3045 nxt_router_gen_error(task, c, 500, "Failed to allocate "
3046 "req<->app link");
3047
3048 if (port != NULL) {
3049 nxt_port_use(task, port, -1);
3156 if (state->port != NULL) {
3157 nxt_port_use(task, state->port, -1);
3158 }
3159
3160 nxt_router_ra_error(state->ra, 500,
3161 "Failed to allocate shared req<->app link");
3162 nxt_router_ra_use(task, state->ra, -1);
3163
3164 return NXT_ERROR;
3165 }
3166
3055 if (port != NULL) {
3167 if (state->port != NULL) {
3168 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
3169
3058 ra->app_port = port;
3170 ra->app_port = state->port;
3171
3172 return NXT_OK;
3173 }
3174
3063 if (!can_start_worker) {
3175 if (!state->can_start_worker) {
3176 nxt_debug(task, "app '%V' %p too many running or pending workers",
3177 &app->name, app);
3178
3179 return NXT_AGAIN;
3180 }
3181
3182 res = nxt_router_start_worker(task, app);
3183
3184 if (nxt_slow_path(res != NXT_OK)) {
3073 nxt_router_gen_error(task, c, 500, "Failed to start worker");
3185 nxt_router_ra_error(ra, 500, "Failed to start worker");
3186 nxt_router_ra_use(task, ra, -1);
3187
3188 return NXT_ERROR;
3189 }
3190
3191 return NXT_AGAIN;
3192}
3193
3194
3195static nxt_int_t
3196nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
3197{
3198 nxt_port_select_state_t state;
3199
3200 state.ra = ra;
3201 state.app = app;
3202
3203 nxt_thread_mutex_lock(&app->mutex);
3204
3205 nxt_router_port_select(task, &state);
3206
3207 nxt_thread_mutex_unlock(&app->mutex);
3208
3209 return nxt_router_port_post_select(task, &state);
3210}
3211
3212
3213static void
3214nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
3215{
3216 size_t size;
3217 nxt_int_t ret;
3218 nxt_buf_t *buf;
3219 nxt_conn_t *c;
3220 nxt_sockaddr_t *local;

--- 297 unchanged lines hidden (view full) ---

3518 rc->stream, c, engine);
3519
3520 rc->ap = ap;
3521 c->socket.data = NULL;
3522
3523 ra = &ra_local;
3524 nxt_router_ra_init(task, ra, rc);
3525
3395 res = nxt_router_app_port(task, ra);
3526 res = nxt_router_app_port(task, app, ra);
3527
3528 if (res != NXT_OK) {
3529 return;
3530 }
3531
3532 ra = rc->ra;
3533 port = ra->app_port;
3534

--- 557 unchanged lines hidden ---