Deleted Added
1import os
2import re
3
4from unit.applications.lang.python import TestApplicationPython
5from conftest import option
6
7
8class TestUpstreamsRR(TestApplicationPython):
9 prerequisites = {'modules': {'python': 'any'}}
10
11 def setup_method(self):
12 super().setup_method()
13
14 assert 'success' in self.conf(
15 {
16 "listeners": {
17 "*:7080": {"pass": "upstreams/one"},
18 "*:7090": {"pass": "upstreams/two"},
19 "*:7081": {"pass": "routes/one"},
20 "*:7082": {"pass": "routes/two"},
21 "*:7083": {"pass": "routes/three"},
22 },
23 "upstreams": {
24 "one": {
25 "servers": {
26 "127.0.0.1:7081": {},
27 "127.0.0.1:7082": {},
28 },
29 },
30 "two": {
31 "servers": {
32 "127.0.0.1:7081": {},
33 "127.0.0.1:7082": {},
34 },
35 },
36 },
37 "routes": {
38 "one": [{"action": {"return": 200}}],
39 "two": [{"action": {"return": 201}}],
40 "three": [{"action": {"return": 202}}],
41 },
42 "applications": {},
43 },
44 ), 'upstreams initial configuration'
45
46 self.cpu_count = os.cpu_count()
47
48 def get_resps(self, req=100, port=7080):
49 resps = [0]
50
51 for _ in range(req):
52 status = self.get(port=port)['status']

--- 30 unchanged lines hidden (view full) ---

83 resps = [0] * (max(ups) + 1)
84 for i in range(len(ups)):
85 resps[ups[i]] += 1
86
87 return resps
88
89 def test_upstreams_rr_no_weight(self):
90 resps = self.get_resps()
91 assert sum(resps) == 100, 'no weight sum'
92 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight'
93
94 assert 'success' in self.conf_delete(
95 'upstreams/one/servers/127.0.0.1:7081'
96 ), 'no weight server remove'
97
98 resps = self.get_resps(req=50)
99 assert resps[1] == 50, 'no weight 2'
100
101 assert 'success' in self.conf(
102 {}, 'upstreams/one/servers/127.0.0.1:7081'
103 ), 'no weight server revert'
104
105 resps = self.get_resps()
106 assert sum(resps) == 100, 'no weight 3 sum'
107 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight 3'
108
109 assert 'success' in self.conf(
110 {}, 'upstreams/one/servers/127.0.0.1:7083'
111 ), 'no weight server new'
112
113 resps = self.get_resps()
114 assert sum(resps) == 100, 'no weight 4 sum'
115 assert max(resps) - min(resps) <= self.cpu_count, 'no weight 4'
116
117 resps = self.get_resps_sc(req=30)
118 assert resps[0] == 10, 'no weight 4 0'
119 assert resps[1] == 10, 'no weight 4 1'
120 assert resps[2] == 10, 'no weight 4 2'
121
122 def test_upstreams_rr_weight(self):
123 assert 'success' in self.conf(
124 {"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'
125 ), 'configure weight'
126
127 resps = self.get_resps_sc()
128 assert resps[0] == 75, 'weight 3 0'
129 assert resps[1] == 25, 'weight 3 1'
130
131 assert 'success' in self.conf_delete(
132 'upstreams/one/servers/127.0.0.1:7081/weight'
133 ), 'configure weight remove'
134 resps = self.get_resps_sc(req=10)
135 assert resps[0] == 5, 'weight 0 0'
136 assert resps[1] == 5, 'weight 0 1'
137
138 assert 'success' in self.conf(
139 '1', 'upstreams/one/servers/127.0.0.1:7081/weight'
140 ), 'configure weight 1'
141
142 resps = self.get_resps_sc()
143 assert resps[0] == 50, 'weight 1 0'
144 assert resps[1] == 50, 'weight 1 1'
145
146 assert 'success' in self.conf(
147 {
148 "127.0.0.1:7081": {"weight": 3},
149 "127.0.0.1:7083": {"weight": 2},
150 },
151 'upstreams/one/servers',
152 ), 'configure weight 2'
153
154 resps = self.get_resps_sc()
155 assert resps[0] == 60, 'weight 2 0'
156 assert resps[2] == 40, 'weight 2 1'
157
158 def test_upstreams_rr_weight_rational(self):
159 def set_weights(w1, w2):
160 assert 'success' in self.conf(
161 {
162 "127.0.0.1:7081": {"weight": w1},
163 "127.0.0.1:7082": {"weight": w2},
164 },
165 'upstreams/one/servers',
166 ), 'configure weights'
167
168 def check_reqs(w1, w2, reqs=10):
169 resps = self.get_resps_sc(req=reqs)
170 assert resps[0] == reqs * w1 / (w1 + w2), 'weight 1'
171 assert resps[1] == reqs * w2 / (w1 + w2), 'weight 2'
172
173 def check_weights(w1, w2):
174 set_weights(w1, w2)
175 check_reqs(w1, w2)
176
177 check_weights(0, 1)
178 check_weights(0, 999999.0123456)
179 check_weights(1, 9)
180 check_weights(100000, 900000)
181 check_weights(1, 0.25)
182 check_weights(1, 0.25)
183 check_weights(0.2, 0.8)
184 check_weights(1, 1.5)
185 check_weights(1e-3, 1e-3)
186 check_weights(1e-20, 1e-20)
187 check_weights(1e4, 1e4)
188 check_weights(1000000, 1000000)
189
190 set_weights(0.25, 0.25)
191 assert 'success' in self.conf_delete(
192 'upstreams/one/servers/127.0.0.1:7081/weight'
193 ), 'delete weight'
194 check_reqs(1, 0.25)
195
196 assert 'success' in self.conf(
197 {
198 "127.0.0.1:7081": {"weight": 0.1},
199 "127.0.0.1:7082": {"weight": 1},
200 "127.0.0.1:7083": {"weight": 0.9},
201 },
202 'upstreams/one/servers',
203 ), 'configure weights'
204 resps = self.get_resps_sc(req=20)
205 assert resps[0] == 1, 'weight 3 1'
206 assert resps[1] == 10, 'weight 3 2'
207 assert resps[2] == 9, 'weight 3 3'
208
209 def test_upstreams_rr_independent(self):
210 def sum_resps(*args):
211 sum = [0] * len(args[0])
212 for arg in args:
213 sum = [x + y for x, y in zip(sum, arg)]
214
215 return sum
216
217 resps = self.get_resps_sc(req=30, port=7090)
218 assert resps[0] == 15, 'dep two before 0'
219 assert resps[1] == 15, 'dep two before 1'
220
221 resps = self.get_resps_sc(req=30)
222 assert resps[0] == 15, 'dep one before 0'
223 assert resps[1] == 15, 'dep one before 1'
224
225 assert 'success' in self.conf(
226 '2', 'upstreams/two/servers/127.0.0.1:7081/weight'
227 ), 'configure dep weight'
228
229 resps = self.get_resps_sc(req=30, port=7090)
230 assert resps[0] == 20, 'dep two 0'
231 assert resps[1] == 10, 'dep two 1'
232
233 resps = self.get_resps_sc(req=30)
234 assert resps[0] == 15, 'dep one 0'
235 assert resps[1] == 15, 'dep one 1'
236
237 assert 'success' in self.conf(
238 '1', 'upstreams/two/servers/127.0.0.1:7081/weight'
239 ), 'configure dep weight 1'
240
241 r_one, r_two = [0, 0], [0, 0]
242 for _ in range(10):
243 r_one = sum_resps(r_one, self.get_resps(req=10))
244 r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
245
246 assert sum(r_one) == 100, 'dep one mix sum'
247 assert abs(r_one[0] - r_one[1]) <= self.cpu_count, 'dep one mix'
248 assert sum(r_two) == 100, 'dep two mix sum'
249 assert abs(r_two[0] - r_two[1]) <= self.cpu_count, 'dep two mix'
250
251 def test_upstreams_rr_delay(self):
252 assert 'success' in self.conf(
253 {
254 "listeners": {
255 "*:7080": {"pass": "upstreams/one"},
256 "*:7081": {"pass": "routes"},
257 "*:7082": {"pass": "routes"},
258 },
259 "upstreams": {
260 "one": {
261 "servers": {
262 "127.0.0.1:7081": {},
263 "127.0.0.1:7082": {},
264 },
265 },
266 },
267 "routes": [
268 {
269 "match": {"destination": "*:7081"},
270 "action": {"pass": "applications/delayed"},
271 },
272 {
273 "match": {"destination": "*:7082"},
274 "action": {"return": 201},
275 },
276 ],
277 "applications": {
278 "delayed": {
279 "type": "python",
280 "processes": {"spare": 0},
281 "path": option.test_dir + "/python/delayed",
282 "working_directory": option.test_dir
283 + "/python/delayed",
284 "module": "wsgi",
285 }
286 },
287 },
288 ), 'upstreams initial configuration'
289
290 req = 50
291
292 socks = []
293 for i in range(req):
294 delay = 1 if i % 5 == 0 else 0
295 _, sock = self.get(
296 headers={

--- 7 unchanged lines hidden (view full) ---

304 )
305 socks.append(sock)
306
307 resps = [0, 0]
308 for i in range(req):
309 resp = self.recvall(socks[i]).decode()
310 socks[i].close()
311
312 m = re.search(r'HTTP/1.1 20(\d)', resp)
313 assert m is not None, 'status'
314 resps[int(m.group(1))] += 1
315
316 assert sum(resps) == req, 'delay sum'
317 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'delay'
318
319 def test_upstreams_rr_active_req(self):
320 conns = 5
321 socks = []
322 socks2 = []
323
324 for _ in range(conns):
325 _, sock = self.get(start=True, no_recv=True)

--- 10 unchanged lines hidden (view full) ---

336 no_recv=True,
337 raw=True,
338 )
339 socks2.append(sock2)
340
341 # Send one more request and read response to make sure that previous
342 # requests had enough time to reach server.
343
344 assert self.get()['body'] == ''
345
346 assert 'success' in self.conf(
347 {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
348 ), 'active req new server'
349 assert 'success' in self.conf_delete(
350 'upstreams/one/servers/127.0.0.1:7083'
351 ), 'active req server remove'
352 assert 'success' in self.conf_delete(
353 'listeners/*:7080'
354 ), 'delete listener'
355 assert 'success' in self.conf_delete(
356 'upstreams/one'
357 ), 'active req upstream remove'
358
359 for i in range(conns):
360 assert (
361 self.http(b'', sock=socks[i], raw=True)['body'] == ''
362 ), 'active req GET'
363
364 assert (
365 self.http(b"""0123456789""", sock=socks2[i], raw=True)['body']
366 == ''
367 ), 'active req POST'
368
369 def test_upstreams_rr_bad_server(self):
370 assert 'success' in self.conf(
371 {"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'
372 ), 'configure bad server'
373
374 resps = self.get_resps_sc(req=30)
375 assert resps[0] == 10, 'bad server 0'
376 assert resps[1] == 10, 'bad server 1'
377 assert sum(resps) == 20, 'bad server sum'
378
379 def test_upstreams_rr_pipeline(self):
380 resps = self.get_resps_sc()
381
382 assert resps[0] == 50, 'pipeline 0'
383 assert resps[1] == 50, 'pipeline 1'
384
385 def test_upstreams_rr_post(self):
386 resps = [0, 0]
387 for _ in range(50):
388 resps[self.get()['status'] % 10] += 1
389 resps[self.post(body='0123456789')['status'] % 10] += 1
390
391 assert sum(resps) == 100, 'post sum'
392 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'post'
393
394 def test_upstreams_rr_unix(self):
395 addr_0 = self.temp_dir + '/sock_0'
396 addr_1 = self.temp_dir + '/sock_1'
397
398 assert 'success' in self.conf(
399 {
400 "*:7080": {"pass": "upstreams/one"},
401 "unix:" + addr_0: {"pass": "routes/one"},
402 "unix:" + addr_1: {"pass": "routes/two"},
403 },
404 'listeners',
405 ), 'configure listeners unix'
406
407 assert 'success' in self.conf(
408 {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
409 'upstreams/one/servers',
410 ), 'configure servers unix'
411
412 resps = self.get_resps_sc()
413
414 assert resps[0] == 50, 'unix 0'
415 assert resps[1] == 50, 'unix 1'
416
417 def test_upstreams_rr_ipv6(self):
418 assert 'success' in self.conf(
419 {
420 "*:7080": {"pass": "upstreams/one"},
421 "[::1]:7081": {"pass": "routes/one"},
422 "[::1]:7082": {"pass": "routes/two"},
423 },
424 'listeners',
425 ), 'configure listeners ipv6'
426
427 assert 'success' in self.conf(
428 {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
429 ), 'configure servers ipv6'
430
431 resps = self.get_resps_sc()
432
433 assert resps[0] == 50, 'ipv6 0'
434 assert resps[1] == 50, 'ipv6 1'
435
436 def test_upstreams_rr_servers_empty(self):
437 assert 'success' in self.conf(
438 {}, 'upstreams/one/servers'
439 ), 'configure servers empty'
440 assert self.get()['status'] == 502, 'servers empty'
441
442 assert 'success' in self.conf(
443 {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers'
444 ), 'configure servers empty one'
445 assert self.get()['status'] == 502, 'servers empty one'
446 assert 'success' in self.conf(
447 {
448 "127.0.0.1:7081": {"weight": 0},
449 "127.0.0.1:7082": {"weight": 0},
450 },
451 'upstreams/one/servers',
452 ), 'configure servers empty two'
453 assert self.get()['status'] == 502, 'servers empty two'
454
455 def test_upstreams_rr_invalid(self):
456 assert 'error' in self.conf({}, 'upstreams'), 'upstreams empty'
457 assert 'error' in self.conf(
458 {}, 'upstreams/one'
459 ), 'named upstreams empty'
460 assert 'error' in self.conf(
461 {}, 'upstreams/one/servers/127.0.0.1'
462 ), 'invalid address'
463 assert 'error' in self.conf(
464 {}, 'upstreams/one/servers/127.0.0.1:7081/blah'
465 ), 'invalid server option'
466
467 def check_weight(w):
468 assert 'error' in self.conf(
469 w, 'upstreams/one/servers/127.0.0.1:7081/weight'
470 ), 'invalid weight option'
471
472 check_weight({})
473 check_weight('-1')
474 check_weight('1.')
475 check_weight('1.1.')
476 check_weight('.')
477 check_weight('.01234567890123')
478 check_weight('1000001')
479 check_weight('2e6')