xref: /unit/test/test_upstreams_rr.py (revision 2616:ab2896c980ab)
1import os
2import re
3
4import pytest
5
6from unit.applications.lang.python import ApplicationPython
7from unit.option import option
8
# Declares that these tests need a Python module (any version).
# NOTE(review): presumably consumed by the test harness to skip the file
# when the module is unavailable -- confirm.
prerequisites = {
    'modules': {
        'python': 'any',
    },
}

# Shared client used by the fixture and every test below to apply
# configuration and to send HTTP requests.
client = ApplicationPython()
12
13
@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Apply the baseline upstream configuration before each test.

    Two upstreams ("one" on port 8080, "two" on port 8090) both balance
    across 127.0.0.1:8081 and 127.0.0.1:8082.  Ports 8081-8083 are served
    by routes returning 200, 201 and 202 respectively, so the last digit
    of the status code tells which backend answered.  The CPU count is
    stored on the shared client for use as a tolerance in the tests.
    """
    listeners = {
        "*:8080": {"pass": "upstreams/one"},
        "*:8090": {"pass": "upstreams/two"},
        "*:8081": {"pass": "routes/one"},
        "*:8082": {"pass": "routes/two"},
        "*:8083": {"pass": "routes/three"},
    }

    backend_addrs = ("127.0.0.1:8081", "127.0.0.1:8082")
    upstreams = {
        name: {"servers": {addr: {} for addr in backend_addrs}}
        for name in ("one", "two")
    }

    route_statuses = (("one", 200), ("two", 201), ("three", 202))
    routes = {
        name: [{"action": {"return": status}}]
        for name, status in route_statuses
    }

    initial_conf = {
        "listeners": listeners,
        "upstreams": upstreams,
        "routes": routes,
        "applications": {},
    }

    outcome = client.conf(initial_conf)
    assert 'success' in outcome, 'upstreams initial configuration'

    client.cpu_count = os.cpu_count()
49
50
def get_resps(req=100, port=8080):
    """Send *req* separate GET requests and count responses per backend.

    Only statuses in the 200..209 range are counted; the last digit of
    the status selects the counter.  The returned list always has at
    least one element and grows as needed to hold the highest index seen.
    """
    counters = [0]

    for _ in range(req):
        code = client.get(port=port)['status']

        if 200 <= code <= 209:
            index = code % 10

            # Grow the list so that `index` becomes a valid position.
            missing = index + 1 - len(counters)
            if missing > 0:
                counters.extend([0] * missing)

            counters[index] += 1

    return counters
66
67
def get_resps_sc(req=100, port=8080):
    """Send *req* pipelined GET requests on one connection and count replies.

    The first ``req - 1`` requests are keep-alive; the final one carries
    ``Connection: close``.  Status codes are parsed from the raw response,
    only ``20x`` statuses are counted, and the last digit of the status
    selects the counter.

    Returns a list of counters indexed by that digit.  When no ``20x``
    status is found the result is ``[0]`` (the same shape ``get_resps``
    returns in that situation) instead of raising ``ValueError`` from
    ``max()`` on an empty sequence.
    """
    keepalive_req = b"""GET / HTTP/1.1
Host: localhost

"""
    closing_req = b"""GET / HTTP/1.1
Host: localhost
Connection: close

"""
    to_send = keepalive_req * (req - 1) + closing_req

    resp = client.http(to_send, raw_resp=True, raw=True, port=port)
    statuses = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)

    # Keep only 20x statuses; their last digit is the backend index.
    ups = [int(code[-1]) for code in statuses if code[:2] == '20']

    # default=0 keeps the helper usable when nothing matched.
    resps = [0] * (max(ups, default=0) + 1)
    for up in ups:
        resps[up] += 1

    return resps
92
93
def test_upstreams_rr_no_weight():
    """Unweighted servers share requests evenly as servers come and go.

    Checks the balance with two servers, with one removed, with it
    restored, and with a third server (127.0.0.1:8083) added.  The
    non-pipelined checks tolerate a difference of up to the CPU count.
    """
    server_8081 = 'upstreams/one/servers/127.0.0.1:8081'
    server_8083 = 'upstreams/one/servers/127.0.0.1:8083'

    counts = get_resps()
    assert sum(counts) == 100, 'no weight sum'
    assert abs(counts[0] - counts[1]) <= client.cpu_count, 'no weight'

    outcome = client.conf_delete(server_8081)
    assert 'success' in outcome, 'no weight server remove'

    counts = get_resps(req=50)
    assert counts[1] == 50, 'no weight 2'

    outcome = client.conf({}, server_8081)
    assert 'success' in outcome, 'no weight server revert'

    counts = get_resps()
    assert sum(counts) == 100, 'no weight 3 sum'
    assert abs(counts[0] - counts[1]) <= client.cpu_count, 'no weight 3'

    outcome = client.conf({}, server_8083)
    assert 'success' in outcome, 'no weight server new'

    counts = get_resps()
    assert sum(counts) == 100, 'no weight 4 sum'
    assert max(counts) - min(counts) <= client.cpu_count, 'no weight 4'

    # Over a single pipelined connection the split is expected to be exact.
    counts = get_resps_sc(req=30)
    labels = ('no weight 4 0', 'no weight 4 1', 'no weight 4 2')
    for idx, label in enumerate(labels):
        assert counts[idx] == 10, label
126
127
def test_upstreams_rr_weight():
    """Integer weights split pipelined requests in the configured ratio.

    Covers setting a weight, deleting it, setting it explicitly to 1,
    and replacing the server set with two weighted servers.
    """
    weight_8081 = 'upstreams/one/servers/127.0.0.1:8081/weight'

    outcome = client.conf(
        {"weight": 3}, 'upstreams/one/servers/127.0.0.1:8081'
    )
    assert 'success' in outcome, 'configure weight'

    counts = get_resps_sc()
    assert counts[0] == 75, 'weight 3 0'
    assert counts[1] == 25, 'weight 3 1'

    outcome = client.conf_delete(weight_8081)
    assert 'success' in outcome, 'configure weight remove'

    counts = get_resps_sc(req=10)
    assert counts[0] == 5, 'weight 0 0'
    assert counts[1] == 5, 'weight 0 1'

    outcome = client.conf('1', weight_8081)
    assert 'success' in outcome, 'configure weight 1'

    counts = get_resps_sc()
    assert counts[0] == 50, 'weight 1 0'
    assert counts[1] == 50, 'weight 1 1'

    weighted_servers = {
        "127.0.0.1:8081": {"weight": 3},
        "127.0.0.1:8083": {"weight": 2},
    }
    outcome = client.conf(weighted_servers, 'upstreams/one/servers')
    assert 'success' in outcome, 'configure weight 2'

    # Port 8083 answers with 202, hence index 2 for the second server.
    counts = get_resps_sc()
    assert counts[0] == 60, 'weight 2 0'
    assert counts[2] == 40, 'weight 2 1'
163
164
def test_upstreams_rr_weight_rational():
    """Fractional, tiny, huge and zero weights give proportional splits.

    Each weight pair is applied to servers 8081/8082 and ten pipelined
    requests must be divided exactly in the w1:w2 ratio.  Afterwards the
    test checks that deleting a weight gives a 1:0.25 split, then checks
    a three-server setup.
    """

    def set_weights(w1, w2):
        servers = {
            "127.0.0.1:8081": {"weight": w1},
            "127.0.0.1:8082": {"weight": w2},
        }
        outcome = client.conf(servers, 'upstreams/one/servers')
        assert 'success' in outcome, 'configure weights'

    def check_reqs(w1, w2, reqs=10):
        counts = get_resps_sc(req=reqs)
        total = w1 + w2
        assert counts[0] == reqs * w1 / total, 'weight 1'
        assert counts[1] == reqs * w2 / total, 'weight 2'

    # The (1, 0.25) pair appears twice on purpose to mirror the original
    # sequence of configuration changes.
    weight_pairs = (
        (0, 1),
        (0, 999999.0123456),
        (1, 9),
        (100000, 900000),
        (1, 0.25),
        (1, 0.25),
        (0.2, 0.8),
        (1, 1.5),
        (1e-3, 1e-3),
        (1e-20, 1e-20),
        (1e4, 1e4),
        (1000000, 1000000),
    )
    for first, second in weight_pairs:
        set_weights(first, second)
        check_reqs(first, second)

    # With the weight of 8081 deleted, the expected split is 1 : 0.25.
    set_weights(0.25, 0.25)
    outcome = client.conf_delete(
        'upstreams/one/servers/127.0.0.1:8081/weight'
    )
    assert 'success' in outcome, 'delete weight'
    check_reqs(1, 0.25)

    three_servers = {
        "127.0.0.1:8081": {"weight": 0.1},
        "127.0.0.1:8082": {"weight": 1},
        "127.0.0.1:8083": {"weight": 0.9},
    }
    outcome = client.conf(three_servers, 'upstreams/one/servers')
    assert 'success' in outcome, 'configure weights'

    counts = get_resps_sc(req=20)
    assert counts[0] == 1, 'weight 3 1'
    assert counts[1] == 10, 'weight 3 2'
    assert counts[2] == 9, 'weight 3 3'
215
216
def test_upstreams_rr_independent():
    """Upstreams "one" and "two" balance independently of each other.

    Changing a weight in upstream "two" (port 8090) must not affect the
    split seen through upstream "one" (port 8080), and interleaved
    requests to both must each stay balanced.
    """

    def sum_resps(*args):
        # Element-wise sum of the given counter lists.
        return [sum(column) for column in zip(*args)]

    weight_two_8081 = 'upstreams/two/servers/127.0.0.1:8081/weight'

    counts = get_resps_sc(req=30, port=8090)
    assert counts[0] == 15, 'dep two before 0'
    assert counts[1] == 15, 'dep two before 1'

    counts = get_resps_sc(req=30)
    assert counts[0] == 15, 'dep one before 0'
    assert counts[1] == 15, 'dep one before 1'

    outcome = client.conf('2', weight_two_8081)
    assert 'success' in outcome, 'configure dep weight'

    counts = get_resps_sc(req=30, port=8090)
    assert counts[0] == 20, 'dep two 0'
    assert counts[1] == 10, 'dep two 1'

    counts = get_resps_sc(req=30)
    assert counts[0] == 15, 'dep one 0'
    assert counts[1] == 15, 'dep one 1'

    outcome = client.conf('1', weight_two_8081)
    assert 'success' in outcome, 'configure dep weight 1'

    # Alternate batches of ten requests between the two upstreams.
    total_one = [0, 0]
    total_two = [0, 0]
    for _ in range(10):
        total_one = sum_resps(total_one, get_resps(req=10))
        total_two = sum_resps(total_two, get_resps(req=10, port=8090))

    tolerance = client.cpu_count
    assert sum(total_one) == 100, 'dep one mix sum'
    assert abs(total_one[0] - total_one[1]) <= tolerance, 'dep one mix'
    assert sum(total_two) == 100, 'dep two mix sum'
    assert abs(total_two[0] - total_two[1]) <= tolerance, 'dep two mix'
258
259
def test_upstreams_rr_delay():
    """Balancing stays even when some requests to one backend are delayed.

    Port 8081 is routed to the "delayed" application and port 8082
    returns 201.  Fifty requests are sent without reading the replies,
    every fifth one with ``X-Delay: 1``; then all replies are collected
    and counted by the last digit of their 20x status.
    NOTE(review): the delayed app presumably waits X-Delay seconds and
    answers 200 -- confirm against python/delayed.
    """
    delayed_dir = f'{option.test_dir}/python/delayed'

    listeners = {
        "*:8080": {"pass": "upstreams/one"},
        "*:8081": {"pass": "routes"},
        "*:8082": {"pass": "routes"},
    }
    upstreams = {
        "one": {
            "servers": {
                "127.0.0.1:8081": {},
                "127.0.0.1:8082": {},
            },
        },
    }
    routes = [
        {
            "match": {"destination": "*:8081"},
            "action": {"pass": "applications/delayed"},
        },
        {
            "match": {"destination": "*:8082"},
            "action": {"return": 201},
        },
    ]
    applications = {
        "delayed": {
            "type": client.get_application_type(),
            "processes": {"spare": 0},
            "path": delayed_dir,
            "working_directory": delayed_dir,
            "module": "wsgi",
        }
    }

    outcome = client.conf(
        {
            "listeners": listeners,
            "upstreams": upstreams,
            "routes": routes,
            "applications": applications,
        },
    )
    assert 'success' in outcome, 'upstreams initial configuration'

    req = 50

    # Fire all requests first; replies are read afterwards.
    socks = []
    for num in range(req):
        headers = {
            'Host': 'localhost',
            'Content-Length': '0',
            'X-Delay': '1' if num % 5 == 0 else '0',
            'Connection': 'close',
        }
        socks.append(client.get(headers=headers, no_recv=True))

    counts = [0, 0]
    for sock in socks:
        reply = client.recvall(sock).decode()
        sock.close()

        found = re.search(r'HTTP/1.1 20(\d)', reply)
        assert found is not None, 'status'
        counts[int(found.group(1))] += 1

    assert sum(counts) == req, 'delay sum'
    assert abs(counts[0] - counts[1]) <= client.cpu_count, 'delay'
326
327
def test_upstreams_rr_active_req():
    """In-flight requests survive reconfiguration of their upstream.

    Opens five GET connections and five POST connections whose bodies
    have not been sent yet, then adds and removes a server, deletes the
    listener and finally the upstream.  Each pending request must still
    complete with an empty response body.
    """
    conns = 5
    post_head = b"""POST / HTTP/1.1
Host: localhost
Content-Length: 10
Connection: close

"""

    get_socks = []
    post_socks = []
    for _ in range(conns):
        get_socks.append(client.get(no_recv=True))
        post_socks.append(client.http(post_head, no_recv=True, raw=True))

    # Send one more request and read response to make sure that previous
    # requests had enough time to reach server.

    assert client.get()['body'] == ''

    outcome = client.conf(
        {"127.0.0.1:8083": {"weight": 2}},
        'upstreams/one/servers',
    )
    assert 'success' in outcome, 'active req new server'

    outcome = client.conf_delete('upstreams/one/servers/127.0.0.1:8083')
    assert 'success' in outcome, 'active req server remove'

    outcome = client.conf_delete('listeners/*:8080')
    assert 'success' in outcome, 'delete listener'

    outcome = client.conf_delete('upstreams/one')
    assert 'success' in outcome, 'active req upstream remove'

    for get_sock, post_sock in zip(get_socks, post_socks):
        get_reply = client.http(b'', sock=get_sock, raw=True)
        assert get_reply['body'] == '', 'active req GET'

        # Now deliver the ten body bytes promised by Content-Length.
        post_reply = client.http(b'0123456789', sock=post_sock, raw=True)
        assert post_reply['body'] == '', 'active req POST'
377
378
def test_upstreams_rr_bad_server():
    """A server with no listener behind it still takes its share of turns.

    127.0.0.1:8084 is added to the upstream although the fixture
    configures no listener on that port; of 30 pipelined requests only
    the 20 handled by the two working servers yield a 20x status.
    """
    outcome = client.conf(
        {"weight": 1}, 'upstreams/one/servers/127.0.0.1:8084'
    )
    assert 'success' in outcome, 'configure bad server'

    counts = get_resps_sc(req=30)
    assert counts[0] == 10, 'bad server 0'
    assert counts[1] == 10, 'bad server 1'
    assert sum(counts) == 20, 'bad server sum'
388
389
def test_upstreams_rr_pipeline():
    """100 pipelined requests are split 50/50 between the two servers."""
    counts = get_resps_sc()

    for idx, label in enumerate(('pipeline 0', 'pipeline 1')):
        assert counts[idx] == 50, label
395
396
def test_upstreams_rr_post():
    """Alternating GET and POST requests are balanced across both servers."""
    tally = [0, 0]

    for _ in range(50):
        get_status = client.get()['status']
        tally[get_status % 10] += 1

        post_status = client.post(body='0123456789')['status']
        tally[post_status % 10] += 1

    assert sum(tally) == 100, 'post sum'
    assert abs(tally[0] - tally[1]) <= client.cpu_count, 'post'
405
406
def test_upstreams_rr_unix(temp_dir):
    """Round-robin works with upstream servers on Unix domain sockets.

    Two socket listeners under *temp_dir* serve routes "one" and "two";
    100 pipelined requests must be split 50/50 between them.
    """
    sock_paths = [f'{temp_dir}/sock_0', f'{temp_dir}/sock_1']
    first_addr, second_addr = (f"unix:{path}" for path in sock_paths)

    listeners = {
        "*:8080": {"pass": "upstreams/one"},
        first_addr: {"pass": "routes/one"},
        second_addr: {"pass": "routes/two"},
    }
    outcome = client.conf(listeners, 'listeners')
    assert 'success' in outcome, 'configure listeners unix'

    servers = {first_addr: {}, second_addr: {}}
    outcome = client.conf(servers, 'upstreams/one/servers')
    assert 'success' in outcome, 'configure servers unix'

    counts = get_resps_sc()

    assert counts[0] == 50, 'unix 0'
    assert counts[1] == 50, 'unix 1'
429
430
def test_upstreams_rr_ipv6():
    """Round-robin works with upstream servers on IPv6 loopback addresses.

    Listeners on [::1]:8081 and [::1]:8082 serve routes "one" and "two";
    100 pipelined requests must be split 50/50 between them.
    """
    first_addr = "[::1]:8081"
    second_addr = "[::1]:8082"

    listeners = {
        "*:8080": {"pass": "upstreams/one"},
        first_addr: {"pass": "routes/one"},
        second_addr: {"pass": "routes/two"},
    }
    outcome = client.conf(listeners, 'listeners')
    assert 'success' in outcome, 'configure listeners ipv6'

    servers = {first_addr: {}, second_addr: {}}
    outcome = client.conf(servers, 'upstreams/one/servers')
    assert 'success' in outcome, 'configure servers ipv6'

    counts = get_resps_sc()

    assert counts[0] == 50, 'ipv6 0'
    assert counts[1] == 50, 'ipv6 1'
449
450
def test_upstreams_rr_servers_empty():
    """An upstream with no usable server answers 502.

    Checked for an empty server set, a single zero-weight server and
    two zero-weight servers.
    """
    servers_path = 'upstreams/one/servers'

    outcome = client.conf({}, servers_path)
    assert 'success' in outcome, 'configure servers empty'
    assert client.get()['status'] == 502, 'servers empty'

    one_zero = {"127.0.0.1:8081": {"weight": 0}}
    outcome = client.conf(one_zero, servers_path)
    assert 'success' in outcome, 'configure servers empty one'
    assert client.get()['status'] == 502, 'servers empty one'

    two_zero = {
        "127.0.0.1:8081": {"weight": 0},
        "127.0.0.1:8082": {"weight": 0},
    }
    outcome = client.conf(two_zero, servers_path)
    assert 'success' in outcome, 'configure servers empty two'
    assert client.get()['status'] == 502, 'servers empty two'
469
470
def test_upstreams_rr_invalid():
    """Malformed upstream configurations are rejected with an error.

    Covers empty "upstreams" and named upstream objects, a server address
    without a port, an unknown server option, and a series of invalid
    weight values.
    """
    outcome = client.conf({}, 'upstreams')
    assert 'error' in outcome, 'upstreams empty'

    outcome = client.conf({}, 'upstreams/one')
    assert 'error' in outcome, 'named upstreams empty'

    outcome = client.conf({}, 'upstreams/one/servers/127.0.0.1')
    assert 'error' in outcome, 'invalid address'

    outcome = client.conf({}, 'upstreams/one/servers/127.0.0.1:8081/blah')
    assert 'error' in outcome, 'invalid server option'

    def check_weight(w):
        rejected = client.conf(
            w, 'upstreams/one/servers/127.0.0.1:8081/weight'
        )
        assert 'error' in rejected, 'invalid weight option'

    bad_weights = (
        {},
        '-1',
        '1.',
        '1.1.',
        '.',
        '.01234567890123',
        '1000001',
        '2e6',
    )
    for weight in bad_weights:
        check_weight(weight)
494