xref: /unit/test/test_upstreams_rr.py (revision 1445:5a603827ec10)
1import os
2import re
3import unittest
4from unit.applications.lang.python import TestApplicationPython
5
6
7class TestUpstreamsRR(TestApplicationPython):
8    prerequisites = {'modules': ['python']}
9
10    def setUp(self):
11        super().setUp()
12
13        self.assertIn(
14            'success',
15            self.conf(
16                {
17                    "listeners": {
18                        "*:7080": {"pass": "upstreams/one"},
19                        "*:7090": {"pass": "upstreams/two"},
20                        "*:7081": {"pass": "routes/one"},
21                        "*:7082": {"pass": "routes/two"},
22                        "*:7083": {"pass": "routes/three"},
23                    },
24                    "upstreams": {
25                        "one": {
26                            "servers": {
27                                "127.0.0.1:7081": {},
28                                "127.0.0.1:7082": {},
29                            },
30                        },
31                        "two": {
32                            "servers": {
33                                "127.0.0.1:7081": {},
34                                "127.0.0.1:7082": {},
35                            },
36                        },
37                    },
38                    "routes": {
39                        "one": [{"action": {"return": 200}}],
40                        "two": [{"action": {"return": 201}}],
41                        "three": [{"action": {"return": 202}}],
42                    },
43                    "applications": {},
44                },
45            ),
46            'upstreams initial configuration',
47        )
48
49        self.cpu_count = os.cpu_count()
50
51    def get_resps(self, req=100, port=7080):
52        resps = [0]
53
54        for _ in range(req):
55            status = self.get(port=port)['status']
56            if 200 > status or status > 209:
57                continue
58
59            ups = status % 10
60            if ups > len(resps) - 1:
61                resps.extend([0] * (ups - len(resps) + 1))
62
63            resps[ups] += 1
64
65        return resps
66
67    def get_resps_sc(self, req=100, port=7080):
68        to_send = b"""GET / HTTP/1.1
69Host: localhost
70
71""" * (
72            req - 1
73        )
74
75        to_send += b"""GET / HTTP/1.1
76Host: localhost
77Connection: close
78
79"""
80
81        resp = self.http(to_send, raw_resp=True, raw=True, port=port)
82        status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)
83        status = list(filter(lambda x: x[:2] == '20', status))
84        ups = list(map(lambda x: int(x[-1]), status))
85
86        resps = [0] * (max(ups) + 1)
87        for i in range(len(ups)):
88            resps[ups[i]] += 1
89
90        return resps
91
92    def test_upstreams_rr_no_weight(self):
93        resps = self.get_resps()
94        self.assertEqual(sum(resps), 100, 'no weight sum')
95        self.assertLessEqual(
96            abs(resps[0] - resps[1]), self.cpu_count, 'no weight'
97        )
98
99        self.assertIn(
100            'success',
101            self.conf_delete('upstreams/one/servers/127.0.0.1:7081'),
102            'no weight server remove',
103        )
104
105        resps = self.get_resps(req=50)
106        self.assertEqual(resps[1], 50, 'no weight 2')
107
108        self.assertIn(
109            'success',
110            self.conf({}, 'upstreams/one/servers/127.0.0.1:7081'),
111            'no weight server revert',
112        )
113
114        resps = self.get_resps()
115        self.assertEqual(sum(resps), 100, 'no weight 3 sum')
116        self.assertLessEqual(
117            abs(resps[0] - resps[1]), self.cpu_count, 'no weight 3'
118        )
119
120        self.assertIn(
121            'success',
122            self.conf({}, 'upstreams/one/servers/127.0.0.1:7083'),
123            'no weight server new',
124        )
125
126        resps = self.get_resps()
127        self.assertEqual(sum(resps), 100, 'no weight 4 sum')
128        self.assertLessEqual(
129            max(resps) - min(resps), self.cpu_count, 'no weight 4'
130        )
131
132        resps = self.get_resps_sc(req=30)
133        self.assertEqual(resps[0], 10, 'no weight 4 0')
134        self.assertEqual(resps[1], 10, 'no weight 4 1')
135        self.assertEqual(resps[2], 10, 'no weight 4 2')
136
137    def test_upstreams_rr_weight(self):
138        self.assertIn(
139            'success',
140            self.conf({"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'),
141            'configure weight',
142        )
143
144        resps = self.get_resps_sc()
145        self.assertEqual(resps[0], 75, 'weight 3 0')
146        self.assertEqual(resps[1], 25, 'weight 3 1')
147
148        self.assertIn(
149            'success',
150            self.conf_delete('upstreams/one/servers/127.0.0.1:7081/weight'),
151            'configure weight remove',
152        )
153        resps = self.get_resps_sc(req=10)
154        self.assertEqual(resps[0], 5, 'weight 0 0')
155        self.assertEqual(resps[1], 5, 'weight 0 1')
156
157        self.assertIn(
158            'success',
159            self.conf('1', 'upstreams/one/servers/127.0.0.1:7081/weight'),
160            'configure weight 1',
161        )
162
163        resps = self.get_resps_sc()
164        self.assertEqual(resps[0], 50, 'weight 1 0')
165        self.assertEqual(resps[1], 50, 'weight 1 1')
166
167        self.assertIn(
168            'success',
169            self.conf(
170                {
171                    "127.0.0.1:7081": {"weight": 3},
172                    "127.0.0.1:7083": {"weight": 2},
173                },
174                'upstreams/one/servers',
175            ),
176            'configure weight 2',
177        )
178
179        resps = self.get_resps_sc()
180        self.assertEqual(resps[0], 60, 'weight 2 0')
181        self.assertEqual(resps[2], 40, 'weight 2 1')
182
183    def test_upstreams_rr_weight_rational(self):
184        def set_weights(w1, w2):
185            self.assertIn(
186                'success',
187                self.conf(
188                    {
189                        "127.0.0.1:7081": {"weight": w1},
190                        "127.0.0.1:7082": {"weight": w2},
191                    },
192                    'upstreams/one/servers',
193                ),
194                'configure weights',
195            )
196
197        def check_reqs(w1, w2, reqs=10):
198            resps = self.get_resps_sc(req=reqs)
199            self.assertEqual(resps[0], reqs * w1 / (w1 + w2), 'weight 1')
200            self.assertEqual(resps[1], reqs * w2 / (w1 + w2), 'weight 2')
201
202        def check_weights(w1, w2):
203            set_weights(w1, w2)
204            check_reqs(w1, w2)
205
206        check_weights(0, 1)
207        check_weights(0, 999999.0123456)
208        check_weights(1, 9)
209        check_weights(100000, 900000)
210        check_weights(1, .25)
211        check_weights(1, 0.25)
212        check_weights(0.2, .8)
213        check_weights(1, 1.5)
214        check_weights(1e-3, 1E-3)
215        check_weights(1e-20, 1e-20)
216        check_weights(1e4, 1e4)
217        check_weights(1000000, 1000000)
218
219        set_weights(0.25, 0.25)
220        self.assertIn(
221            'success',
222            self.conf_delete('upstreams/one/servers/127.0.0.1:7081/weight'),
223            'delete weight',
224        )
225        check_reqs(1, 0.25)
226
227        self.assertIn(
228            'success',
229            self.conf(
230                {
231                    "127.0.0.1:7081": {"weight": 0.1},
232                    "127.0.0.1:7082": {"weight": 1},
233                    "127.0.0.1:7083": {"weight": 0.9},
234                },
235                'upstreams/one/servers',
236            ),
237            'configure weights',
238        )
239        resps = self.get_resps_sc(req=20)
240        self.assertEqual(resps[0], 1, 'weight 3 1')
241        self.assertEqual(resps[1], 10, 'weight 3 2')
242        self.assertEqual(resps[2], 9, 'weight 3 3')
243
244    def test_upstreams_rr_independent(self):
245        def sum_resps(*args):
246            sum = [0] * len(args[0])
247            for arg in args:
248                sum = [x + y for x, y in zip(sum, arg)]
249
250            return sum
251
252        resps = self.get_resps_sc(req=30, port=7090)
253        self.assertEqual(resps[0], 15, 'dep two before 0')
254        self.assertEqual(resps[1], 15, 'dep two before 1')
255
256        resps = self.get_resps_sc(req=30)
257        self.assertEqual(resps[0], 15, 'dep one before 0')
258        self.assertEqual(resps[1], 15, 'dep one before 1')
259
260        self.assertIn(
261            'success',
262            self.conf('2', 'upstreams/two/servers/127.0.0.1:7081/weight'),
263            'configure dep weight',
264        )
265
266        resps = self.get_resps_sc(req=30, port=7090)
267        self.assertEqual(resps[0], 20, 'dep two 0')
268        self.assertEqual(resps[1], 10, 'dep two 1')
269
270        resps = self.get_resps_sc(req=30)
271        self.assertEqual(resps[0], 15, 'dep one 0')
272        self.assertEqual(resps[1], 15, 'dep one 1')
273
274        self.assertIn(
275            'success',
276            self.conf('1', 'upstreams/two/servers/127.0.0.1:7081/weight'),
277            'configure dep weight 1',
278        )
279
280        r_one, r_two = [0, 0], [0, 0]
281        for _ in range(10):
282            r_one = sum_resps(r_one, self.get_resps(req=10))
283            r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))
284
285
286        self.assertEqual(sum(r_one), 100, 'dep one mix sum')
287        self.assertLessEqual(
288            abs(r_one[0] - r_one[1]), self.cpu_count, 'dep one mix'
289        )
290        self.assertEqual(sum(r_two), 100, 'dep two mix sum')
291        self.assertLessEqual(
292            abs(r_two[0] - r_two[1]), self.cpu_count, 'dep two mix'
293        )
294
295    def test_upstreams_rr_delay(self):
296        self.assertIn(
297            'success',
298            self.conf(
299                {
300                    "listeners": {
301                        "*:7080": {"pass": "upstreams/one"},
302                        "*:7081": {"pass": "routes"},
303                        "*:7082": {"pass": "routes"},
304                    },
305                    "upstreams": {
306                        "one": {
307                            "servers": {
308                                "127.0.0.1:7081": {},
309                                "127.0.0.1:7082": {},
310                            },
311                        },
312                    },
313                    "routes": [
314                        {
315                            "match": {"destination": "*:7081"},
316                            "action": {"pass": "applications/delayed"},
317                        },
318                        {
319                            "match": {"destination": "*:7082"},
320                            "action": {"return": 201},
321                        },
322                    ],
323                    "applications": {
324                        "delayed": {
325                            "type": "python",
326                            "processes": {"spare": 0},
327                            "path": self.current_dir + "/python/delayed",
328                            "working_directory": self.current_dir
329                            + "/python/delayed",
330                            "module": "wsgi",
331                        }
332                    },
333                },
334            ),
335            'upstreams initial configuration',
336        )
337
338        req = 50
339
340        socks = []
341        for i in range(req):
342            delay = 1 if i % 5 == 0 else 0
343            _, sock = self.get(
344                headers={
345                    'Host': 'localhost',
346                    'Content-Length': '0',
347                    'X-Delay': str(delay),
348                    'Connection': 'close',
349                },
350                start=True,
351                no_recv=True,
352            )
353            socks.append(sock)
354
355        resps = [0, 0]
356        for i in range(req):
357            resp = self.recvall(socks[i]).decode()
358            socks[i].close()
359
360            m = re.search('HTTP/1.1 20(\d)', resp)
361            self.assertIsNotNone(m, 'status')
362            resps[int(m.group(1))] += 1
363
364        self.assertEqual(sum(resps), req, 'delay sum')
365        self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'delay')
366
367    def test_upstreams_rr_active_req(self):
368        conns = 5
369        socks = []
370        socks2 = []
371
372        for _ in range(conns):
373            _, sock = self.get(start=True, no_recv=True)
374            socks.append(sock)
375
376            _, sock2 = self.http(
377                b"""POST / HTTP/1.1
378Host: localhost
379Content-Length: 10
380Connection: close
381
382""",
383                start=True,
384                no_recv=True,
385                raw=True,
386            )
387            socks2.append(sock2)
388
389        # Send one more request and read response to make sure that previous
390        # requests had enough time to reach server.
391
392        self.assertEqual(self.get()['body'], '')
393
394        self.assertIn(
395            'success',
396            self.conf(
397                {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
398            ),
399            'active req new server',
400        )
401        self.assertIn(
402            'success',
403            self.conf_delete('upstreams/one/servers/127.0.0.1:7083'),
404            'active req server remove',
405        )
406        self.assertIn(
407            'success', self.conf_delete('listeners/*:7080'), 'delete listener'
408        )
409        self.assertIn(
410            'success',
411            self.conf_delete('upstreams/one'),
412            'active req upstream remove',
413        )
414
415        for i in range(conns):
416            self.assertEqual(
417                self.http(b'', sock=socks[i], raw=True)['body'],
418                '',
419                'active req GET',
420            )
421
422            self.assertEqual(
423                self.http(b"""0123456789""", sock=socks2[i], raw=True)['body'],
424                '',
425                'active req POST',
426            )
427
428    def test_upstreams_rr_bad_server(self):
429        self.assertIn(
430            'success',
431            self.conf({"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'),
432            'configure bad server',
433        )
434
435        resps = self.get_resps_sc(req=30)
436        self.assertEqual(resps[0], 10, 'bad server 0')
437        self.assertEqual(resps[1], 10, 'bad server 1')
438        self.assertEqual(sum(resps), 20, 'bad server sum')
439
440    def test_upstreams_rr_pipeline(self):
441        resps = self.get_resps_sc()
442
443        self.assertEqual(resps[0], 50, 'pipeline 0')
444        self.assertEqual(resps[1], 50, 'pipeline 1')
445
446    def test_upstreams_rr_post(self):
447        resps = [0, 0]
448        for _ in range(50):
449            resps[self.get()['status'] % 10] += 1
450            resps[self.post(body='0123456789')['status'] % 10] += 1
451
452        self.assertEqual(sum(resps), 100, 'post sum')
453        self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'post')
454
455    def test_upstreams_rr_unix(self):
456        addr_0 = self.testdir + '/sock_0'
457        addr_1 = self.testdir + '/sock_1'
458
459        self.assertIn(
460            'success',
461            self.conf(
462                {
463                    "*:7080": {"pass": "upstreams/one"},
464                    "unix:" + addr_0: {"pass": "routes/one"},
465                    "unix:" + addr_1: {"pass": "routes/two"},
466                },
467                'listeners',
468            ),
469            'configure listeners unix',
470        )
471
472        self.assertIn(
473            'success',
474            self.conf(
475                {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
476                'upstreams/one/servers',
477            ),
478            'configure servers unix',
479        )
480
481        resps = self.get_resps_sc()
482
483        self.assertEqual(resps[0], 50, 'unix 0')
484        self.assertEqual(resps[1], 50, 'unix 1')
485
486    def test_upstreams_rr_ipv6(self):
487        self.assertIn(
488            'success',
489            self.conf(
490                {
491                    "*:7080": {"pass": "upstreams/one"},
492                    "[::1]:7081": {"pass": "routes/one"},
493                    "[::1]:7082": {"pass": "routes/two"},
494                },
495                'listeners',
496            ),
497            'configure listeners ipv6',
498        )
499
500        self.assertIn(
501            'success',
502            self.conf(
503                {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
504            ),
505            'configure servers ipv6',
506        )
507
508        resps = self.get_resps_sc()
509
510        self.assertEqual(resps[0], 50, 'ipv6 0')
511        self.assertEqual(resps[1], 50, 'ipv6 1')
512
513    def test_upstreams_rr_servers_empty(self):
514        self.assertIn(
515            'success',
516            self.conf({}, 'upstreams/one/servers'),
517            'configure servers empty',
518        )
519        self.assertEqual(self.get()['status'], 502, 'servers empty')
520
521        self.assertIn(
522            'success',
523            self.conf(
524                {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers'
525            ),
526            'configure servers empty one',
527        )
528        self.assertEqual(self.get()['status'], 502, 'servers empty one')
529        self.assertIn(
530            'success',
531            self.conf(
532                {
533                    "127.0.0.1:7081": {"weight": 0},
534                    "127.0.0.1:7082": {"weight": 0},
535                },
536                'upstreams/one/servers',
537            ),
538            'configure servers empty two',
539        )
540        self.assertEqual(self.get()['status'], 502, 'servers empty two')
541
542    def test_upstreams_rr_invalid(self):
543        self.assertIn(
544            'error', self.conf({}, 'upstreams'), 'upstreams empty',
545        )
546        self.assertIn(
547            'error', self.conf({}, 'upstreams/one'), 'named upstreams empty',
548        )
549        self.assertIn(
550            'error',
551            self.conf({}, 'upstreams/one/servers/127.0.0.1'),
552            'invalid address',
553        )
554        self.assertIn(
555            'error',
556            self.conf({}, 'upstreams/one/servers/127.0.0.1:7081/blah'),
557            'invalid server option',
558        )
559
560        def check_weight(w):
561            self.assertIn(
562                'error',
563                self.conf(w, 'upstreams/one/servers/127.0.0.1:7081/weight'),
564                'invalid weight option',
565            )
566        check_weight({})
567        check_weight('-1')
568        check_weight('1.')
569        check_weight('1.1.')
570        check_weight('.')
571        check_weight('.01234567890123')
572        check_weight('1000001')
573        check_weight('2e6')
574
575
576if __name__ == '__main__':
577    TestUpstreamsRR.main()
578