xref: /unit/test/test_proxy.py (revision 2190:fbfec2aaf4c3)
1import re
2import socket
3import time
4
5import pytest
6from conftest import run_process
7from unit.applications.lang.python import TestApplicationPython
8from unit.option import option
9from unit.utils import waitforsocket
10
11
12class TestProxy(TestApplicationPython):
    # NOTE(review): presumably read by the test harness to decide whether
    # these tests can run (a 'python' module, version given as 'any') --
    # confirm against the base class / conftest.
    prerequisites = {'modules': {'python': 'any'}}

    # TCP port passed to run_server() by setup_method(); also used to build
    # the proxy target URL in test_proxy_content_length().
    SERVER_PORT = 7999
16
17    @staticmethod
18    def run_server(server_port):
19        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
20        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
21
22        server_address = ('', server_port)
23        sock.bind(server_address)
24        sock.listen(5)
25
26        def recvall(sock):
27            buff_size = 4096
28            data = b''
29            while True:
30                part = sock.recv(buff_size)
31                data += part
32                if len(part) < buff_size:
33                    break
34            return data
35
36        req = b"""HTTP/1.1 200 OK
37Content-Length: 10
38
39"""
40
41        while True:
42            connection, client_address = sock.accept()
43
44            data = recvall(connection).decode()
45
46            to_send = req
47
48            m = re.search(r'X-Len: (\d+)', data)
49            if m:
50                to_send += b'X' * int(m.group(1))
51
52            connection.sendall(to_send)
53
54            connection.close()
55
56    def get_http10(self, *args, **kwargs):
57        return self.get(*args, http_10=True, **kwargs)
58
59    def post_http10(self, *args, **kwargs):
60        return self.post(*args, http_10=True, **kwargs)
61
62    def setup_method(self):
63        run_process(self.run_server, self.SERVER_PORT)
64        waitforsocket(self.SERVER_PORT)
65
66        assert 'success' in self.conf(
67            {
68                "listeners": {
69                    "*:7080": {"pass": "routes"},
70                    "*:7081": {"pass": "applications/mirror"},
71                },
72                "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
73                "applications": {
74                    "mirror": {
75                        "type": self.get_application_type(),
76                        "processes": {"spare": 0},
77                        "path": option.test_dir + "/python/mirror",
78                        "working_directory": option.test_dir + "/python/mirror",
79                        "module": "wsgi",
80                    },
81                    "custom_header": {
82                        "type": self.get_application_type(),
83                        "processes": {"spare": 0},
84                        "path": option.test_dir + "/python/custom_header",
85                        "working_directory": option.test_dir
86                        + "/python/custom_header",
87                        "module": "wsgi",
88                    },
89                    "delayed": {
90                        "type": self.get_application_type(),
91                        "processes": {"spare": 0},
92                        "path": option.test_dir + "/python/delayed",
93                        "working_directory": option.test_dir
94                        + "/python/delayed",
95                        "module": "wsgi",
96                    },
97                },
98            }
99        ), 'proxy initial configuration'
100
101    def test_proxy_http10(self):
102        for _ in range(10):
103            assert self.get_http10()['status'] == 200, 'status'
104
105    def test_proxy_chain(self):
106        assert 'success' in self.conf(
107            {
108                "listeners": {
109                    "*:7080": {"pass": "routes/first"},
110                    "*:7081": {"pass": "routes/second"},
111                    "*:7082": {"pass": "routes/third"},
112                    "*:7083": {"pass": "routes/fourth"},
113                    "*:7084": {"pass": "routes/fifth"},
114                    "*:7085": {"pass": "applications/mirror"},
115                },
116                "routes": {
117                    "first": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
118                    "second": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
119                    "third": [{"action": {"proxy": "http://127.0.0.1:7083"}}],
120                    "fourth": [{"action": {"proxy": "http://127.0.0.1:7084"}}],
121                    "fifth": [{"action": {"proxy": "http://127.0.0.1:7085"}}],
122                },
123                "applications": {
124                    "mirror": {
125                        "type": self.get_application_type(),
126                        "processes": {"spare": 0},
127                        "path": option.test_dir + "/python/mirror",
128                        "working_directory": option.test_dir + "/python/mirror",
129                        "module": "wsgi",
130                    }
131                },
132            }
133        ), 'proxy chain configuration'
134
135        assert self.get_http10()['status'] == 200, 'status'
136
137    def test_proxy_body(self):
138        payload = '0123456789'
139        for _ in range(10):
140            resp = self.post_http10(body=payload)
141
142            assert resp['status'] == 200, 'status'
143            assert resp['body'] == payload, 'body'
144
145        payload = 'X' * 4096
146        for _ in range(10):
147            resp = self.post_http10(body=payload)
148
149            assert resp['status'] == 200, 'status'
150            assert resp['body'] == payload, 'body'
151
152        payload = 'X' * 4097
153        for _ in range(10):
154            resp = self.post_http10(body=payload)
155
156            assert resp['status'] == 200, 'status'
157            assert resp['body'] == payload, 'body'
158
159        payload = 'X' * 4096 * 256
160        for _ in range(10):
161            resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
162
163            assert resp['status'] == 200, 'status'
164            assert resp['body'] == payload, 'body'
165
166        payload = 'X' * 4096 * 257
167        for _ in range(10):
168            resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
169
170            assert resp['status'] == 200, 'status'
171            assert resp['body'] == payload, 'body'
172
173        assert 'success' in self.conf(
174            {'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings'
175        )
176
177        payload = '0123456789abcdef' * 32 * 64 * 1024
178        resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024)
179        assert resp['status'] == 200, 'status'
180        assert resp['body'] == payload, 'body'
181
182    def test_proxy_parallel(self):
183        payload = 'X' * 4096 * 257
184        buff_size = 4096 * 258
185
186        socks = []
187        for i in range(10):
188            _, sock = self.post_http10(
189                body=payload + str(i),
190                start=True,
191                no_recv=True,
192                read_buffer_size=buff_size,
193            )
194            socks.append(sock)
195
196        for i in range(10):
197            resp = self.recvall(socks[i], buff_size=buff_size).decode()
198            socks[i].close()
199
200            resp = self._resp_to_dict(resp)
201
202            assert resp['status'] == 200, 'status'
203            assert resp['body'] == payload + str(i), 'body'
204
205    def test_proxy_header(self):
206        assert 'success' in self.conf(
207            {"pass": "applications/custom_header"}, 'listeners/*:7081'
208        ), 'custom_header configure'
209
210        header_value = 'blah'
211        assert (
212            self.get_http10(
213                headers={'Host': 'localhost', 'Custom-Header': header_value}
214            )['headers']['Custom-Header']
215            == header_value
216        ), 'custom header'
217
218        header_value = r'(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~'
219        assert (
220            self.get_http10(
221                headers={'Host': 'localhost', 'Custom-Header': header_value}
222            )['headers']['Custom-Header']
223            == header_value
224        ), 'custom header 2'
225
226        header_value = 'X' * 4096
227        assert (
228            self.get_http10(
229                headers={'Host': 'localhost', 'Custom-Header': header_value}
230            )['headers']['Custom-Header']
231            == header_value
232        ), 'custom header 3'
233
234        header_value = 'X' * 8191
235        assert (
236            self.get_http10(
237                headers={'Host': 'localhost', 'Custom-Header': header_value}
238            )['headers']['Custom-Header']
239            == header_value
240        ), 'custom header 4'
241
242        header_value = 'X' * 8192
243        assert (
244            self.get_http10(
245                headers={'Host': 'localhost', 'Custom-Header': header_value}
246            )['status']
247            == 431
248        ), 'custom header 5'
249
250    def test_proxy_fragmented(self):
251        _, sock = self.http(
252            b"""GET / HTT""", raw=True, start=True, no_recv=True
253        )
254
255        time.sleep(1)
256
257        sock.sendall("P/1.0\r\nHost: localhos".encode())
258
259        time.sleep(1)
260
261        sock.sendall("t\r\n\r\n".encode())
262
263        assert re.search(
264            '200 OK', self.recvall(sock).decode()
265        ), 'fragmented send'
266        sock.close()
267
268    def test_proxy_fragmented_close(self):
269        _, sock = self.http(
270            b"""GET / HTT""", raw=True, start=True, no_recv=True
271        )
272
273        time.sleep(1)
274
275        sock.sendall("P/1.0\r\nHo".encode())
276
277        sock.close()
278
279    def test_proxy_fragmented_body(self):
280        _, sock = self.http(
281            b"""GET / HTT""", raw=True, start=True, no_recv=True
282        )
283
284        time.sleep(1)
285
286        sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
287        sock.sendall("Content-Length: 30000\r\n".encode())
288
289        time.sleep(1)
290
291        sock.sendall("\r\n".encode())
292        sock.sendall(("X" * 10000).encode())
293
294        time.sleep(1)
295
296        sock.sendall(("X" * 10000).encode())
297
298        time.sleep(1)
299
300        sock.sendall(("X" * 10000).encode())
301
302        resp = self._resp_to_dict(self.recvall(sock).decode())
303        sock.close()
304
305        assert resp['status'] == 200, 'status'
306        assert resp['body'] == "X" * 30000, 'body'
307
308    def test_proxy_fragmented_body_close(self):
309        _, sock = self.http(
310            b"""GET / HTT""", raw=True, start=True, no_recv=True
311        )
312
313        time.sleep(1)
314
315        sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
316        sock.sendall("Content-Length: 30000\r\n".encode())
317
318        time.sleep(1)
319
320        sock.sendall("\r\n".encode())
321        sock.sendall(("X" * 10000).encode())
322
323        sock.close()
324
325    def test_proxy_nowhere(self):
326        assert 'success' in self.conf(
327            [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes'
328        ), 'proxy path changed'
329
330        assert self.get_http10()['status'] == 502, 'status'
331
332    def test_proxy_ipv6(self):
333        assert 'success' in self.conf(
334            {
335                "*:7080": {"pass": "routes"},
336                "[::1]:7081": {'application': 'mirror'},
337            },
338            'listeners',
339        ), 'add ipv6 listener configure'
340
341        assert 'success' in self.conf(
342            [{"action": {"proxy": "http://[::1]:7081"}}], 'routes'
343        ), 'proxy ipv6 configure'
344
345        assert self.get_http10()['status'] == 200, 'status'
346
347    def test_proxy_unix(self, temp_dir):
348        addr = temp_dir + '/sock'
349
350        assert 'success' in self.conf(
351            {
352                "*:7080": {"pass": "routes"},
353                "unix:" + addr: {'application': 'mirror'},
354            },
355            'listeners',
356        ), 'add unix listener configure'
357
358        assert 'success' in self.conf(
359            [{"action": {"proxy": 'http://unix:' + addr}}], 'routes'
360        ), 'proxy unix configure'
361
362        assert self.get_http10()['status'] == 200, 'status'
363
364    def test_proxy_delayed(self):
365        assert 'success' in self.conf(
366            {"pass": "applications/delayed"}, 'listeners/*:7081'
367        ), 'delayed configure'
368
369        body = '0123456789' * 1000
370        resp = self.post_http10(
371            headers={
372                'Host': 'localhost',
373                'Content-Length': str(len(body)),
374                'X-Parts': '2',
375                'X-Delay': '1',
376            },
377            body=body,
378        )
379
380        assert resp['status'] == 200, 'status'
381        assert resp['body'] == body, 'body'
382
383        resp = self.post_http10(
384            headers={
385                'Host': 'localhost',
386                'Content-Length': str(len(body)),
387                'X-Parts': '2',
388                'X-Delay': '1',
389            },
390            body=body,
391        )
392
393        assert resp['status'] == 200, 'status'
394        assert resp['body'] == body, 'body'
395
396    def test_proxy_delayed_close(self):
397        assert 'success' in self.conf(
398            {"pass": "applications/delayed"}, 'listeners/*:7081'
399        ), 'delayed configure'
400
401        _, sock = self.post_http10(
402            headers={
403                'Host': 'localhost',
404                'Content-Length': '10000',
405                'X-Parts': '3',
406                'X-Delay': '1',
407            },
408            body='0123456789' * 1000,
409            start=True,
410            no_recv=True,
411        )
412
413        assert re.search('200 OK', sock.recv(100).decode()), 'first'
414        sock.close()
415
416        _, sock = self.post_http10(
417            headers={
418                'Host': 'localhost',
419                'Content-Length': '10000',
420                'X-Parts': '3',
421                'X-Delay': '1',
422            },
423            body='0123456789' * 1000,
424            start=True,
425            no_recv=True,
426        )
427
428        assert re.search('200 OK', sock.recv(100).decode()), 'second'
429        sock.close()
430
431    @pytest.mark.skip('not yet')
432    def test_proxy_content_length(self):
433        assert 'success' in self.conf(
434            [
435                {
436                    "action": {
437                        "proxy": "http://127.0.0.1:" + str(self.SERVER_PORT)
438                    }
439                }
440            ],
441            'routes',
442        ), 'proxy backend configure'
443
444        resp = self.get_http10()
445        assert len(resp['body']) == 0, 'body lt Content-Length 0'
446
447        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'})
448        assert len(resp['body']) == 5, 'body lt Content-Length 5'
449
450        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'})
451        assert len(resp['body']) == 9, 'body lt Content-Length 9'
452
453        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'})
454        assert len(resp['body']) == 10, 'body gt Content-Length 11'
455
456        resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'})
457        assert len(resp['body']) == 10, 'body gt Content-Length 15'
458
459    def test_proxy_invalid(self):
460        def check_proxy(proxy):
461            assert 'error' in self.conf(
462                [{"action": {"proxy": proxy}}], 'routes'
463            ), 'proxy invalid'
464
465        check_proxy('blah')
466        check_proxy('/blah')
467        check_proxy('unix:/blah')
468        check_proxy('http://blah')
469        check_proxy('http://127.0.0.1')
470        check_proxy('http://127.0.0.1:')
471        check_proxy('http://127.0.0.1:blah')
472        check_proxy('http://127.0.0.1:-1')
473        check_proxy('http://127.0.0.1:7080b')
474        check_proxy('http://[]')
475        check_proxy('http://[]:7080')
476        check_proxy('http://[:]:7080')
477        check_proxy('http://[::7080')
478
479    @pytest.mark.skip('not yet')
480    def test_proxy_loop(self, skip_alert):
481        skip_alert(
482            r'socket.*failed',
483            r'accept.*failed',
484            r'new connections are not accepted',
485        )
486        assert 'success' in self.conf(
487            {
488                "listeners": {
489                    "*:7080": {"pass": "routes"},
490                    "*:7081": {"pass": "applications/mirror"},
491                    "*:7082": {"pass": "routes"},
492                },
493                "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
494                "applications": {
495                    "mirror": {
496                        "type": self.get_application_type(),
497                        "processes": {"spare": 0},
498                        "path": option.test_dir + "/python/mirror",
499                        "working_directory": option.test_dir + "/python/mirror",
500                        "module": "wsgi",
501                    },
502                },
503            }
504        )
505
506        self.get_http10(no_recv=True)
507        self.get_http10(read_timeout=1)
508