xref: /unit/test/unit/http.py (revision 1295:f60a68728306)
1import re
2import time
3import json
4import socket
5import select
6from unit.main import TestUnit
7
8
9class TestHTTP(TestUnit):
10    def http(self, start_str, **kwargs):
11        sock_type = (
12            'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
13        )
14        port = 7080 if 'port' not in kwargs else kwargs['port']
15        url = '/' if 'url' not in kwargs else kwargs['url']
16        http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
17        read_buffer_size = (
18            4096
19            if 'read_buffer_size' not in kwargs
20            else kwargs['read_buffer_size']
21        )
22
23        headers = (
24            {'Host': 'localhost', 'Connection': 'close'}
25            if 'headers' not in kwargs
26            else kwargs['headers']
27        )
28
29        body = b'' if 'body' not in kwargs else kwargs['body']
30        crlf = '\r\n'
31
32        if 'addr' not in kwargs:
33            addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
34        else:
35            addr = kwargs['addr']
36
37        sock_types = {
38            'ipv4': socket.AF_INET,
39            'ipv6': socket.AF_INET6,
40            'unix': socket.AF_UNIX,
41        }
42
43        if 'sock' not in kwargs:
44            sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
45
46            if (
47                sock_type == sock_types['ipv4']
48                or sock_type == sock_types['ipv6']
49            ):
50                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
51
52            if 'wrapper' in kwargs:
53                sock = kwargs['wrapper'](sock)
54
55            connect_args = addr if sock_type == 'unix' else (addr, port)
56            try:
57                sock.connect(connect_args)
58            except ConnectionRefusedError:
59                sock.close()
60                return None
61
62        else:
63            sock = kwargs['sock']
64
65        if 'raw' not in kwargs:
66            req = ' '.join([start_str, url, http]) + crlf
67
68            if body != b'':
69                if isinstance(body, str):
70                    body = body.encode()
71
72                if 'Content-Length' not in headers:
73                    headers['Content-Length'] = len(body)
74
75            for header, value in headers.items():
76                if isinstance(value, list):
77                    for v in value:
78                        req += header + ': ' + str(v) + crlf
79
80                else:
81                    req += header + ': ' + str(value) + crlf
82
83            req = (req + crlf).encode() + body
84
85        else:
86            req = start_str
87
88        sock.sendall(req)
89
90        encoding = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
91
92        if TestUnit.detailed:
93            print('>>>')
94            try:
95                print(req.decode(encoding, 'ignore'))
96            except UnicodeEncodeError:
97                print(req)
98
99        resp = ''
100
101        if 'no_recv' not in kwargs:
102            read_timeout = (
103                30 if 'read_timeout' not in kwargs else kwargs['read_timeout']
104            )
105            resp = self.recvall(
106                sock, read_timeout=read_timeout, buff_size=read_buffer_size
107            ).decode(encoding)
108
109        if TestUnit.detailed:
110            print('<<<')
111            try:
112                print(resp)
113            except UnicodeEncodeError:
114                print(resp.encode())
115
116        if 'raw_resp' not in kwargs:
117            resp = self._resp_to_dict(resp)
118
119            headers = resp.get('headers')
120            if headers and headers.get('Transfer-Encoding') == 'chunked':
121                resp['body'] = self._parse_chunked_body(resp['body']).decode(
122                    encoding
123                )
124
125        if 'start' not in kwargs:
126            sock.close()
127            return resp
128
129        return (resp, sock)
130
131    def delete(self, **kwargs):
132        return self.http('DELETE', **kwargs)
133
134    def get(self, **kwargs):
135        return self.http('GET', **kwargs)
136
137    def head(self, **kwargs):
138        return self.http('HEAD', **kwargs)
139
140    def post(self, **kwargs):
141        return self.http('POST', **kwargs)
142
143    def put(self, **kwargs):
144        return self.http('PUT', **kwargs)
145
146    def recvall(self, sock, read_timeout=30, buff_size=4096):
147        data = b''
148        while select.select([sock], [], [], read_timeout)[0]:
149            try:
150                part = sock.recv(buff_size)
151            except:
152                break
153
154            data += part
155
156            if not len(part):
157                break
158
159        return data
160
161    def _resp_to_dict(self, resp):
162        m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
163
164        if not m:
165            return {}
166
167        headers_text, body = m.group(1), m.group(2)
168
169        p = re.compile('(.*?)\x0d\x0a?', re.M | re.S)
170        headers_lines = p.findall(headers_text)
171
172        status = re.search(
173            r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
174        ).group(1)
175
176        headers = {}
177        for line in headers_lines:
178            m = re.search(r'(.*)\:\s(.*)', line)
179
180            if m.group(1) not in headers:
181                headers[m.group(1)] = m.group(2)
182
183            elif isinstance(headers[m.group(1)], list):
184                headers[m.group(1)].append(m.group(2))
185
186            else:
187                headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
188
189        return {'status': int(status), 'headers': headers, 'body': body}
190
191    def _parse_chunked_body(self, raw_body):
192        if isinstance(raw_body, str):
193            raw_body = bytes(raw_body.encode())
194
195        crlf = b'\r\n'
196        chunks = raw_body.split(crlf)
197
198        if len(chunks) < 3:
199            self.fail('Invalid chunked body')
200
201        if chunks.pop() != b'':
202            self.fail('No CRLF at the end of the body')
203
204        try:
205            last_size = int(chunks[-2], 16)
206        except:
207            self.fail('Invalid zero size chunk')
208
209        if last_size != 0 or chunks[-1] != b'':
210            self.fail('Incomplete body')
211
212        body = b''
213        while len(chunks) >= 2:
214            try:
215                size = int(chunks.pop(0), 16)
216            except:
217                self.fail('Invalid chunk size %s' % str(size))
218
219            if size == 0:
220                self.assertEqual(len(chunks), 1, 'last zero size')
221                break
222
223            temp_body = crlf.join(chunks)
224
225            body += temp_body[:size]
226
227            temp_body = temp_body[size + len(crlf) :]
228
229            chunks = temp_body.split(crlf)
230
231        return body
232
233    def waitforsocket(self, port):
234        ret = False
235
236        for i in range(50):
237            try:
238                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
239                sock.connect(('127.0.0.1', port))
240                ret = True
241                break
242            except:
243                sock.close()
244                time.sleep(0.1)
245
246        sock.close()
247
248        self.assertTrue(ret, 'socket connected')
249