xref: /unit/test/test_upstreams_rr.py (revision 1596:b7e2d4d92624)
1import os
2import re
3
4from unit.applications.lang.python import TestApplicationPython
5from conftest import option
6
7
8class TestUpstreamsRR(TestApplicationPython):
9    prerequisites = {'modules': {'python': 'any'}}
10
11    def setup_method(self):
12        super().setup_method()
13
14        assert 'success' in self.conf(
15            {
16                "listeners": {
17                    "*:7080": {"pass": "upstreams/one"},
18                    "*:7090": {"pass": "upstreams/two"},
19                    "*:7081": {"pass": "routes/one"},
20                    "*:7082": {"pass": "routes/two"},
21                    "*:7083": {"pass": "routes/three"},
22                },
23                "upstreams": {
24                    "one": {
25                        "servers": {
26                            "127.0.0.1:7081": {},
27                            "127.0.0.1:7082": {},
28                        },
29                    },
30                    "two": {
31                        "servers": {
32                            "127.0.0.1:7081": {},
33                            "127.0.0.1:7082": {},
34                        },
35                    },
36                },
37                "routes": {
38                    "one": [{"action": {"return": 200}}],
39                    "two": [{"action": {"return": 201}}],
40                    "three": [{"action": {"return": 202}}],
41                },
42                "applications": {},
43            },
44        ), 'upstreams initial configuration'
45
46        self.cpu_count = os.cpu_count()
47
48    def get_resps(self, req=100, port=7080):
49        resps = [0]
50
51        for _ in range(req):
52            status = self.get(port=port)['status']
53            if 200 > status or status > 209:
54                continue
55
56            ups = status % 10
57            if ups > len(resps) - 1:
58                resps.extend([0] * (ups - len(resps) + 1))
59
60            resps[ups] += 1
61
62        return resps
63
64    def get_resps_sc(self, req=100, port=7080):
65        to_send = b"""GET / HTTP/1.1
66Host: localhost
67
68""" * (
69            req - 1
70        )
71
72        to_send += b"""GET / HTTP/1.1
73Host: localhost
74Connection: close
75
76"""
77
78        resp = self.http(to_send, raw_resp=True, raw=True, port=port)
79        status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)
80        status = list(filter(lambda x: x[:2] == '20', status))
81        ups = list(map(lambda x: int(x[-1]), status))
82
83        resps = [0] * (max(ups) + 1)
84        for i in range(len(ups)):
85            resps[ups[i]] += 1
86
87        return resps
88
89    def test_upstreams_rr_no_weight(self):
90        resps = self.get_resps()
91        assert sum(resps) == 100, 'no weight sum'
92        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight'
93
94        assert 'success' in self.conf_delete(
95            'upstreams/one/servers/127.0.0.1:7081'
96        ), 'no weight server remove'
97
98        resps = self.get_resps(req=50)
99        assert resps[1] == 50, 'no weight 2'
100
101        assert 'success' in self.conf(
102            {}, 'upstreams/one/servers/127.0.0.1:7081'
103        ), 'no weight server revert'
104
105        resps = self.get_resps()
106        assert sum(resps) == 100, 'no weight 3 sum'
107        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight 3'
108
109        assert 'success' in self.conf(
110            {}, 'upstreams/one/servers/127.0.0.1:7083'
111        ), 'no weight server new'
112
113        resps = self.get_resps()
114        assert sum(resps) == 100, 'no weight 4 sum'
115        assert max(resps) - min(resps) <= self.cpu_count, 'no weight 4'
116
117        resps = self.get_resps_sc(req=30)
118        assert resps[0] == 10, 'no weight 4 0'
119        assert resps[1] == 10, 'no weight 4 1'
120        assert resps[2] == 10, 'no weight 4 2'
121
122    def test_upstreams_rr_weight(self):
123        assert 'success' in self.conf(
124            {"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'
125        ), 'configure weight'
126
127        resps = self.get_resps_sc()
128        assert resps[0] == 75, 'weight 3 0'
129        assert resps[1] == 25, 'weight 3 1'
130
131        assert 'success' in self.conf_delete(
132            'upstreams/one/servers/127.0.0.1:7081/weight'
133        ), 'configure weight remove'
134        resps = self.get_resps_sc(req=10)
135        assert resps[0] == 5, 'weight 0 0'
136        assert resps[1] == 5, 'weight 0 1'
137
138        assert 'success' in self.conf(
139            '1', 'upstreams/one/servers/127.0.0.1:7081/weight'
140        ), 'configure weight 1'
141
142        resps = self.get_resps_sc()
143        assert resps[0] == 50, 'weight 1 0'
144        assert resps[1] == 50, 'weight 1 1'
145
146        assert 'success' in self.conf(
147            {
148                "127.0.0.1:7081": {"weight": 3},
149                "127.0.0.1:7083": {"weight": 2},
150            },
151            'upstreams/one/servers',
152        ), 'configure weight 2'
153
154        resps = self.get_resps_sc()
155        assert resps[0] == 60, 'weight 2 0'
156        assert resps[2] == 40, 'weight 2 1'
157
158    def test_upstreams_rr_weight_rational(self):
159        def set_weights(w1, w2):
160            assert 'success' in self.conf(
161                {
162                    "127.0.0.1:7081": {"weight": w1},
163                    "127.0.0.1:7082": {"weight": w2},
164                },
165                'upstreams/one/servers',
166            ), 'configure weights'
167
168        def check_reqs(w1, w2, reqs=10):
169            resps = self.get_resps_sc(req=reqs)
170            assert resps[0] == reqs * w1 / (w1 + w2), 'weight 1'
171            assert resps[1] == reqs * w2 / (w1 + w2), 'weight 2'
172
173        def check_weights(w1, w2):
174            set_weights(w1, w2)
175            check_reqs(w1, w2)
176
177        check_weights(0, 1)
178        check_weights(0, 999999.0123456)
179        check_weights(1, 9)
180        check_weights(100000, 900000)
181        check_weights(1, 0.25)
182        check_weights(1, 0.25)
183        check_weights(0.2, 0.8)
184        check_weights(1, 1.5)
185        check_weights(1e-3, 1e-3)
186        check_weights(1e-20, 1e-20)
187        check_weights(1e4, 1e4)
188        check_weights(1000000, 1000000)
189
190        set_weights(0.25, 0.25)
191        assert 'success' in self.conf_delete(
192            'upstreams/one/servers/127.0.0.1:7081/weight'
193        ), 'delete weight'
194        check_reqs(1, 0.25)
195
196        assert 'success' in self.conf(
197            {
198                "127.0.0.1:7081": {"weight": 0.1},
199                "127.0.0.1:7082": {"weight": 1},
200                "127.0.0.1:7083": {"weight": 0.9},
201            },
202            'upstreams/one/servers',
203        ), 'configure weights'
204        resps = self.get_resps_sc(req=20)
205        assert resps[0] == 1, 'weight 3 1'
206        assert resps[1] == 10, 'weight 3 2'
207        assert resps[2] == 9, 'weight 3 3'
208
209    def test_upstreams_rr_independent(self):
210        def sum_resps(*args):
211            sum = [0] * len(args[0])
212            for arg in args:
213                sum = [x + y for x, y in zip(sum, arg)]
214
215            return sum
216
217        resps = self.get_resps_sc(req=30, port=7090)
218        assert resps[0] == 15, 'dep two before 0'
219        assert resps[1] == 15, 'dep two before 1'
220
221        resps = self.get_resps_sc(req=30)
222        assert resps[0] == 15, 'dep one before 0'
223        assert resps[1] == 15, 'dep one before 1'
224
225        assert 'success' in self.conf(
226            '2', 'upstreams/two/servers/127.0.0.1:7081/weight'
227        ), 'configure dep weight'
228
229        resps = self.get_resps_sc(req=30, port=7090)
230        assert resps[0] == 20, 'dep two 0'
231        assert resps[1] == 10, 'dep two 1'
232
233        resps = self.get_resps_sc(req=30)
234        assert resps[0] == 15, 'dep one 0'
235        assert resps[1] == 15, 'dep one 1'
236
237        assert 'success' in self.conf(
238            '1', 'upstreams/two/servers/127.0.0.1:7081/weight'
239        ), 'configure dep weight 1'
240
241        r_one, r_two = [0, 0], [0, 0]
242        for _ in range(10):
243            r_one = sum_resps(r_one, self.get_resps(req=10))
244            r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
245
246        assert sum(r_one) == 100, 'dep one mix sum'
247        assert abs(r_one[0] - r_one[1]) <= self.cpu_count, 'dep one mix'
248        assert sum(r_two) == 100, 'dep two mix sum'
249        assert abs(r_two[0] - r_two[1]) <= self.cpu_count, 'dep two mix'
250
251    def test_upstreams_rr_delay(self):
252        assert 'success' in self.conf(
253            {
254                "listeners": {
255                    "*:7080": {"pass": "upstreams/one"},
256                    "*:7081": {"pass": "routes"},
257                    "*:7082": {"pass": "routes"},
258                },
259                "upstreams": {
260                    "one": {
261                        "servers": {
262                            "127.0.0.1:7081": {},
263                            "127.0.0.1:7082": {},
264                        },
265                    },
266                },
267                "routes": [
268                    {
269                        "match": {"destination": "*:7081"},
270                        "action": {"pass": "applications/delayed"},
271                    },
272                    {
273                        "match": {"destination": "*:7082"},
274                        "action": {"return": 201},
275                    },
276                ],
277                "applications": {
278                    "delayed": {
279                        "type": "python",
280                        "processes": {"spare": 0},
281                        "path": option.test_dir + "/python/delayed",
282                        "working_directory": option.test_dir
283                        + "/python/delayed",
284                        "module": "wsgi",
285                    }
286                },
287            },
288        ), 'upstreams initial configuration'
289
290        req = 50
291
292        socks = []
293        for i in range(req):
294            delay = 1 if i % 5 == 0 else 0
295            _, sock = self.get(
296                headers={
297                    'Host': 'localhost',
298                    'Content-Length': '0',
299                    'X-Delay': str(delay),
300                    'Connection': 'close',
301                },
302                start=True,
303                no_recv=True,
304            )
305            socks.append(sock)
306
307        resps = [0, 0]
308        for i in range(req):
309            resp = self.recvall(socks[i]).decode()
310            socks[i].close()
311
312            m = re.search(r'HTTP/1.1 20(\d)', resp)
313            assert m is not None, 'status'
314            resps[int(m.group(1))] += 1
315
316        assert sum(resps) == req, 'delay sum'
317        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'delay'
318
319    def test_upstreams_rr_active_req(self):
320        conns = 5
321        socks = []
322        socks2 = []
323
324        for _ in range(conns):
325            _, sock = self.get(start=True, no_recv=True)
326            socks.append(sock)
327
328            _, sock2 = self.http(
329                b"""POST / HTTP/1.1
330Host: localhost
331Content-Length: 10
332Connection: close
333
334""",
335                start=True,
336                no_recv=True,
337                raw=True,
338            )
339            socks2.append(sock2)
340
341        # Send one more request and read response to make sure that previous
342        # requests had enough time to reach server.
343
344        assert self.get()['body'] == ''
345
346        assert 'success' in self.conf(
347            {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
348        ), 'active req new server'
349        assert 'success' in self.conf_delete(
350            'upstreams/one/servers/127.0.0.1:7083'
351        ), 'active req server remove'
352        assert 'success' in self.conf_delete(
353            'listeners/*:7080'
354        ), 'delete listener'
355        assert 'success' in self.conf_delete(
356            'upstreams/one'
357        ), 'active req upstream remove'
358
359        for i in range(conns):
360            assert (
361                self.http(b'', sock=socks[i], raw=True)['body'] == ''
362            ), 'active req GET'
363
364            assert (
365                self.http(b"""0123456789""", sock=socks2[i], raw=True)['body']
366                == ''
367            ), 'active req POST'
368
369    def test_upstreams_rr_bad_server(self):
370        assert 'success' in self.conf(
371            {"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'
372        ), 'configure bad server'
373
374        resps = self.get_resps_sc(req=30)
375        assert resps[0] == 10, 'bad server 0'
376        assert resps[1] == 10, 'bad server 1'
377        assert sum(resps) == 20, 'bad server sum'
378
379    def test_upstreams_rr_pipeline(self):
380        resps = self.get_resps_sc()
381
382        assert resps[0] == 50, 'pipeline 0'
383        assert resps[1] == 50, 'pipeline 1'
384
385    def test_upstreams_rr_post(self):
386        resps = [0, 0]
387        for _ in range(50):
388            resps[self.get()['status'] % 10] += 1
389            resps[self.post(body='0123456789')['status'] % 10] += 1
390
391        assert sum(resps) == 100, 'post sum'
392        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'post'
393
394    def test_upstreams_rr_unix(self):
395        addr_0 = self.temp_dir + '/sock_0'
396        addr_1 = self.temp_dir + '/sock_1'
397
398        assert 'success' in self.conf(
399            {
400                "*:7080": {"pass": "upstreams/one"},
401                "unix:" + addr_0: {"pass": "routes/one"},
402                "unix:" + addr_1: {"pass": "routes/two"},
403            },
404            'listeners',
405        ), 'configure listeners unix'
406
407        assert 'success' in self.conf(
408            {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
409            'upstreams/one/servers',
410        ), 'configure servers unix'
411
412        resps = self.get_resps_sc()
413
414        assert resps[0] == 50, 'unix 0'
415        assert resps[1] == 50, 'unix 1'
416
417    def test_upstreams_rr_ipv6(self):
418        assert 'success' in self.conf(
419            {
420                "*:7080": {"pass": "upstreams/one"},
421                "[::1]:7081": {"pass": "routes/one"},
422                "[::1]:7082": {"pass": "routes/two"},
423            },
424            'listeners',
425        ), 'configure listeners ipv6'
426
427        assert 'success' in self.conf(
428            {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
429        ), 'configure servers ipv6'
430
431        resps = self.get_resps_sc()
432
433        assert resps[0] == 50, 'ipv6 0'
434        assert resps[1] == 50, 'ipv6 1'
435
436    def test_upstreams_rr_servers_empty(self):
437        assert 'success' in self.conf(
438            {}, 'upstreams/one/servers'
439        ), 'configure servers empty'
440        assert self.get()['status'] == 502, 'servers empty'
441
442        assert 'success' in self.conf(
443            {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers'
444        ), 'configure servers empty one'
445        assert self.get()['status'] == 502, 'servers empty one'
446        assert 'success' in self.conf(
447            {
448                "127.0.0.1:7081": {"weight": 0},
449                "127.0.0.1:7082": {"weight": 0},
450            },
451            'upstreams/one/servers',
452        ), 'configure servers empty two'
453        assert self.get()['status'] == 502, 'servers empty two'
454
455    def test_upstreams_rr_invalid(self):
456        assert 'error' in self.conf({}, 'upstreams'), 'upstreams empty'
457        assert 'error' in self.conf(
458            {}, 'upstreams/one'
459        ), 'named upstreams empty'
460        assert 'error' in self.conf(
461            {}, 'upstreams/one/servers/127.0.0.1'
462        ), 'invalid address'
463        assert 'error' in self.conf(
464            {}, 'upstreams/one/servers/127.0.0.1:7081/blah'
465        ), 'invalid server option'
466
467        def check_weight(w):
468            assert 'error' in self.conf(
469                w, 'upstreams/one/servers/127.0.0.1:7081/weight'
470            ), 'invalid weight option'
471
472        check_weight({})
473        check_weight('-1')
474        check_weight('1.')
475        check_weight('1.1.')
476        check_weight('.')
477        check_weight('.01234567890123')
478        check_weight('1000001')
479        check_weight('2e6')
480