Deleted Added
1import os
2import re
3import unittest
4from unit.applications.lang.python import TestApplicationPython
5
6
7class TestUpstreamsRR(TestApplicationPython):
8 prerequisites = {'modules': ['python']}
9
10 def setUp(self):
11 super().setUp()
12
13 self.assertIn(
14 'success',
15 self.conf(
16 {
17 "listeners": {
18 "*:7080": {"pass": "upstreams/one"},
19 "*:7081": {"pass": "applications/ups_0"},
20 "*:7082": {"pass": "applications/ups_1"},
21 "*:7083": {"pass": "applications/ups_2"},
22 "*:7090": {"pass": "upstreams/two"},
23 },
24 "upstreams": {
25 "one": {
26 "servers": {
27 "127.0.0.1:7081": {},
28 "127.0.0.1:7082": {},
29 },
30 },
31 "two": {
32 "servers": {
33 "127.0.0.1:7081": {},
34 "127.0.0.1:7082": {},
35 },
36 },
37 },
38 "applications": {
39 "ups_0": {
40 "type": "python",
41 "processes": {"spare": 0},
42 "path": self.current_dir + "/python/upstreams/0",
43 "working_directory": self.current_dir
44 + "/python/upstreams/0",
45 "module": "wsgi",
46 },
47 "ups_1": {
48 "type": "python",
49 "processes": {"spare": 0},
50 "path": self.current_dir + "/python/upstreams/1",
51 "working_directory": self.current_dir
52 + "/python/upstreams/1",
53 "module": "wsgi",
54 },
55 "ups_2": {
56 "type": "python",
57 "processes": {"spare": 0},
58 "path": self.current_dir + "/python/upstreams/2",
59 "working_directory": self.current_dir
60 + "/python/upstreams/2",
61 "module": "wsgi",
62 },
63 },
64 },
65 ),
66 'upstreams initial configuration',
67 )
68
69 self.cpu_count = os.cpu_count()
70
71 def get_resps(self, req=100, port=7080):
72 resps = [0]
73 for _ in range(req):
74 headers = self.get(port=port)['headers']
75 if 'X-Upstream' in headers:
76 ups = int(headers['X-Upstream'])
77
78 if ups > len(resps) - 1:
79 resps.extend([0] * (ups - len(resps) + 1))
80
81 resps[ups] += 1
82
83 return resps
84
85 def get_resps_sc(self, req=100, port=7080):
86 to_send = b"""GET / HTTP/1.1
87Host: localhost
88
89""" * (
90 req - 1
91 )
92
93 to_send += b"""GET / HTTP/1.1
94Host: localhost
95Connection: close
96
97"""
98
99 resp = self.http(to_send, raw_resp=True, raw=True, port=port)
100 ups = re.findall('X-Upstream: (\d+)', resp)
101 resps = [0] * (int(max(ups)) + 1)
102
103 for i in range(len(ups)):
104 resps[int(ups[i])] += 1
105
106 return resps
107
108 def test_upstreams_rr_no_weight(self):
109 resps = self.get_resps()
110 self.assertLessEqual(
111 abs(resps[0] - resps[1]), self.cpu_count, 'no weight'
112 )
113
114 self.assertIn(
115 'success',
116 self.conf_delete('upstreams/one/servers/127.0.0.1:7081'),
117 'no weight server remove',

--- 4 unchanged lines hidden (view full) ---

122
123 self.assertIn(
124 'success',
125 self.conf({}, 'upstreams/one/servers/127.0.0.1:7081'),
126 'no weight server revert',
127 )
128
129 resps = self.get_resps()
130 self.assertLessEqual(
131 abs(resps[0] - resps[1]), self.cpu_count, 'no weight 3'
132 )
133
134 self.assertIn(
135 'success',
136 self.conf({}, 'upstreams/one/servers/127.0.0.1:7083'),
137 'no weight server new',
138 )
139
140 resps = self.get_resps()
141 self.assertLessEqual(
142 max(resps) - min(resps), self.cpu_count, 'no weight 4'
143 )
144
145 resps = self.get_resps_sc(req=30)
146 self.assertEqual(resps[0], 10, 'no weight 4 0')
147 self.assertEqual(resps[1], 10, 'no weight 4 1')
148 self.assertEqual(resps[2], 10, 'no weight 4 2')

--- 141 unchanged lines hidden (view full) ---

290 'configure dep weight 1',
291 )
292
293 r_one, r_two = [0, 0], [0, 0]
294 for _ in range(10):
295 r_one = sum_resps(r_one, self.get_resps(req=10))
296 r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
297
298 self.assertLessEqual(
299 abs(r_one[0] - r_one[1]), self.cpu_count, 'dep one mix'
300 )
301 self.assertLessEqual(
302 abs(r_two[0] - r_two[1]), self.cpu_count, 'dep two mix'
303 )
304
305 def test_upstreams_rr_delay(self):
306 headers_delay_1 = {
307 'Connection': 'close',
308 'Host': 'localhost',
309 'Content-Length': '0',
310 'X-Delay': '1',
311 }
312 headers_no_delay = {
313 'Connection': 'close',
314 'Host': 'localhost',
315 'Content-Length': '0',
316 }
317
318 req = 50
319
320 socks = []
321 for i in range(req):
322 headers = headers_delay_1 if i % 5 == 0 else headers_no_delay
323 _, sock = self.get(
324 headers=headers,
325 start=True,
326 no_recv=True,
327 )
328 socks.append(sock)
329
330 resps = [0, 0]
331 for i in range(req):
332 resp = self.recvall(socks[i]).decode()
333 socks[i].close()
334
335 m = re.search('X-Upstream: (\d+)', resp)
336 resps[int(m.group(1))] += 1
337
338 self.assertLessEqual(
339 abs(resps[0] - resps[1]), self.cpu_count, 'dep two mix'
340 )
341
342 def test_upstreams_rr_active_req(self):
343 conns = 5
344 socks = []
345 socks2 = []
346
347 for _ in range(conns):
348 _, sock = self.get(start=True, no_recv=True)

--- 10 unchanged lines hidden (view full) ---

359 no_recv=True,
360 raw=True,
361 )
362 socks2.append(sock2)
363
364 # Send one more request and read response to make sure that previous
365 # requests had enough time to reach server.
366
367 self.assertEqual(self.get()['status'], 200)
368
369 self.assertIn(
370 'success',
371 self.conf(
372 {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
373 ),
374 'active req new server',
375 )

--- 7 unchanged lines hidden (view full) ---

383 )
384 self.assertIn(
385 'success',
386 self.conf_delete('upstreams/one'),
387 'active req upstream remove',
388 )
389
390 for i in range(conns):
391 resp = self.recvall(socks[i]).decode()
392 socks[i].close()
393
394 self.assertRegex(resp, r'X-Upstream', 'active req GET')
395
396 resp = self.http(b"""0123456789""", sock=socks2[i], raw=True)
397 self.assertEqual(resp['status'], 200, 'active req POST')
398
399 def test_upstreams_rr_bad_server(self):
400 self.assertIn(
401 'success',
402 self.conf({"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'),
403 'configure bad server',
404 )
405
406 resps = self.get_resps_sc(req=30)

--- 5 unchanged lines hidden (view full) ---

412 resps = self.get_resps_sc()
413
414 self.assertEqual(resps[0], 50, 'pipeline 0')
415 self.assertEqual(resps[1], 50, 'pipeline 1')
416
417 def test_upstreams_rr_post(self):
418 resps = [0, 0]
419 for _ in range(50):
420 resps[
421 int(self.post(body='0123456789')['headers']['X-Upstream'])
422 ] += 1
423 resps[int(self.get()['headers']['X-Upstream'])] += 1
424
425 self.assertLessEqual(
426 abs(resps[0] - resps[1]), self.cpu_count, 'post'
427 )
428
429 def test_upstreams_rr_unix(self):
430 addr_0 = self.testdir + '/sock_0'
431 addr_1 = self.testdir + '/sock_1'
432
433 self.assertIn(
434 'success',
435 self.conf(
436 {
437 "*:7080": {"pass": "upstreams/one"},
438 "unix:" + addr_0: {"pass": "applications/ups_0"},
439 "unix:" + addr_1: {"pass": "applications/ups_1"},
440 },
441 'listeners',
442 ),
443 'configure listeners unix',
444 )
445
446 self.assertIn(
447 'success',
448 self.conf(
449 {"unix:" + addr_0: {}, "unix:" + addr_1: {},},
450 'upstreams/one/servers',
451 ),
452 'configure servers unix',
453 )
454
455 resps = self.get_resps_sc()
456
457 self.assertEqual(resps[0], 50, 'unix 0')
458 self.assertEqual(resps[1], 50, 'unix 1')
459
460 def test_upstreams_rr_ipv6(self):
461 self.assertIn(
462 'success',
463 self.conf(
464 {
465 "*:7080": {"pass": "upstreams/one"},
466 "[::1]:7081": {"pass": "applications/ups_0"},
467 "[::1]:7082": {"pass": "applications/ups_1"},
468 },
469 'listeners',
470 ),
471 'configure listeners ipv6',
472 )
473
474 self.assertIn(
475 'success',
476 self.conf(
477 {"[::1]:7081": {}, "[::1]:7082": {},}, 'upstreams/one/servers'
478 ),
479 'configure servers ipv6',
480 )
481
482 resps = self.get_resps_sc()
483
484 self.assertEqual(resps[0], 50, 'ipv6 0')
485 self.assertEqual(resps[1], 50, 'ipv6 1')

--- 66 unchanged lines hidden ---