test_upstreams_rr.py (1441:b2caecb224f7) test_upstreams_rr.py (1445:5a603827ec10)
1import os
2import re
3import unittest
4from unit.applications.lang.python import TestApplicationPython
5
6
7class TestUpstreamsRR(TestApplicationPython):
8 prerequisites = {'modules': ['python']}
9
10 def setUp(self):
11 super().setUp()
12
13 self.assertIn(
14 'success',
15 self.conf(
16 {
17 "listeners": {
18 "*:7080": {"pass": "upstreams/one"},
1import os
2import re
3import unittest
4from unit.applications.lang.python import TestApplicationPython
5
6
7class TestUpstreamsRR(TestApplicationPython):
8 prerequisites = {'modules': ['python']}
9
10 def setUp(self):
11 super().setUp()
12
13 self.assertIn(
14 'success',
15 self.conf(
16 {
17 "listeners": {
18 "*:7080": {"pass": "upstreams/one"},
19 "*:7081": {"pass": "applications/ups_0"},
20 "*:7082": {"pass": "applications/ups_1"},
21 "*:7083": {"pass": "applications/ups_2"},
22 "*:7090": {"pass": "upstreams/two"},
19 "*:7090": {"pass": "upstreams/two"},
20 "*:7081": {"pass": "routes/one"},
21 "*:7082": {"pass": "routes/two"},
22 "*:7083": {"pass": "routes/three"},
23 },
24 "upstreams": {
25 "one": {
26 "servers": {
27 "127.0.0.1:7081": {},
28 "127.0.0.1:7082": {},
29 },
30 },
31 "two": {
32 "servers": {
33 "127.0.0.1:7081": {},
34 "127.0.0.1:7082": {},
35 },
36 },
37 },
23 },
24 "upstreams": {
25 "one": {
26 "servers": {
27 "127.0.0.1:7081": {},
28 "127.0.0.1:7082": {},
29 },
30 },
31 "two": {
32 "servers": {
33 "127.0.0.1:7081": {},
34 "127.0.0.1:7082": {},
35 },
36 },
37 },
38 "applications": {
39 "ups_0": {
40 "type": "python",
41 "processes": {"spare": 0},
42 "path": self.current_dir + "/python/upstreams/0",
43 "working_directory": self.current_dir
44 + "/python/upstreams/0",
45 "module": "wsgi",
46 },
47 "ups_1": {
48 "type": "python",
49 "processes": {"spare": 0},
50 "path": self.current_dir + "/python/upstreams/1",
51 "working_directory": self.current_dir
52 + "/python/upstreams/1",
53 "module": "wsgi",
54 },
55 "ups_2": {
56 "type": "python",
57 "processes": {"spare": 0},
58 "path": self.current_dir + "/python/upstreams/2",
59 "working_directory": self.current_dir
60 + "/python/upstreams/2",
61 "module": "wsgi",
62 },
38 "routes": {
39 "one": [{"action": {"return": 200}}],
40 "two": [{"action": {"return": 201}}],
41 "three": [{"action": {"return": 202}}],
63 },
42 },
43 "applications": {},
64 },
65 ),
66 'upstreams initial configuration',
67 )
68
69 self.cpu_count = os.cpu_count()
70
71 def get_resps(self, req=100, port=7080):
72 resps = [0]
44 },
45 ),
46 'upstreams initial configuration',
47 )
48
49 self.cpu_count = os.cpu_count()
50
51 def get_resps(self, req=100, port=7080):
52 resps = [0]
53
73 for _ in range(req):
54 for _ in range(req):
74 headers = self.get(port=port)['headers']
75 if 'X-Upstream' in headers:
76 ups = int(headers['X-Upstream'])
55 status = self.get(port=port)['status']
56 if 200 > status or status > 209:
57 continue
77
58
78 if ups > len(resps) - 1:
79 resps.extend([0] * (ups - len(resps) + 1))
59 ups = status % 10
60 if ups > len(resps) - 1:
61 resps.extend([0] * (ups - len(resps) + 1))
80
62
81 resps[ups] += 1
63 resps[ups] += 1
82
83 return resps
84
85 def get_resps_sc(self, req=100, port=7080):
86 to_send = b"""GET / HTTP/1.1
87Host: localhost
88
89""" * (
90 req - 1
91 )
92
93 to_send += b"""GET / HTTP/1.1
94Host: localhost
95Connection: close
96
97"""
98
99 resp = self.http(to_send, raw_resp=True, raw=True, port=port)
64
65 return resps
66
67 def get_resps_sc(self, req=100, port=7080):
68 to_send = b"""GET / HTTP/1.1
69Host: localhost
70
71""" * (
72 req - 1
73 )
74
75 to_send += b"""GET / HTTP/1.1
76Host: localhost
77Connection: close
78
79"""
80
81 resp = self.http(to_send, raw_resp=True, raw=True, port=port)
100 ups = re.findall('X-Upstream: (\d+)', resp)
101 resps = [0] * (int(max(ups)) + 1)
82 status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)
83 status = list(filter(lambda x: x[:2] == '20', status))
84 ups = list(map(lambda x: int(x[-1]), status))
102
85
86 resps = [0] * (max(ups) + 1)
103 for i in range(len(ups)):
87 for i in range(len(ups)):
104 resps[int(ups[i])] += 1
88 resps[ups[i]] += 1
105
106 return resps
107
108 def test_upstreams_rr_no_weight(self):
109 resps = self.get_resps()
89
90 return resps
91
92 def test_upstreams_rr_no_weight(self):
93 resps = self.get_resps()
94 self.assertEqual(sum(resps), 100, 'no weight sum')
110 self.assertLessEqual(
111 abs(resps[0] - resps[1]), self.cpu_count, 'no weight'
112 )
113
114 self.assertIn(
115 'success',
116 self.conf_delete('upstreams/one/servers/127.0.0.1:7081'),
117 'no weight server remove',

--- 4 unchanged lines hidden (view full) ---

122
123 self.assertIn(
124 'success',
125 self.conf({}, 'upstreams/one/servers/127.0.0.1:7081'),
126 'no weight server revert',
127 )
128
129 resps = self.get_resps()
95 self.assertLessEqual(
96 abs(resps[0] - resps[1]), self.cpu_count, 'no weight'
97 )
98
99 self.assertIn(
100 'success',
101 self.conf_delete('upstreams/one/servers/127.0.0.1:7081'),
102 'no weight server remove',

--- 4 unchanged lines hidden (view full) ---

107
108 self.assertIn(
109 'success',
110 self.conf({}, 'upstreams/one/servers/127.0.0.1:7081'),
111 'no weight server revert',
112 )
113
114 resps = self.get_resps()
115 self.assertEqual(sum(resps), 100, 'no weight 3 sum')
130 self.assertLessEqual(
131 abs(resps[0] - resps[1]), self.cpu_count, 'no weight 3'
132 )
133
134 self.assertIn(
135 'success',
136 self.conf({}, 'upstreams/one/servers/127.0.0.1:7083'),
137 'no weight server new',
138 )
139
140 resps = self.get_resps()
116 self.assertLessEqual(
117 abs(resps[0] - resps[1]), self.cpu_count, 'no weight 3'
118 )
119
120 self.assertIn(
121 'success',
122 self.conf({}, 'upstreams/one/servers/127.0.0.1:7083'),
123 'no weight server new',
124 )
125
126 resps = self.get_resps()
127 self.assertEqual(sum(resps), 100, 'no weight 4 sum')
141 self.assertLessEqual(
142 max(resps) - min(resps), self.cpu_count, 'no weight 4'
143 )
144
145 resps = self.get_resps_sc(req=30)
146 self.assertEqual(resps[0], 10, 'no weight 4 0')
147 self.assertEqual(resps[1], 10, 'no weight 4 1')
148 self.assertEqual(resps[2], 10, 'no weight 4 2')

--- 141 unchanged lines hidden (view full) ---

290 'configure dep weight 1',
291 )
292
293 r_one, r_two = [0, 0], [0, 0]
294 for _ in range(10):
295 r_one = sum_resps(r_one, self.get_resps(req=10))
296 r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
297
128 self.assertLessEqual(
129 max(resps) - min(resps), self.cpu_count, 'no weight 4'
130 )
131
132 resps = self.get_resps_sc(req=30)
133 self.assertEqual(resps[0], 10, 'no weight 4 0')
134 self.assertEqual(resps[1], 10, 'no weight 4 1')
135 self.assertEqual(resps[2], 10, 'no weight 4 2')

--- 141 unchanged lines hidden (view full) ---

277 'configure dep weight 1',
278 )
279
280 r_one, r_two = [0, 0], [0, 0]
281 for _ in range(10):
282 r_one = sum_resps(r_one, self.get_resps(req=10))
283 r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
284
285
286 self.assertEqual(sum(r_one), 100, 'dep one mix sum')
298 self.assertLessEqual(
299 abs(r_one[0] - r_one[1]), self.cpu_count, 'dep one mix'
300 )
287 self.assertLessEqual(
288 abs(r_one[0] - r_one[1]), self.cpu_count, 'dep one mix'
289 )
290 self.assertEqual(sum(r_two), 100, 'dep two mix sum')
301 self.assertLessEqual(
302 abs(r_two[0] - r_two[1]), self.cpu_count, 'dep two mix'
303 )
304
305 def test_upstreams_rr_delay(self):
291 self.assertLessEqual(
292 abs(r_two[0] - r_two[1]), self.cpu_count, 'dep two mix'
293 )
294
295 def test_upstreams_rr_delay(self):
306 headers_delay_1 = {
307 'Connection': 'close',
308 'Host': 'localhost',
309 'Content-Length': '0',
310 'X-Delay': '1',
311 }
312 headers_no_delay = {
313 'Connection': 'close',
314 'Host': 'localhost',
315 'Content-Length': '0',
316 }
296 self.assertIn(
297 'success',
298 self.conf(
299 {
300 "listeners": {
301 "*:7080": {"pass": "upstreams/one"},
302 "*:7081": {"pass": "routes"},
303 "*:7082": {"pass": "routes"},
304 },
305 "upstreams": {
306 "one": {
307 "servers": {
308 "127.0.0.1:7081": {},
309 "127.0.0.1:7082": {},
310 },
311 },
312 },
313 "routes": [
314 {
315 "match": {"destination": "*:7081"},
316 "action": {"pass": "applications/delayed"},
317 },
318 {
319 "match": {"destination": "*:7082"},
320 "action": {"return": 201},
321 },
322 ],
323 "applications": {
324 "delayed": {
325 "type": "python",
326 "processes": {"spare": 0},
327 "path": self.current_dir + "/python/delayed",
328 "working_directory": self.current_dir
329 + "/python/delayed",
330 "module": "wsgi",
331 }
332 },
333 },
334 ),
335 'upstreams initial configuration',
336 )
317
318 req = 50
319
320 socks = []
321 for i in range(req):
337
338 req = 50
339
340 socks = []
341 for i in range(req):
322 headers = headers_delay_1 if i % 5 == 0 else headers_no_delay
342 delay = 1 if i % 5 == 0 else 0
323 _, sock = self.get(
343 _, sock = self.get(
324 headers=headers,
344 headers={
345 'Host': 'localhost',
346 'Content-Length': '0',
347 'X-Delay': str(delay),
348 'Connection': 'close',
349 },
325 start=True,
326 no_recv=True,
327 )
328 socks.append(sock)
329
330 resps = [0, 0]
331 for i in range(req):
332 resp = self.recvall(socks[i]).decode()
333 socks[i].close()
334
350 start=True,
351 no_recv=True,
352 )
353 socks.append(sock)
354
355 resps = [0, 0]
356 for i in range(req):
357 resp = self.recvall(socks[i]).decode()
358 socks[i].close()
359
335 m = re.search('X-Upstream: (\d+)', resp)
360 m = re.search('HTTP/1.1 20(\d)', resp)
361 self.assertIsNotNone(m, 'status')
336 resps[int(m.group(1))] += 1
337
362 resps[int(m.group(1))] += 1
363
338 self.assertLessEqual(
339 abs(resps[0] - resps[1]), self.cpu_count, 'dep two mix'
340 )
364 self.assertEqual(sum(resps), req, 'delay sum')
365 self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'delay')
341
342 def test_upstreams_rr_active_req(self):
343 conns = 5
344 socks = []
345 socks2 = []
346
347 for _ in range(conns):
348 _, sock = self.get(start=True, no_recv=True)

--- 10 unchanged lines hidden (view full) ---

359 no_recv=True,
360 raw=True,
361 )
362 socks2.append(sock2)
363
364 # Send one more request and read response to make sure that previous
365 # requests had enough time to reach server.
366
366
367 def test_upstreams_rr_active_req(self):
368 conns = 5
369 socks = []
370 socks2 = []
371
372 for _ in range(conns):
373 _, sock = self.get(start=True, no_recv=True)

--- 10 unchanged lines hidden (view full) ---

384 no_recv=True,
385 raw=True,
386 )
387 socks2.append(sock2)
388
389 # Send one more request and read response to make sure that previous
390 # requests had enough time to reach server.
391
367 self.assertEqual(self.get()['status'], 200)
392 self.assertEqual(self.get()['body'], '')
368
369 self.assertIn(
370 'success',
371 self.conf(
372 {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
373 ),
374 'active req new server',
375 )

--- 7 unchanged lines hidden (view full) ---

383 )
384 self.assertIn(
385 'success',
386 self.conf_delete('upstreams/one'),
387 'active req upstream remove',
388 )
389
390 for i in range(conns):
393
394 self.assertIn(
395 'success',
396 self.conf(
397 {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
398 ),
399 'active req new server',
400 )

--- 7 unchanged lines hidden (view full) ---

408 )
409 self.assertIn(
410 'success',
411 self.conf_delete('upstreams/one'),
412 'active req upstream remove',
413 )
414
415 for i in range(conns):
391 resp = self.recvall(socks[i]).decode()
392 socks[i].close()
416 self.assertEqual(
417 self.http(b'', sock=socks[i], raw=True)['body'],
418 '',
419 'active req GET',
420 )
393
421
394 self.assertRegex(resp, r'X-Upstream', 'active req GET')
422 self.assertEqual(
423 self.http(b"""0123456789""", sock=socks2[i], raw=True)['body'],
424 '',
425 'active req POST',
426 )
395
427
396 resp = self.http(b"""0123456789""", sock=socks2[i], raw=True)
397 self.assertEqual(resp['status'], 200, 'active req POST')
398
399 def test_upstreams_rr_bad_server(self):
400 self.assertIn(
401 'success',
402 self.conf({"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'),
403 'configure bad server',
404 )
405
406 resps = self.get_resps_sc(req=30)

--- 5 unchanged lines hidden (view full) ---

412 resps = self.get_resps_sc()
413
414 self.assertEqual(resps[0], 50, 'pipeline 0')
415 self.assertEqual(resps[1], 50, 'pipeline 1')
416
417 def test_upstreams_rr_post(self):
418 resps = [0, 0]
419 for _ in range(50):
428 def test_upstreams_rr_bad_server(self):
429 self.assertIn(
430 'success',
431 self.conf({"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'),
432 'configure bad server',
433 )
434
435 resps = self.get_resps_sc(req=30)

--- 5 unchanged lines hidden (view full) ---

441 resps = self.get_resps_sc()
442
443 self.assertEqual(resps[0], 50, 'pipeline 0')
444 self.assertEqual(resps[1], 50, 'pipeline 1')
445
446 def test_upstreams_rr_post(self):
447 resps = [0, 0]
448 for _ in range(50):
420 resps[
421 int(self.post(body='0123456789')['headers']['X-Upstream'])
422 ] += 1
423 resps[int(self.get()['headers']['X-Upstream'])] += 1
449 resps[self.get()['status'] % 10] += 1
450 resps[self.post(body='0123456789')['status'] % 10] += 1
424
451
425 self.assertLessEqual(
426 abs(resps[0] - resps[1]), self.cpu_count, 'post'
427 )
452 self.assertEqual(sum(resps), 100, 'post sum')
453 self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'post')
428
429 def test_upstreams_rr_unix(self):
430 addr_0 = self.testdir + '/sock_0'
431 addr_1 = self.testdir + '/sock_1'
432
433 self.assertIn(
434 'success',
435 self.conf(
436 {
437 "*:7080": {"pass": "upstreams/one"},
454
455 def test_upstreams_rr_unix(self):
456 addr_0 = self.testdir + '/sock_0'
457 addr_1 = self.testdir + '/sock_1'
458
459 self.assertIn(
460 'success',
461 self.conf(
462 {
463 "*:7080": {"pass": "upstreams/one"},
438 "unix:" + addr_0: {"pass": "applications/ups_0"},
439 "unix:" + addr_1: {"pass": "applications/ups_1"},
464 "unix:" + addr_0: {"pass": "routes/one"},
465 "unix:" + addr_1: {"pass": "routes/two"},
440 },
441 'listeners',
442 ),
443 'configure listeners unix',
444 )
445
446 self.assertIn(
447 'success',
448 self.conf(
466 },
467 'listeners',
468 ),
469 'configure listeners unix',
470 )
471
472 self.assertIn(
473 'success',
474 self.conf(
449 {"unix:" + addr_0: {}, "unix:" + addr_1: {},},
475 {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
450 'upstreams/one/servers',
451 ),
452 'configure servers unix',
453 )
454
455 resps = self.get_resps_sc()
456
457 self.assertEqual(resps[0], 50, 'unix 0')
458 self.assertEqual(resps[1], 50, 'unix 1')
459
460 def test_upstreams_rr_ipv6(self):
461 self.assertIn(
462 'success',
463 self.conf(
464 {
465 "*:7080": {"pass": "upstreams/one"},
476 'upstreams/one/servers',
477 ),
478 'configure servers unix',
479 )
480
481 resps = self.get_resps_sc()
482
483 self.assertEqual(resps[0], 50, 'unix 0')
484 self.assertEqual(resps[1], 50, 'unix 1')
485
486 def test_upstreams_rr_ipv6(self):
487 self.assertIn(
488 'success',
489 self.conf(
490 {
491 "*:7080": {"pass": "upstreams/one"},
466 "[::1]:7081": {"pass": "applications/ups_0"},
467 "[::1]:7082": {"pass": "applications/ups_1"},
492 "[::1]:7081": {"pass": "routes/one"},
493 "[::1]:7082": {"pass": "routes/two"},
468 },
469 'listeners',
470 ),
471 'configure listeners ipv6',
472 )
473
474 self.assertIn(
475 'success',
476 self.conf(
494 },
495 'listeners',
496 ),
497 'configure listeners ipv6',
498 )
499
500 self.assertIn(
501 'success',
502 self.conf(
477 {"[::1]:7081": {}, "[::1]:7082": {},}, 'upstreams/one/servers'
503 {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
478 ),
479 'configure servers ipv6',
480 )
481
482 resps = self.get_resps_sc()
483
484 self.assertEqual(resps[0], 50, 'ipv6 0')
485 self.assertEqual(resps[1], 50, 'ipv6 1')

--- 66 unchanged lines hidden ---
504 ),
505 'configure servers ipv6',
506 )
507
508 resps = self.get_resps_sc()
509
510 self.assertEqual(resps[0], 50, 'ipv6 0')
511 self.assertEqual(resps[1], 50, 'ipv6 1')

--- 66 unchanged lines hidden ---