xref: /unit/test/test_proxy.py (revision 2212:975ab1624784)
1import re
2import socket
3import time
4
5import pytest
6from conftest import run_process
7from unit.applications.lang.python import TestApplicationPython
8from unit.option import option
9from unit.utils import waitforsocket
10
11
class TestProxy(TestApplicationPython):
    """Tests for the ``proxy`` route action with Python applications behind it."""

    # NOTE(review): presumably read by the test harness to require that a
    # python module is available before these tests run -- confirm.
    prerequisites = {'modules': {'python': 'any'}}

    # Port handed to run_server() by setup_method() for the helper backend.
    SERVER_PORT = 7999
16
17    @staticmethod
18    def run_server(server_port):
19        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
20        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
21
22        server_address = ('', server_port)
23        sock.bind(server_address)
24        sock.listen(5)
25
26        def recvall(sock):
27            buff_size = 4096
28            data = b''
29            while True:
30                part = sock.recv(buff_size)
31                data += part
32                if len(part) < buff_size:
33                    break
34            return data
35
36        req = b"""HTTP/1.1 200 OK
37Content-Length: 10
38
39"""
40
41        while True:
42            connection, client_address = sock.accept()
43
44            data = recvall(connection).decode()
45
46            to_send = req
47
48            m = re.search(r'X-Len: (\d+)', data)
49            if m:
50                to_send += b'X' * int(m.group(1))
51
52            connection.sendall(to_send)
53
54            connection.close()
55
56    def get_http10(self, *args, **kwargs):
57        return self.get(*args, http_10=True, **kwargs)
58
59    def post_http10(self, *args, **kwargs):
60        return self.post(*args, http_10=True, **kwargs)
61
62    def setup_method(self):
63        run_process(self.run_server, self.SERVER_PORT)
64        waitforsocket(self.SERVER_PORT)
65
66        assert 'success' in self.conf(
67            {
68                "listeners": {
69                    "*:7080": {"pass": "routes"},
70                    "*:7081": {"pass": "applications/mirror"},
71                },
72                "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
73                "applications": {
74                    "mirror": {
75                        "type": self.get_application_type(),
76                        "processes": {"spare": 0},
77                        "path": option.test_dir + "/python/mirror",
78                        "working_directory": option.test_dir + "/python/mirror",
79                        "module": "wsgi",
80                    },
81                    "custom_header": {
82                        "type": self.get_application_type(),
83                        "processes": {"spare": 0},
84                        "path": option.test_dir + "/python/custom_header",
85                        "working_directory": option.test_dir
86                        + "/python/custom_header",
87                        "module": "wsgi",
88                    },
89                    "delayed": {
90                        "type": self.get_application_type(),
91                        "processes": {"spare": 0},
92                        "path": option.test_dir + "/python/delayed",
93                        "working_directory": option.test_dir
94                        + "/python/delayed",
95                        "module": "wsgi",
96                    },
97                },
98            }
99        ), 'proxy initial configuration'
100
101    def test_proxy_http10(self):
102        for _ in range(10):
103            assert self.get_http10()['status'] == 200, 'status'
104
105    def test_proxy_chain(self):
106        assert 'success' in self.conf(
107            {
108                "listeners": {
109                    "*:7080": {"pass": "routes/first"},
110                    "*:7081": {"pass": "routes/second"},
111                    "*:7082": {"pass": "routes/third"},
112                    "*:7083": {"pass": "routes/fourth"},
113                    "*:7084": {"pass": "routes/fifth"},
114                    "*:7085": {"pass": "applications/mirror"},
115                },
116                "routes": {
117                    "first": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
118                    "second": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
119                    "third": [{"action": {"proxy": "http://127.0.0.1:7083"}}],
120                    "fourth": [{"action": {"proxy": "http://127.0.0.1:7084"}}],
121                    "fifth": [{"action": {"proxy": "http://127.0.0.1:7085"}}],
122                },
123                "applications": {
124                    "mirror": {
125                        "type": self.get_application_type(),
126                        "processes": {"spare": 0},
127                        "path": option.test_dir + "/python/mirror",
128                        "working_directory": option.test_dir + "/python/mirror",
129                        "module": "wsgi",
130                    }
131                },
132            }
133        ), 'proxy chain configuration'
134
135        assert self.get_http10()['status'] == 200, 'status'
136
137    def test_proxy_body(self):
138        payload = '0123456789'
139        for _ in range(10):
140            resp = self.post_http10(body=payload)
141
142            assert resp['status'] == 200, 'status'
143            assert resp['body'] == payload, 'body'
144
145        payload = 'X' * 4096
146        for _ in range(10):
147            resp = self.post_http10(body=payload)
148
149            assert resp['status'] == 200, 'status'
150            assert resp['body'] == payload, 'body'
151
152        payload = 'X' * 4097
153        for _ in range(10):
154            resp = self.post_http10(body=payload)
155
156            assert resp['status'] == 200, 'status'
157            assert resp['body'] == payload, 'body'
158
159        payload = 'X' * 4096 * 256
160        for _ in range(10):
161            resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
162
163            assert resp['status'] == 200, 'status'
164            assert resp['body'] == payload, 'body'
165
166        payload = 'X' * 4096 * 257
167        for _ in range(10):
168            resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
169
170            assert resp['status'] == 200, 'status'
171            assert resp['body'] == payload, 'body'
172
173        assert 'success' in self.conf(
174            {'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings'
175        )
176
177        payload = '0123456789abcdef' * 32 * 64 * 1024
178        resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024)
179        assert resp['status'] == 200, 'status'
180        assert resp['body'] == payload, 'body'
181
182    def test_proxy_parallel(self):
183        payload = 'X' * 4096 * 257
184        buff_size = 4096 * 258
185
186        socks = []
187        for i in range(10):
188            sock = self.post_http10(
189                body=payload + str(i),
190                no_recv=True,
191                read_buffer_size=buff_size,
192            )
193            socks.append(sock)
194
195        for i in range(10):
196            resp = self.recvall(socks[i], buff_size=buff_size).decode()
197            socks[i].close()
198
199            resp = self._resp_to_dict(resp)
200
201            assert resp['status'] == 200, 'status'
202            assert resp['body'] == payload + str(i), 'body'
203
204    def test_proxy_header(self):
205        assert 'success' in self.conf(
206            {"pass": "applications/custom_header"}, 'listeners/*:7081'
207        ), 'custom_header configure'
208
209        header_value = 'blah'
210        assert (
211            self.get_http10(
212                headers={'Host': 'localhost', 'Custom-Header': header_value}
213            )['headers']['Custom-Header']
214            == header_value
215        ), 'custom header'
216
217        header_value = r'(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~'
218        assert (
219            self.get_http10(
220                headers={'Host': 'localhost', 'Custom-Header': header_value}
221            )['headers']['Custom-Header']
222            == header_value
223        ), 'custom header 2'
224
225        header_value = 'X' * 4096
226        assert (
227            self.get_http10(
228                headers={'Host': 'localhost', 'Custom-Header': header_value}
229            )['headers']['Custom-Header']
230            == header_value
231        ), 'custom header 3'
232
233        header_value = 'X' * 8191
234        assert (
235            self.get_http10(
236                headers={'Host': 'localhost', 'Custom-Header': header_value}
237            )['headers']['Custom-Header']
238            == header_value
239        ), 'custom header 4'
240
241        header_value = 'X' * 8192
242        assert (
243            self.get_http10(
244                headers={'Host': 'localhost', 'Custom-Header': header_value}
245            )['status']
246            == 431
247        ), 'custom header 5'
248
249    def test_proxy_fragmented(self):
250        sock = self.http(b"""GET / HTT""", raw=True, no_recv=True)
251
252        time.sleep(1)
253
254        sock.sendall("P/1.0\r\nHost: localhos".encode())
255
256        time.sleep(1)
257
258        sock.sendall("t\r\n\r\n".encode())
259
260        assert re.search(
261            '200 OK', self.recvall(sock).decode()
262        ), 'fragmented send'
263        sock.close()
264
265    def test_proxy_fragmented_close(self):
266        sock = self.http(b"""GET / HTT""", raw=True, no_recv=True)
267
268        time.sleep(1)
269
270        sock.sendall("P/1.0\r\nHo".encode())
271
272        sock.close()
273
274    def test_proxy_fragmented_body(self):
275        sock = self.http(b"""GET / HTT""", raw=True, no_recv=True)
276
277        time.sleep(1)
278
279        sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
280        sock.sendall("Content-Length: 30000\r\n".encode())
281
282        time.sleep(1)
283
284        sock.sendall("\r\n".encode())
285        sock.sendall(("X" * 10000).encode())
286
287        time.sleep(1)
288
289        sock.sendall(("X" * 10000).encode())
290
291        time.sleep(1)
292
293        sock.sendall(("X" * 10000).encode())
294
295        resp = self._resp_to_dict(self.recvall(sock).decode())
296        sock.close()
297
298        assert resp['status'] == 200, 'status'
299        assert resp['body'] == "X" * 30000, 'body'
300
301    def test_proxy_fragmented_body_close(self):
302        sock = self.http(b"""GET / HTT""", raw=True, no_recv=True)
303
304        time.sleep(1)
305
306        sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
307        sock.sendall("Content-Length: 30000\r\n".encode())
308
309        time.sleep(1)
310
311        sock.sendall("\r\n".encode())
312        sock.sendall(("X" * 10000).encode())
313
314        sock.close()
315
316    def test_proxy_nowhere(self):
317        assert 'success' in self.conf(
318            [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes'
319        ), 'proxy path changed'
320
321        assert self.get_http10()['status'] == 502, 'status'
322
323    def test_proxy_ipv6(self):
324        assert 'success' in self.conf(
325            {
326                "*:7080": {"pass": "routes"},
327                "[::1]:7081": {'application': 'mirror'},
328            },
329            'listeners',
330        ), 'add ipv6 listener configure'
331
332        assert 'success' in self.conf(
333            [{"action": {"proxy": "http://[::1]:7081"}}], 'routes'
334        ), 'proxy ipv6 configure'
335
336        assert self.get_http10()['status'] == 200, 'status'
337
338    def test_proxy_unix(self, temp_dir):
339        addr = temp_dir + '/sock'
340
341        assert 'success' in self.conf(
342            {
343                "*:7080": {"pass": "routes"},
344                "unix:" + addr: {'application': 'mirror'},
345            },
346            'listeners',
347        ), 'add unix listener configure'
348
349        assert 'success' in self.conf(
350            [{"action": {"proxy": 'http://unix:' + addr}}], 'routes'
351        ), 'proxy unix configure'
352
353        assert self.get_http10()['status'] == 200, 'status'
354
355    def test_proxy_delayed(self):
356        assert 'success' in self.conf(
357            {"pass": "applications/delayed"}, 'listeners/*:7081'
358        ), 'delayed configure'
359
360        body = '0123456789' * 1000
361        resp = self.post_http10(
362            headers={
363                'Host': 'localhost',
364                'Content-Length': str(len(body)),
365                'X-Parts': '2',
366                'X-Delay': '1',
367            },
368            body=body,
369        )
370
371        assert resp['status'] == 200, 'status'
372        assert resp['body'] == body, 'body'
373
374        resp = self.post_http10(
375            headers={
376                'Host': 'localhost',
377                'Content-Length': str(len(body)),
378                'X-Parts': '2',
379                'X-Delay': '1',
380            },
381            body=body,
382        )
383
384        assert resp['status'] == 200, 'status'
385        assert resp['body'] == body, 'body'
386
387    def test_proxy_delayed_close(self):
388        assert 'success' in self.conf(
389            {"pass": "applications/delayed"}, 'listeners/*:7081'
390        ), 'delayed configure'
391
392        sock = self.post_http10(
393            headers={
394                'Host': 'localhost',
395                'Content-Length': '10000',
396                'X-Parts': '3',
397                'X-Delay': '1',
398            },
399            body='0123456789' * 1000,
400            no_recv=True,
401        )
402
403        assert re.search('200 OK', sock.recv(100).decode()), 'first'
404        sock.close()
405
406        sock = self.post_http10(
407            headers={
408                'Host': 'localhost',
409                'Content-Length': '10000',
410                'X-Parts': '3',
411                'X-Delay': '1',
412            },
413            body='0123456789' * 1000,
414            no_recv=True,
415        )
416
417        assert re.search('200 OK', sock.recv(100).decode()), 'second'
418        sock.close()
419
420    @pytest.mark.skip('not yet')
421    def test_proxy_content_length(self):
422        assert 'success' in self.conf(
423            [
424                {
425                    "action": {
426                        "proxy": "http://127.0.0.1:" + str(self.SERVER_PORT)
427                    }
428                }
429            ],
430            'routes',
431        ), 'proxy backend configure'
432
433        resp = self.get_http10()
434        assert len(resp['body']) == 0, 'body lt Content-Length 0'
435
436        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'})
437        assert len(resp['body']) == 5, 'body lt Content-Length 5'
438
439        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'})
440        assert len(resp['body']) == 9, 'body lt Content-Length 9'
441
442        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'})
443        assert len(resp['body']) == 10, 'body gt Content-Length 11'
444
445        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'})
446        assert len(resp['body']) == 10, 'body gt Content-Length 15'
447
448    def test_proxy_invalid(self):
449        def check_proxy(proxy):
450            assert 'error' in self.conf(
451                [{"action": {"proxy": proxy}}], 'routes'
452            ), 'proxy invalid'
453
454        check_proxy('blah')
455        check_proxy('/blah')
456        check_proxy('unix:/blah')
457        check_proxy('http://blah')
458        check_proxy('http://127.0.0.1')
459        check_proxy('http://127.0.0.1:')
460        check_proxy('http://127.0.0.1:blah')
461        check_proxy('http://127.0.0.1:-1')
462        check_proxy('http://127.0.0.1:7080b')
463        check_proxy('http://[]')
464        check_proxy('http://[]:7080')
465        check_proxy('http://[:]:7080')
466        check_proxy('http://[::7080')
467
468    @pytest.mark.skip('not yet')
469    def test_proxy_loop(self, skip_alert):
470        skip_alert(
471            r'socket.*failed',
472            r'accept.*failed',
473            r'new connections are not accepted',
474        )
475        assert 'success' in self.conf(
476            {
477                "listeners": {
478                    "*:7080": {"pass": "routes"},
479                    "*:7081": {"pass": "applications/mirror"},
480                    "*:7082": {"pass": "routes"},
481                },
482                "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
483                "applications": {
484                    "mirror": {
485                        "type": self.get_application_type(),
486                        "processes": {"spare": 0},
487                        "path": option.test_dir + "/python/mirror",
488                        "working_directory": option.test_dir + "/python/mirror",
489                        "module": "wsgi",
490                    },
491                },
492            }
493        )
494
495        self.get_http10(no_recv=True)
496        self.get_http10(read_timeout=1)
497