1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6
|
| 7#include <math.h>
|
7#include <nxt_router.h> 8#include <nxt_http.h> 9#include <nxt_upstream.h> 10 11 12struct nxt_upstream_round_robin_server_s { 13 nxt_sockaddr_t *sockaddr; 14 15 int32_t current_weight; 16 int32_t effective_weight; 17 int32_t weight; 18 19 uint8_t protocol; 20}; 21 22 23struct nxt_upstream_round_robin_s { 24 uint32_t items; 25 nxt_upstream_round_robin_server_t server[0]; 26}; 27 28 29static nxt_upstream_t *nxt_upstream_round_robin_joint_create( 30 nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); 31static void nxt_upstream_round_robin_server_get(nxt_task_t *task, 32 nxt_upstream_server_t *us); 33 34 35static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = { 36 .joint_create = nxt_upstream_round_robin_joint_create, 37 .get = nxt_upstream_round_robin_server_get, 38}; 39 40
| 8#include <nxt_router.h> 9#include <nxt_http.h> 10#include <nxt_upstream.h> 11 12 13struct nxt_upstream_round_robin_server_s { 14 nxt_sockaddr_t *sockaddr; 15 16 int32_t current_weight; 17 int32_t effective_weight; 18 int32_t weight; 19 20 uint8_t protocol; 21}; 22 23 24struct nxt_upstream_round_robin_s { 25 uint32_t items; 26 nxt_upstream_round_robin_server_t server[0]; 27}; 28 29 30static nxt_upstream_t *nxt_upstream_round_robin_joint_create( 31 nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); 32static void nxt_upstream_round_robin_server_get(nxt_task_t *task, 33 nxt_upstream_server_t *us); 34 35 36static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = { 37 .joint_create = nxt_upstream_round_robin_joint_create, 38 .get = nxt_upstream_round_robin_server_get, 39}; 40 41
|
41static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = { 42 { 43 nxt_string("weight"), 44 NXT_CONF_MAP_INT32, 45 offsetof(nxt_upstream_round_robin_server_t, weight), 46 }, 47}; 48 49
| |
50nxt_int_t 51nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 52 nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream) 53{
| 42nxt_int_t 43nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 44 nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream) 45{
|
| 46 double total, k, w;
|
54 size_t size;
| 47 size_t size;
|
55 uint32_t i, n, next;
| 48 uint32_t i, n, next, wt;
|
56 nxt_mp_t *mp; 57 nxt_str_t name; 58 nxt_sockaddr_t *sa;
| 49 nxt_mp_t *mp; 50 nxt_str_t name; 51 nxt_sockaddr_t *sa;
|
59 nxt_conf_value_t *servers_conf, *srvcf;
| 52 nxt_conf_value_t *servers_conf, *srvcf, *wtcf;
|
60 nxt_upstream_round_robin_t *urr; 61 62 static nxt_str_t servers = nxt_string("servers");
| 53 nxt_upstream_round_robin_t *urr; 54 55 static nxt_str_t servers = nxt_string("servers");
|
| 56 static nxt_str_t weight = nxt_string("weight");
|
63 64 mp = tmcf->router_conf->mem_pool; 65 66 servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL); 67 n = nxt_conf_object_members_count(servers_conf); 68
| 57 58 mp = tmcf->router_conf->mem_pool; 59 60 servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL); 61 n = nxt_conf_object_members_count(servers_conf); 62
|
| 63 total = 0.0; 64 next = 0; 65 66 for (i = 0; i < n; i++) { 67 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next); 68 wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL); 69 w = (wtcf != NULL) ? nxt_conf_get_number(wtcf) : 1; 70 total += w; 71 } 72 73 /* 74 * This prevents overflow of int32_t 75 * in nxt_upstream_round_robin_server_get(). 76 */ 77 k = (total == 0) ? 0 : (NXT_INT32_T_MAX / 2) / total; 78 79 if (isinf(k)) { 80 k = 1; 81 } 82
|
69 size = sizeof(nxt_upstream_round_robin_t) 70 + n * sizeof(nxt_upstream_round_robin_server_t); 71 72 urr = nxt_mp_zalloc(mp, size); 73 if (nxt_slow_path(urr == NULL)) { 74 return NXT_ERROR; 75 } 76 77 urr->items = n; 78 next = 0; 79 80 for (i = 0; i < n; i++) { 81 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next); 82 83 sa = nxt_sockaddr_parse(mp, &name); 84 if (nxt_slow_path(sa == NULL)) { 85 return NXT_ERROR; 86 } 87 88 sa->type = SOCK_STREAM; 89 90 urr->server[i].sockaddr = sa;
| 83 size = sizeof(nxt_upstream_round_robin_t) 84 + n * sizeof(nxt_upstream_round_robin_server_t); 85 86 urr = nxt_mp_zalloc(mp, size); 87 if (nxt_slow_path(urr == NULL)) { 88 return NXT_ERROR; 89 } 90 91 urr->items = n; 92 next = 0; 93 94 for (i = 0; i < n; i++) { 95 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next); 96 97 sa = nxt_sockaddr_parse(mp, &name); 98 if (nxt_slow_path(sa == NULL)) { 99 return NXT_ERROR; 100 } 101 102 sa->type = SOCK_STREAM; 103 104 urr->server[i].sockaddr = sa;
|
91 urr->server[i].weight = 1;
| |
92 urr->server[i].protocol = NXT_HTTP_PROTO_H1; 93
| 105 urr->server[i].protocol = NXT_HTTP_PROTO_H1; 106
|
94 nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf, 95 nxt_nitems(nxt_upstream_round_robin_server_conf), 96 &urr->server[i]);
| 107 wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL); 108 w = (wtcf != NULL) ? k * nxt_conf_get_number(wtcf) : k; 109 wt = (w > 1 || w == 0) ? round(w) : 1;
|
97
| 110
|
98 urr->server[i].effective_weight = urr->server[i].weight;
| 111 urr->server[i].weight = wt; 112 urr->server[i].effective_weight = wt;
|
99 } 100 101 upstream->proto = &nxt_upstream_round_robin_proto; 102 upstream->type.round_robin = urr; 103 104 return NXT_OK; 105} 106 107 108static nxt_upstream_t * 109nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf, 110 nxt_upstream_t *upstream) 111{ 112 size_t size; 113 uint32_t i, n; 114 nxt_mp_t *mp; 115 nxt_upstream_t *u; 116 nxt_upstream_round_robin_t *urr, *urrcf; 117 118 mp = tmcf->router_conf->mem_pool; 119 120 u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); 121 if (nxt_slow_path(u == NULL)) { 122 return NULL; 123 } 124 125 *u = *upstream; 126 127 urrcf = upstream->type.round_robin; 128 129 size = sizeof(nxt_upstream_round_robin_t) 130 + urrcf->items * sizeof(nxt_upstream_round_robin_server_t); 131 132 urr = nxt_mp_alloc(mp, size); 133 if (nxt_slow_path(urr == NULL)) { 134 return NULL; 135 } 136 137 u->type.round_robin = urr; 138 139 n = urrcf->items; 140 urr->items = n; 141 142 for (i = 0; i < n; i++) { 143 urr->server[i] = urrcf->server[i]; 144 } 145 146 return u; 147} 148 149 150static void 151nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us) 152{ 153 int32_t total; 154 uint32_t i, n; 155 nxt_upstream_round_robin_t *round_robin; 156 nxt_upstream_round_robin_server_t *s, *best; 157 158 best = NULL; 159 total = 0; 160 161 round_robin = us->upstream->type.round_robin; 162 163 s = round_robin->server; 164 n = round_robin->items; 165 166 for (i = 0; i < n; i++) { 167 168 s[i].current_weight += s[i].effective_weight; 169 total += s[i].effective_weight; 170 171 if (s[i].effective_weight < s[i].weight) { 172 s[i].effective_weight++; 173 } 174 175 if (best == NULL || s[i].current_weight > best->current_weight) { 176 best = &s[i]; 177 } 178 } 179
| 113 } 114 115 upstream->proto = &nxt_upstream_round_robin_proto; 116 upstream->type.round_robin = urr; 117 118 return NXT_OK; 119} 120 121 122static nxt_upstream_t * 123nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf, 124 nxt_upstream_t *upstream) 125{ 126 size_t size; 127 uint32_t i, n; 128 nxt_mp_t *mp; 129 nxt_upstream_t *u; 130 nxt_upstream_round_robin_t *urr, *urrcf; 131 132 mp = tmcf->router_conf->mem_pool; 133 134 u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); 135 if (nxt_slow_path(u == NULL)) { 136 return NULL; 137 } 138 139 *u = *upstream; 140 141 urrcf = upstream->type.round_robin; 142 143 size = sizeof(nxt_upstream_round_robin_t) 144 + urrcf->items * sizeof(nxt_upstream_round_robin_server_t); 145 146 urr = nxt_mp_alloc(mp, size); 147 if (nxt_slow_path(urr == NULL)) { 148 return NULL; 149 } 150 151 u->type.round_robin = urr; 152 153 n = urrcf->items; 154 urr->items = n; 155 156 for (i = 0; i < n; i++) { 157 urr->server[i] = urrcf->server[i]; 158 } 159 160 return u; 161} 162 163 164static void 165nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us) 166{ 167 int32_t total; 168 uint32_t i, n; 169 nxt_upstream_round_robin_t *round_robin; 170 nxt_upstream_round_robin_server_t *s, *best; 171 172 best = NULL; 173 total = 0; 174 175 round_robin = us->upstream->type.round_robin; 176 177 s = round_robin->server; 178 n = round_robin->items; 179 180 for (i = 0; i < n; i++) { 181 182 s[i].current_weight += s[i].effective_weight; 183 total += s[i].effective_weight; 184 185 if (s[i].effective_weight < s[i].weight) { 186 s[i].effective_weight++; 187 } 188 189 if (best == NULL || s[i].current_weight > best->current_weight) { 190 best = &s[i]; 191 } 192 } 193
|
180 if (best == NULL) {
| 194 if (best == NULL || total == 0) {
|
181 us->state->error(task, us); 182 return; 183 } 184 185 best->current_weight -= total; 186 us->sockaddr = best->sockaddr; 187 us->protocol = best->protocol; 188 us->server.round_robin = best; 189 190 us->state->ready(task, us); 191}
| 195 us->state->error(task, us); 196 return; 197 } 198 199 best->current_weight -= total; 200 us->sockaddr = best->sockaddr; 201 us->protocol = best->protocol; 202 us->server.round_robin = best; 203 204 us->state->ready(task, us); 205}
|