xref: /unit/test/test_proxy.py (revision 1736:22db875fda34)
1import re
2import socket
3import time
4
5import pytest
6
7from conftest import run_process
8from unit.applications.lang.python import TestApplicationPython
9from unit.option import option
10from unit.utils import waitforsocket
11
12
13class TestProxy(TestApplicationPython):
14    prerequisites = {'modules': {'python': 'any'}}
15
16    SERVER_PORT = 7999
17
18    @staticmethod
19    def run_server(server_port):
20        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
21        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
22
23        server_address = ('', server_port)
24        sock.bind(server_address)
25        sock.listen(5)
26
27        def recvall(sock):
28            buff_size = 4096
29            data = b''
30            while True:
31                part = sock.recv(buff_size)
32                data += part
33                if len(part) < buff_size:
34                    break
35            return data
36
37        req = b"""HTTP/1.1 200 OK
38Content-Length: 10
39
40"""
41
42        while True:
43            connection, client_address = sock.accept()
44
45            data = recvall(connection).decode()
46
47            to_send = req
48
49            m = re.search(r'X-Len: (\d+)', data)
50            if m:
51                to_send += b'X' * int(m.group(1))
52
53            connection.sendall(to_send)
54
55            connection.close()
56
57    def get_http10(self, *args, **kwargs):
58        return self.get(*args, http_10=True, **kwargs)
59
60    def post_http10(self, *args, **kwargs):
61        return self.post(*args, http_10=True, **kwargs)
62
63    def setup_method(self):
64        run_process(self.run_server, self.SERVER_PORT)
65        waitforsocket(self.SERVER_PORT)
66
67        assert 'success' in self.conf(
68            {
69                "listeners": {
70                    "*:7080": {"pass": "routes"},
71                    "*:7081": {"pass": "applications/mirror"},
72                },
73                "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
74                "applications": {
75                    "mirror": {
76                        "type": "python",
77                        "processes": {"spare": 0},
78                        "path": option.test_dir + "/python/mirror",
79                        "working_directory": option.test_dir
80                        + "/python/mirror",
81                        "module": "wsgi",
82                    },
83                    "custom_header": {
84                        "type": "python",
85                        "processes": {"spare": 0},
86                        "path": option.test_dir + "/python/custom_header",
87                        "working_directory": option.test_dir
88                        + "/python/custom_header",
89                        "module": "wsgi",
90                    },
91                    "delayed": {
92                        "type": "python",
93                        "processes": {"spare": 0},
94                        "path": option.test_dir + "/python/delayed",
95                        "working_directory": option.test_dir
96                        + "/python/delayed",
97                        "module": "wsgi",
98                    },
99                },
100            }
101        ), 'proxy initial configuration'
102
103    def test_proxy_http10(self):
104        for _ in range(10):
105            assert self.get_http10()['status'] == 200, 'status'
106
107    def test_proxy_chain(self):
108        assert 'success' in self.conf(
109            {
110                "listeners": {
111                    "*:7080": {"pass": "routes/first"},
112                    "*:7081": {"pass": "routes/second"},
113                    "*:7082": {"pass": "routes/third"},
114                    "*:7083": {"pass": "routes/fourth"},
115                    "*:7084": {"pass": "routes/fifth"},
116                    "*:7085": {"pass": "applications/mirror"},
117                },
118                "routes": {
119                    "first": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
120                    "second": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
121                    "third": [{"action": {"proxy": "http://127.0.0.1:7083"}}],
122                    "fourth": [{"action": {"proxy": "http://127.0.0.1:7084"}}],
123                    "fifth": [{"action": {"proxy": "http://127.0.0.1:7085"}}],
124                },
125                "applications": {
126                    "mirror": {
127                        "type": "python",
128                        "processes": {"spare": 0},
129                        "path": option.test_dir + "/python/mirror",
130                        "working_directory": option.test_dir
131                        + "/python/mirror",
132                        "module": "wsgi",
133                    }
134                },
135            }
136        ), 'proxy chain configuration'
137
138        assert self.get_http10()['status'] == 200, 'status'
139
140    def test_proxy_body(self):
141        payload = '0123456789'
142        for _ in range(10):
143            resp = self.post_http10(body=payload)
144
145            assert resp['status'] == 200, 'status'
146            assert resp['body'] == payload, 'body'
147
148        payload = 'X' * 4096
149        for _ in range(10):
150            resp = self.post_http10(body=payload)
151
152            assert resp['status'] == 200, 'status'
153            assert resp['body'] == payload, 'body'
154
155        payload = 'X' * 4097
156        for _ in range(10):
157            resp = self.post_http10(body=payload)
158
159            assert resp['status'] == 200, 'status'
160            assert resp['body'] == payload, 'body'
161
162        payload = 'X' * 4096 * 256
163        for _ in range(10):
164            resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
165
166            assert resp['status'] == 200, 'status'
167            assert resp['body'] == payload, 'body'
168
169        payload = 'X' * 4096 * 257
170        for _ in range(10):
171            resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
172
173            assert resp['status'] == 200, 'status'
174            assert resp['body'] == payload, 'body'
175
176        self.conf({'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings')
177
178        payload = '0123456789abcdef' * 32 * 64 * 1024
179        resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024)
180        assert resp['status'] == 200, 'status'
181        assert resp['body'] == payload, 'body'
182
183    def test_proxy_parallel(self):
184        payload = 'X' * 4096 * 257
185        buff_size = 4096 * 258
186
187        socks = []
188        for i in range(10):
189            _, sock = self.post_http10(
190                body=payload + str(i),
191                start=True,
192                no_recv=True,
193                read_buffer_size=buff_size,
194            )
195            socks.append(sock)
196
197        for i in range(10):
198            resp = self.recvall(socks[i], buff_size=buff_size).decode()
199            socks[i].close()
200
201            resp = self._resp_to_dict(resp)
202
203            assert resp['status'] == 200, 'status'
204            assert resp['body'] == payload + str(i), 'body'
205
206    def test_proxy_header(self):
207        assert 'success' in self.conf(
208            {"pass": "applications/custom_header"}, 'listeners/*:7081'
209        ), 'custom_header configure'
210
211        header_value = 'blah'
212        assert (
213            self.get_http10(
214                headers={'Host': 'localhost', 'Custom-Header': header_value}
215            )['headers']['Custom-Header']
216            == header_value
217        ), 'custom header'
218
219        header_value = r'(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~'
220        assert (
221            self.get_http10(
222                headers={'Host': 'localhost', 'Custom-Header': header_value}
223            )['headers']['Custom-Header']
224            == header_value
225        ), 'custom header 2'
226
227        header_value = 'X' * 4096
228        assert (
229            self.get_http10(
230                headers={'Host': 'localhost', 'Custom-Header': header_value}
231            )['headers']['Custom-Header']
232            == header_value
233        ), 'custom header 3'
234
235        header_value = 'X' * 8191
236        assert (
237            self.get_http10(
238                headers={'Host': 'localhost', 'Custom-Header': header_value}
239            )['headers']['Custom-Header']
240            == header_value
241        ), 'custom header 4'
242
243        header_value = 'X' * 8192
244        assert (
245            self.get_http10(
246                headers={'Host': 'localhost', 'Custom-Header': header_value}
247            )['status']
248            == 431
249        ), 'custom header 5'
250
251    def test_proxy_fragmented(self):
252        _, sock = self.http(
253            b"""GET / HTT""", raw=True, start=True, no_recv=True
254        )
255
256        time.sleep(1)
257
258        sock.sendall("P/1.0\r\nHost: localhos".encode())
259
260        time.sleep(1)
261
262        sock.sendall("t\r\n\r\n".encode())
263
264        assert re.search(
265            '200 OK', self.recvall(sock).decode()
266        ), 'fragmented send'
267        sock.close()
268
269    def test_proxy_fragmented_close(self):
270        _, sock = self.http(
271            b"""GET / HTT""", raw=True, start=True, no_recv=True
272        )
273
274        time.sleep(1)
275
276        sock.sendall("P/1.0\r\nHo".encode())
277
278        sock.close()
279
280    def test_proxy_fragmented_body(self):
281        _, sock = self.http(
282            b"""GET / HTT""", raw=True, start=True, no_recv=True
283        )
284
285        time.sleep(1)
286
287        sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
288        sock.sendall("Content-Length: 30000\r\n".encode())
289
290        time.sleep(1)
291
292        sock.sendall("\r\n".encode())
293        sock.sendall(("X" * 10000).encode())
294
295        time.sleep(1)
296
297        sock.sendall(("X" * 10000).encode())
298
299        time.sleep(1)
300
301        sock.sendall(("X" * 10000).encode())
302
303        resp = self._resp_to_dict(self.recvall(sock).decode())
304        sock.close()
305
306        assert resp['status'] == 200, 'status'
307        assert resp['body'] == "X" * 30000, 'body'
308
309    def test_proxy_fragmented_body_close(self):
310        _, sock = self.http(
311            b"""GET / HTT""", raw=True, start=True, no_recv=True
312        )
313
314        time.sleep(1)
315
316        sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
317        sock.sendall("Content-Length: 30000\r\n".encode())
318
319        time.sleep(1)
320
321        sock.sendall("\r\n".encode())
322        sock.sendall(("X" * 10000).encode())
323
324        sock.close()
325
326    def test_proxy_nowhere(self):
327        assert 'success' in self.conf(
328            [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes'
329        ), 'proxy path changed'
330
331        assert self.get_http10()['status'] == 502, 'status'
332
333    def test_proxy_ipv6(self):
334        assert 'success' in self.conf(
335            {
336                "*:7080": {"pass": "routes"},
337                "[::1]:7081": {'application': 'mirror'},
338            },
339            'listeners',
340        ), 'add ipv6 listener configure'
341
342        assert 'success' in self.conf(
343            [{"action": {"proxy": "http://[::1]:7081"}}], 'routes'
344        ), 'proxy ipv6 configure'
345
346        assert self.get_http10()['status'] == 200, 'status'
347
348    def test_proxy_unix(self, temp_dir):
349        addr = temp_dir + '/sock'
350
351        assert 'success' in self.conf(
352            {
353                "*:7080": {"pass": "routes"},
354                "unix:" + addr: {'application': 'mirror'},
355            },
356            'listeners',
357        ), 'add unix listener configure'
358
359        assert 'success' in self.conf(
360            [{"action": {"proxy": 'http://unix:' + addr}}], 'routes'
361        ), 'proxy unix configure'
362
363        assert self.get_http10()['status'] == 200, 'status'
364
365    def test_proxy_delayed(self):
366        assert 'success' in self.conf(
367            {"pass": "applications/delayed"}, 'listeners/*:7081'
368        ), 'delayed configure'
369
370        body = '0123456789' * 1000
371        resp = self.post_http10(
372            headers={
373                'Host': 'localhost',
374                'Content-Type': 'text/html',
375                'Content-Length': str(len(body)),
376                'X-Parts': '2',
377                'X-Delay': '1',
378            },
379            body=body,
380        )
381
382        assert resp['status'] == 200, 'status'
383        assert resp['body'] == body, 'body'
384
385        resp = self.post_http10(
386            headers={
387                'Host': 'localhost',
388                'Content-Type': 'text/html',
389                'Content-Length': str(len(body)),
390                'X-Parts': '2',
391                'X-Delay': '1',
392            },
393            body=body,
394        )
395
396        assert resp['status'] == 200, 'status'
397        assert resp['body'] == body, 'body'
398
399    def test_proxy_delayed_close(self):
400        assert 'success' in self.conf(
401            {"pass": "applications/delayed"}, 'listeners/*:7081'
402        ), 'delayed configure'
403
404        _, sock = self.post_http10(
405            headers={
406                'Host': 'localhost',
407                'Content-Type': 'text/html',
408                'Content-Length': '10000',
409                'X-Parts': '3',
410                'X-Delay': '1',
411            },
412            body='0123456789' * 1000,
413            start=True,
414            no_recv=True,
415        )
416
417        assert re.search('200 OK', sock.recv(100).decode()), 'first'
418        sock.close()
419
420        _, sock = self.post_http10(
421            headers={
422                'Host': 'localhost',
423                'Content-Type': 'text/html',
424                'Content-Length': '10000',
425                'X-Parts': '3',
426                'X-Delay': '1',
427            },
428            body='0123456789' * 1000,
429            start=True,
430            no_recv=True,
431        )
432
433        assert re.search('200 OK', sock.recv(100).decode()), 'second'
434        sock.close()
435
436    @pytest.mark.skip('not yet')
437    def test_proxy_content_length(self):
438        assert 'success' in self.conf(
439            [
440                {
441                    "action": {
442                        "proxy": "http://127.0.0.1:" + str(self.SERVER_PORT)
443                    }
444                }
445            ],
446            'routes',
447        ), 'proxy backend configure'
448
449        resp = self.get_http10()
450        assert len(resp['body']) == 0, 'body lt Content-Length 0'
451
452        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'})
453        assert len(resp['body']) == 5, 'body lt Content-Length 5'
454
455        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'})
456        assert len(resp['body']) == 9, 'body lt Content-Length 9'
457
458        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'})
459        assert len(resp['body']) == 10, 'body gt Content-Length 11'
460
461        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'})
462        assert len(resp['body']) == 10, 'body gt Content-Length 15'
463
464    def test_proxy_invalid(self):
465        def check_proxy(proxy):
466            assert 'error' in \
467                self.conf([{"action": {"proxy": proxy}}], 'routes'), \
468                'proxy invalid'
469
470        check_proxy('blah')
471        check_proxy('/blah')
472        check_proxy('unix:/blah')
473        check_proxy('http://blah')
474        check_proxy('http://127.0.0.1')
475        check_proxy('http://127.0.0.1:')
476        check_proxy('http://127.0.0.1:blah')
477        check_proxy('http://127.0.0.1:-1')
478        check_proxy('http://127.0.0.1:7080b')
479        check_proxy('http://[]')
480        check_proxy('http://[]:7080')
481        check_proxy('http://[:]:7080')
482        check_proxy('http://[::7080')
483
484    def test_proxy_loop(self, skip_alert):
485        skip_alert(
486            r'socket.*failed',
487            r'accept.*failed',
488            r'new connections are not accepted',
489        )
490        self.conf(
491            {
492                "listeners": {
493                    "*:7080": {"pass": "routes"},
494                    "*:7081": {"pass": "applications/mirror"},
495                    "*:7082": {"pass": "routes"},
496                },
497                "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
498                "applications": {
499                    "mirror": {
500                        "type": "python",
501                        "processes": {"spare": 0},
502                        "path": option.test_dir + "/python/mirror",
503                        "working_directory": option.test_dir + "/python/mirror",
504                        "module": "wsgi",
505                    },
506                },
507            }
508        )
509
510        self.get_http10(no_recv=True)
511        self.get_http10(read_timeout=1)
512