xref: /unit/test/test_upstreams_rr.py (revision 2055:84cb1ec94bae)
1import os
2import re
3
4from unit.applications.lang.python import TestApplicationPython
5from unit.option import option
6
7
8class TestUpstreamsRR(TestApplicationPython):
9    prerequisites = {'modules': {'python': 'any'}}
10
11    def setup_method(self):
12        assert 'success' in self.conf(
13            {
14                "listeners": {
15                    "*:7080": {"pass": "upstreams/one"},
16                    "*:7090": {"pass": "upstreams/two"},
17                    "*:7081": {"pass": "routes/one"},
18                    "*:7082": {"pass": "routes/two"},
19                    "*:7083": {"pass": "routes/three"},
20                },
21                "upstreams": {
22                    "one": {
23                        "servers": {
24                            "127.0.0.1:7081": {},
25                            "127.0.0.1:7082": {},
26                        },
27                    },
28                    "two": {
29                        "servers": {
30                            "127.0.0.1:7081": {},
31                            "127.0.0.1:7082": {},
32                        },
33                    },
34                },
35                "routes": {
36                    "one": [{"action": {"return": 200}}],
37                    "two": [{"action": {"return": 201}}],
38                    "three": [{"action": {"return": 202}}],
39                },
40                "applications": {},
41            },
42        ), 'upstreams initial configuration'
43
44        self.cpu_count = os.cpu_count()
45
46    def get_resps(self, req=100, port=7080):
47        resps = [0]
48
49        for _ in range(req):
50            status = self.get(port=port)['status']
51            if 200 > status or status > 209:
52                continue
53
54            ups = status % 10
55            if ups > len(resps) - 1:
56                resps.extend([0] * (ups - len(resps) + 1))
57
58            resps[ups] += 1
59
60        return resps
61
62    def get_resps_sc(self, req=100, port=7080):
63        to_send = b"""GET / HTTP/1.1
64Host: localhost
65
66""" * (
67            req - 1
68        )
69
70        to_send += b"""GET / HTTP/1.1
71Host: localhost
72Connection: close
73
74"""
75
76        resp = self.http(to_send, raw_resp=True, raw=True, port=port)
77        status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)
78        status = list(filter(lambda x: x[:2] == '20', status))
79        ups = list(map(lambda x: int(x[-1]), status))
80
81        resps = [0] * (max(ups) + 1)
82        for i in range(len(ups)):
83            resps[ups[i]] += 1
84
85        return resps
86
87    def test_upstreams_rr_no_weight(self):
88        resps = self.get_resps()
89        assert sum(resps) == 100, 'no weight sum'
90        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight'
91
92        assert 'success' in self.conf_delete(
93            'upstreams/one/servers/127.0.0.1:7081'
94        ), 'no weight server remove'
95
96        resps = self.get_resps(req=50)
97        assert resps[1] == 50, 'no weight 2'
98
99        assert 'success' in self.conf(
100            {}, 'upstreams/one/servers/127.0.0.1:7081'
101        ), 'no weight server revert'
102
103        resps = self.get_resps()
104        assert sum(resps) == 100, 'no weight 3 sum'
105        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight 3'
106
107        assert 'success' in self.conf(
108            {}, 'upstreams/one/servers/127.0.0.1:7083'
109        ), 'no weight server new'
110
111        resps = self.get_resps()
112        assert sum(resps) == 100, 'no weight 4 sum'
113        assert max(resps) - min(resps) <= self.cpu_count, 'no weight 4'
114
115        resps = self.get_resps_sc(req=30)
116        assert resps[0] == 10, 'no weight 4 0'
117        assert resps[1] == 10, 'no weight 4 1'
118        assert resps[2] == 10, 'no weight 4 2'
119
120    def test_upstreams_rr_weight(self):
121        assert 'success' in self.conf(
122            {"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'
123        ), 'configure weight'
124
125        resps = self.get_resps_sc()
126        assert resps[0] == 75, 'weight 3 0'
127        assert resps[1] == 25, 'weight 3 1'
128
129        assert 'success' in self.conf_delete(
130            'upstreams/one/servers/127.0.0.1:7081/weight'
131        ), 'configure weight remove'
132        resps = self.get_resps_sc(req=10)
133        assert resps[0] == 5, 'weight 0 0'
134        assert resps[1] == 5, 'weight 0 1'
135
136        assert 'success' in self.conf(
137            '1', 'upstreams/one/servers/127.0.0.1:7081/weight'
138        ), 'configure weight 1'
139
140        resps = self.get_resps_sc()
141        assert resps[0] == 50, 'weight 1 0'
142        assert resps[1] == 50, 'weight 1 1'
143
144        assert 'success' in self.conf(
145            {
146                "127.0.0.1:7081": {"weight": 3},
147                "127.0.0.1:7083": {"weight": 2},
148            },
149            'upstreams/one/servers',
150        ), 'configure weight 2'
151
152        resps = self.get_resps_sc()
153        assert resps[0] == 60, 'weight 2 0'
154        assert resps[2] == 40, 'weight 2 1'
155
156    def test_upstreams_rr_weight_rational(self):
157        def set_weights(w1, w2):
158            assert 'success' in self.conf(
159                {
160                    "127.0.0.1:7081": {"weight": w1},
161                    "127.0.0.1:7082": {"weight": w2},
162                },
163                'upstreams/one/servers',
164            ), 'configure weights'
165
166        def check_reqs(w1, w2, reqs=10):
167            resps = self.get_resps_sc(req=reqs)
168            assert resps[0] == reqs * w1 / (w1 + w2), 'weight 1'
169            assert resps[1] == reqs * w2 / (w1 + w2), 'weight 2'
170
171        def check_weights(w1, w2):
172            set_weights(w1, w2)
173            check_reqs(w1, w2)
174
175        check_weights(0, 1)
176        check_weights(0, 999999.0123456)
177        check_weights(1, 9)
178        check_weights(100000, 900000)
179        check_weights(1, 0.25)
180        check_weights(1, 0.25)
181        check_weights(0.2, 0.8)
182        check_weights(1, 1.5)
183        check_weights(1e-3, 1e-3)
184        check_weights(1e-20, 1e-20)
185        check_weights(1e4, 1e4)
186        check_weights(1000000, 1000000)
187
188        set_weights(0.25, 0.25)
189        assert 'success' in self.conf_delete(
190            'upstreams/one/servers/127.0.0.1:7081/weight'
191        ), 'delete weight'
192        check_reqs(1, 0.25)
193
194        assert 'success' in self.conf(
195            {
196                "127.0.0.1:7081": {"weight": 0.1},
197                "127.0.0.1:7082": {"weight": 1},
198                "127.0.0.1:7083": {"weight": 0.9},
199            },
200            'upstreams/one/servers',
201        ), 'configure weights'
202        resps = self.get_resps_sc(req=20)
203        assert resps[0] == 1, 'weight 3 1'
204        assert resps[1] == 10, 'weight 3 2'
205        assert resps[2] == 9, 'weight 3 3'
206
207    def test_upstreams_rr_independent(self):
208        def sum_resps(*args):
209            sum = [0] * len(args[0])
210            for arg in args:
211                sum = [x + y for x, y in zip(sum, arg)]
212
213            return sum
214
215        resps = self.get_resps_sc(req=30, port=7090)
216        assert resps[0] == 15, 'dep two before 0'
217        assert resps[1] == 15, 'dep two before 1'
218
219        resps = self.get_resps_sc(req=30)
220        assert resps[0] == 15, 'dep one before 0'
221        assert resps[1] == 15, 'dep one before 1'
222
223        assert 'success' in self.conf(
224            '2', 'upstreams/two/servers/127.0.0.1:7081/weight'
225        ), 'configure dep weight'
226
227        resps = self.get_resps_sc(req=30, port=7090)
228        assert resps[0] == 20, 'dep two 0'
229        assert resps[1] == 10, 'dep two 1'
230
231        resps = self.get_resps_sc(req=30)
232        assert resps[0] == 15, 'dep one 0'
233        assert resps[1] == 15, 'dep one 1'
234
235        assert 'success' in self.conf(
236            '1', 'upstreams/two/servers/127.0.0.1:7081/weight'
237        ), 'configure dep weight 1'
238
239        r_one, r_two = [0, 0], [0, 0]
240        for _ in range(10):
241            r_one = sum_resps(r_one, self.get_resps(req=10))
242            r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
243
244        assert sum(r_one) == 100, 'dep one mix sum'
245        assert abs(r_one[0] - r_one[1]) <= self.cpu_count, 'dep one mix'
246        assert sum(r_two) == 100, 'dep two mix sum'
247        assert abs(r_two[0] - r_two[1]) <= self.cpu_count, 'dep two mix'
248
249    def test_upstreams_rr_delay(self):
250        assert 'success' in self.conf(
251            {
252                "listeners": {
253                    "*:7080": {"pass": "upstreams/one"},
254                    "*:7081": {"pass": "routes"},
255                    "*:7082": {"pass": "routes"},
256                },
257                "upstreams": {
258                    "one": {
259                        "servers": {
260                            "127.0.0.1:7081": {},
261                            "127.0.0.1:7082": {},
262                        },
263                    },
264                },
265                "routes": [
266                    {
267                        "match": {"destination": "*:7081"},
268                        "action": {"pass": "applications/delayed"},
269                    },
270                    {
271                        "match": {"destination": "*:7082"},
272                        "action": {"return": 201},
273                    },
274                ],
275                "applications": {
276                    "delayed": {
277                        "type": self.get_application_type(),
278                        "processes": {"spare": 0},
279                        "path": option.test_dir + "/python/delayed",
280                        "working_directory": option.test_dir
281                        + "/python/delayed",
282                        "module": "wsgi",
283                    }
284                },
285            },
286        ), 'upstreams initial configuration'
287
288        req = 50
289
290        socks = []
291        for i in range(req):
292            delay = 1 if i % 5 == 0 else 0
293            _, sock = self.get(
294                headers={
295                    'Host': 'localhost',
296                    'Content-Length': '0',
297                    'X-Delay': str(delay),
298                    'Connection': 'close',
299                },
300                start=True,
301                no_recv=True,
302            )
303            socks.append(sock)
304
305        resps = [0, 0]
306        for i in range(req):
307            resp = self.recvall(socks[i]).decode()
308            socks[i].close()
309
310            m = re.search(r'HTTP/1.1 20(\d)', resp)
311            assert m is not None, 'status'
312            resps[int(m.group(1))] += 1
313
314        assert sum(resps) == req, 'delay sum'
315        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'delay'
316
317    def test_upstreams_rr_active_req(self):
318        conns = 5
319        socks = []
320        socks2 = []
321
322        for _ in range(conns):
323            _, sock = self.get(start=True, no_recv=True)
324            socks.append(sock)
325
326            _, sock2 = self.http(
327                b"""POST / HTTP/1.1
328Host: localhost
329Content-Length: 10
330Connection: close
331
332""",
333                start=True,
334                no_recv=True,
335                raw=True,
336            )
337            socks2.append(sock2)
338
339        # Send one more request and read response to make sure that previous
340        # requests had enough time to reach server.
341
342        assert self.get()['body'] == ''
343
344        assert 'success' in self.conf(
345            {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
346        ), 'active req new server'
347        assert 'success' in self.conf_delete(
348            'upstreams/one/servers/127.0.0.1:7083'
349        ), 'active req server remove'
350        assert 'success' in self.conf_delete(
351            'listeners/*:7080'
352        ), 'delete listener'
353        assert 'success' in self.conf_delete(
354            'upstreams/one'
355        ), 'active req upstream remove'
356
357        for i in range(conns):
358            assert (
359                self.http(b'', sock=socks[i], raw=True)['body'] == ''
360            ), 'active req GET'
361
362            assert (
363                self.http(b"""0123456789""", sock=socks2[i], raw=True)['body']
364                == ''
365            ), 'active req POST'
366
367    def test_upstreams_rr_bad_server(self):
368        assert 'success' in self.conf(
369            {"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'
370        ), 'configure bad server'
371
372        resps = self.get_resps_sc(req=30)
373        assert resps[0] == 10, 'bad server 0'
374        assert resps[1] == 10, 'bad server 1'
375        assert sum(resps) == 20, 'bad server sum'
376
377    def test_upstreams_rr_pipeline(self):
378        resps = self.get_resps_sc()
379
380        assert resps[0] == 50, 'pipeline 0'
381        assert resps[1] == 50, 'pipeline 1'
382
383    def test_upstreams_rr_post(self):
384        resps = [0, 0]
385        for _ in range(50):
386            resps[self.get()['status'] % 10] += 1
387            resps[self.post(body='0123456789')['status'] % 10] += 1
388
389        assert sum(resps) == 100, 'post sum'
390        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'post'
391
392    def test_upstreams_rr_unix(self, temp_dir):
393        addr_0 = temp_dir + '/sock_0'
394        addr_1 = temp_dir + '/sock_1'
395
396        assert 'success' in self.conf(
397            {
398                "*:7080": {"pass": "upstreams/one"},
399                "unix:" + addr_0: {"pass": "routes/one"},
400                "unix:" + addr_1: {"pass": "routes/two"},
401            },
402            'listeners',
403        ), 'configure listeners unix'
404
405        assert 'success' in self.conf(
406            {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
407            'upstreams/one/servers',
408        ), 'configure servers unix'
409
410        resps = self.get_resps_sc()
411
412        assert resps[0] == 50, 'unix 0'
413        assert resps[1] == 50, 'unix 1'
414
415    def test_upstreams_rr_ipv6(self):
416        assert 'success' in self.conf(
417            {
418                "*:7080": {"pass": "upstreams/one"},
419                "[::1]:7081": {"pass": "routes/one"},
420                "[::1]:7082": {"pass": "routes/two"},
421            },
422            'listeners',
423        ), 'configure listeners ipv6'
424
425        assert 'success' in self.conf(
426            {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
427        ), 'configure servers ipv6'
428
429        resps = self.get_resps_sc()
430
431        assert resps[0] == 50, 'ipv6 0'
432        assert resps[1] == 50, 'ipv6 1'
433
434    def test_upstreams_rr_servers_empty(self):
435        assert 'success' in self.conf(
436            {}, 'upstreams/one/servers'
437        ), 'configure servers empty'
438        assert self.get()['status'] == 502, 'servers empty'
439
440        assert 'success' in self.conf(
441            {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers'
442        ), 'configure servers empty one'
443        assert self.get()['status'] == 502, 'servers empty one'
444        assert 'success' in self.conf(
445            {
446                "127.0.0.1:7081": {"weight": 0},
447                "127.0.0.1:7082": {"weight": 0},
448            },
449            'upstreams/one/servers',
450        ), 'configure servers empty two'
451        assert self.get()['status'] == 502, 'servers empty two'
452
453    def test_upstreams_rr_invalid(self):
454        assert 'error' in self.conf({}, 'upstreams'), 'upstreams empty'
455        assert 'error' in self.conf(
456            {}, 'upstreams/one'
457        ), 'named upstreams empty'
458        assert 'error' in self.conf(
459            {}, 'upstreams/one/servers/127.0.0.1'
460        ), 'invalid address'
461        assert 'error' in self.conf(
462            {}, 'upstreams/one/servers/127.0.0.1:7081/blah'
463        ), 'invalid server option'
464
465        def check_weight(w):
466            assert 'error' in self.conf(
467                w, 'upstreams/one/servers/127.0.0.1:7081/weight'
468            ), 'invalid weight option'
469
470        check_weight({})
471        check_weight('-1')
472        check_weight('1.')
473        check_weight('1.1.')
474        check_weight('.')
475        check_weight('.01234567890123')
476        check_weight('1000001')
477        check_weight('2e6')
478