nxt_upstream_round_robin.c (1394:20b41ebfff79) nxt_upstream_round_robin.c (1440:d1ad3857769c)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <math.h>
7#include <nxt_router.h>
8#include <nxt_http.h>
9#include <nxt_upstream.h>
10
11
12struct nxt_upstream_round_robin_server_s {
13 nxt_sockaddr_t *sockaddr;
14
15 int32_t current_weight;
16 int32_t effective_weight;
17 int32_t weight;
18
19 uint8_t protocol;
20};
21
22
23struct nxt_upstream_round_robin_s {
24 uint32_t items;
25 nxt_upstream_round_robin_server_t server[0];
26};
27
28
29static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
30 nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
31static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
32 nxt_upstream_server_t *us);
33
34
35static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
36 .joint_create = nxt_upstream_round_robin_joint_create,
37 .get = nxt_upstream_round_robin_server_get,
38};
39
40
8#include <nxt_router.h>
9#include <nxt_http.h>
10#include <nxt_upstream.h>
11
12
13struct nxt_upstream_round_robin_server_s {
14 nxt_sockaddr_t *sockaddr;
15
16 int32_t current_weight;
17 int32_t effective_weight;
18 int32_t weight;
19
20 uint8_t protocol;
21};
22
23
24struct nxt_upstream_round_robin_s {
25 uint32_t items;
26 nxt_upstream_round_robin_server_t server[0];
27};
28
29
30static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
31 nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
32static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
33 nxt_upstream_server_t *us);
34
35
36static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
37 .joint_create = nxt_upstream_round_robin_joint_create,
38 .get = nxt_upstream_round_robin_server_get,
39};
40
41
41static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = {
42 {
43 nxt_string("weight"),
44 NXT_CONF_MAP_INT32,
45 offsetof(nxt_upstream_round_robin_server_t, weight),
46 },
47};
48
49
50nxt_int_t
51nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
52 nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
53{
42nxt_int_t
43nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
44 nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
45{
46 double total, k, w;
54 size_t size;
47 size_t size;
55 uint32_t i, n, next;
48 uint32_t i, n, next, wt;
56 nxt_mp_t *mp;
57 nxt_str_t name;
58 nxt_sockaddr_t *sa;
49 nxt_mp_t *mp;
50 nxt_str_t name;
51 nxt_sockaddr_t *sa;
59 nxt_conf_value_t *servers_conf, *srvcf;
52 nxt_conf_value_t *servers_conf, *srvcf, *wtcf;
60 nxt_upstream_round_robin_t *urr;
61
62 static nxt_str_t servers = nxt_string("servers");
53 nxt_upstream_round_robin_t *urr;
54
55 static nxt_str_t servers = nxt_string("servers");
56 static nxt_str_t weight = nxt_string("weight");
63
64 mp = tmcf->router_conf->mem_pool;
65
66 servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
67 n = nxt_conf_object_members_count(servers_conf);
68
57
58 mp = tmcf->router_conf->mem_pool;
59
60 servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
61 n = nxt_conf_object_members_count(servers_conf);
62
63 total = 0.0;
64 next = 0;
65
66 for (i = 0; i < n; i++) {
67 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
68 wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
69 w = (wtcf != NULL) ? nxt_conf_get_number(wtcf) : 1;
70 total += w;
71 }
72
73 /*
74 * This prevents overflow of int32_t
75 * in nxt_upstream_round_robin_server_get().
76 */
77 k = (total == 0) ? 0 : (NXT_INT32_T_MAX / 2) / total;
78
79 if (isinf(k)) {
80 k = 1;
81 }
82
69 size = sizeof(nxt_upstream_round_robin_t)
70 + n * sizeof(nxt_upstream_round_robin_server_t);
71
72 urr = nxt_mp_zalloc(mp, size);
73 if (nxt_slow_path(urr == NULL)) {
74 return NXT_ERROR;
75 }
76
77 urr->items = n;
78 next = 0;
79
80 for (i = 0; i < n; i++) {
81 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
82
83 sa = nxt_sockaddr_parse(mp, &name);
84 if (nxt_slow_path(sa == NULL)) {
85 return NXT_ERROR;
86 }
87
88 sa->type = SOCK_STREAM;
89
90 urr->server[i].sockaddr = sa;
83 size = sizeof(nxt_upstream_round_robin_t)
84 + n * sizeof(nxt_upstream_round_robin_server_t);
85
86 urr = nxt_mp_zalloc(mp, size);
87 if (nxt_slow_path(urr == NULL)) {
88 return NXT_ERROR;
89 }
90
91 urr->items = n;
92 next = 0;
93
94 for (i = 0; i < n; i++) {
95 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
96
97 sa = nxt_sockaddr_parse(mp, &name);
98 if (nxt_slow_path(sa == NULL)) {
99 return NXT_ERROR;
100 }
101
102 sa->type = SOCK_STREAM;
103
104 urr->server[i].sockaddr = sa;
91 urr->server[i].weight = 1;
92 urr->server[i].protocol = NXT_HTTP_PROTO_H1;
93
105 urr->server[i].protocol = NXT_HTTP_PROTO_H1;
106
94 nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf,
95 nxt_nitems(nxt_upstream_round_robin_server_conf),
96 &urr->server[i]);
107 wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
108 w = (wtcf != NULL) ? k * nxt_conf_get_number(wtcf) : k;
109 wt = (w > 1 || w == 0) ? round(w) : 1;
97
110
98 urr->server[i].effective_weight = urr->server[i].weight;
111 urr->server[i].weight = wt;
112 urr->server[i].effective_weight = wt;
99 }
100
101 upstream->proto = &nxt_upstream_round_robin_proto;
102 upstream->type.round_robin = urr;
103
104 return NXT_OK;
105}
106
107
108static nxt_upstream_t *
109nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
110 nxt_upstream_t *upstream)
111{
112 size_t size;
113 uint32_t i, n;
114 nxt_mp_t *mp;
115 nxt_upstream_t *u;
116 nxt_upstream_round_robin_t *urr, *urrcf;
117
118 mp = tmcf->router_conf->mem_pool;
119
120 u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
121 if (nxt_slow_path(u == NULL)) {
122 return NULL;
123 }
124
125 *u = *upstream;
126
127 urrcf = upstream->type.round_robin;
128
129 size = sizeof(nxt_upstream_round_robin_t)
130 + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
131
132 urr = nxt_mp_alloc(mp, size);
133 if (nxt_slow_path(urr == NULL)) {
134 return NULL;
135 }
136
137 u->type.round_robin = urr;
138
139 n = urrcf->items;
140 urr->items = n;
141
142 for (i = 0; i < n; i++) {
143 urr->server[i] = urrcf->server[i];
144 }
145
146 return u;
147}
148
149
150static void
151nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
152{
153 int32_t total;
154 uint32_t i, n;
155 nxt_upstream_round_robin_t *round_robin;
156 nxt_upstream_round_robin_server_t *s, *best;
157
158 best = NULL;
159 total = 0;
160
161 round_robin = us->upstream->type.round_robin;
162
163 s = round_robin->server;
164 n = round_robin->items;
165
166 for (i = 0; i < n; i++) {
167
168 s[i].current_weight += s[i].effective_weight;
169 total += s[i].effective_weight;
170
171 if (s[i].effective_weight < s[i].weight) {
172 s[i].effective_weight++;
173 }
174
175 if (best == NULL || s[i].current_weight > best->current_weight) {
176 best = &s[i];
177 }
178 }
179
113 }
114
115 upstream->proto = &nxt_upstream_round_robin_proto;
116 upstream->type.round_robin = urr;
117
118 return NXT_OK;
119}
120
121
122static nxt_upstream_t *
123nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
124 nxt_upstream_t *upstream)
125{
126 size_t size;
127 uint32_t i, n;
128 nxt_mp_t *mp;
129 nxt_upstream_t *u;
130 nxt_upstream_round_robin_t *urr, *urrcf;
131
132 mp = tmcf->router_conf->mem_pool;
133
134 u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
135 if (nxt_slow_path(u == NULL)) {
136 return NULL;
137 }
138
139 *u = *upstream;
140
141 urrcf = upstream->type.round_robin;
142
143 size = sizeof(nxt_upstream_round_robin_t)
144 + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
145
146 urr = nxt_mp_alloc(mp, size);
147 if (nxt_slow_path(urr == NULL)) {
148 return NULL;
149 }
150
151 u->type.round_robin = urr;
152
153 n = urrcf->items;
154 urr->items = n;
155
156 for (i = 0; i < n; i++) {
157 urr->server[i] = urrcf->server[i];
158 }
159
160 return u;
161}
162
163
164static void
165nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
166{
167 int32_t total;
168 uint32_t i, n;
169 nxt_upstream_round_robin_t *round_robin;
170 nxt_upstream_round_robin_server_t *s, *best;
171
172 best = NULL;
173 total = 0;
174
175 round_robin = us->upstream->type.round_robin;
176
177 s = round_robin->server;
178 n = round_robin->items;
179
180 for (i = 0; i < n; i++) {
181
182 s[i].current_weight += s[i].effective_weight;
183 total += s[i].effective_weight;
184
185 if (s[i].effective_weight < s[i].weight) {
186 s[i].effective_weight++;
187 }
188
189 if (best == NULL || s[i].current_weight > best->current_weight) {
190 best = &s[i];
191 }
192 }
193
180 if (best == NULL) {
194 if (best == NULL || total == 0) {
181 us->state->error(task, us);
182 return;
183 }
184
185 best->current_weight -= total;
186 us->sockaddr = best->sockaddr;
187 us->protocol = best->protocol;
188 us->server.round_robin = best;
189
190 us->state->ready(task, us);
191}
195 us->state->error(task, us);
196 return;
197 }
198
199 best->current_weight -= total;
200 us->sockaddr = best->sockaddr;
201 us->protocol = best->protocol;
202 us->server.round_robin = best;
203
204 us->state->ready(task, us);
205}