xref: /unit/test/unit/http.py (revision 2616:ab2896c980ab)
1import binascii
2import io
3import json
4import os
5import re
6import select
7import socket
8
9import pytest
10
11from unit.option import option
12
13
class HTTP1:
    """Minimal raw-socket HTTP/1.x client used to drive a server in tests.

    Requests are assembled by hand and written to a stream socket, and the
    response is read back and (optionally) parsed into a dict.  Failures to
    connect or to read are reported through ``pytest.fail``.
    """

    def http(self, start_str, **kwargs):
        """Send one request and, unless disabled, read and parse the reply.

        Args:
            start_str: the request method (e.g. ``'GET'``).  When the ``raw``
                keyword is present it is instead sent verbatim as the whole
                request and must already be bytes.
            **kwargs: optional settings, all looked up by key:
                sock_type: ``'ipv4'`` (default), ``'ipv6'`` or ``'unix'``.
                addr: address to connect to; defaults to ``127.0.0.1`` or
                    ``::1`` depending on ``sock_type``.
                port: TCP port, default 8080.
                url: request target, default ``'/'``.
                http_10: if present, use ``HTTP/1.0`` instead of ``HTTP/1.1``.
                headers: mapping of header name to value or list of values.
                body: ``bytes``, ``str`` (encoded) or ``dict`` (form-encoded
                    through :meth:`form_encode`).
                sock: an already connected socket to reuse.
                wrapper: callable ``(sock, server_hostname=...)`` applied to a
                    freshly created socket before connecting.
                raw: if present, send ``start_str`` as is.
                encoding: response (and log) encoding, default ``'utf-8'``.
                no_recv: if present, return the socket right after sending.
                read_timeout, read_buffer_size: forwarded to :meth:`recvall`.
                raw_resp: if present, return the response text unparsed.
                json: if present, decode the response body as JSON.
                start: if present, keep the socket open and return it too.

        Returns:
            The socket when ``no_recv`` is given; otherwise the response
            (a ``str`` with ``raw_resp``, else a dict), or a
            ``(response, socket)`` tuple when ``start`` is given.
        """
        sock_type = kwargs.get('sock_type', 'ipv4')
        port = kwargs.get('port', 8080)
        url = kwargs.get('url', '/')
        http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'

        headers = kwargs.get(
            'headers', {'Host': 'localhost', 'Connection': 'close'}
        )

        body = kwargs.get('body', b'')
        crlf = '\r\n'

        if 'addr' not in kwargs:
            addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
        else:
            addr = kwargs['addr']

        sock_types = {
            'ipv4': socket.AF_INET,
            'ipv6': socket.AF_INET6,
            'unix': socket.AF_UNIX,
        }

        if 'sock' not in kwargs:
            sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)

            # ``sock_type`` is the string key, not the AF_* constant, so it
            # has to be compared against the key names.
            if sock_type in ('ipv4', 'ipv6'):
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            if 'wrapper' in kwargs:
                server_hostname = headers.get('Host', None)
                sock = kwargs['wrapper'](sock, server_hostname=server_hostname)

            connect_args = addr if sock_type == 'unix' else (addr, port)
            try:
                sock.connect(connect_args)
            except (ConnectionRefusedError, FileNotFoundError):
                sock.close()
                pytest.fail("Client can't connect to the server.")

        else:
            sock = kwargs['sock']

        if 'raw' not in kwargs:
            # Work on a copy: the headers added below must not leak into the
            # caller's dict, where a stale Content-Length would be reused by
            # the next request built from the same mapping.
            headers = dict(headers)

            req = f'{start_str} {url} {http}{crlf}'

            if body != b'':
                if isinstance(body, str):
                    body = body.encode()
                elif isinstance(body, dict):
                    body, content_type = self.form_encode(body)

                    headers['Content-Type'] = content_type

                if 'Content-Length' not in headers:
                    headers['Content-Length'] = len(body)

            for header, value in headers.items():
                # A list value produces one header line per element.
                if isinstance(value, list):
                    for v in value:
                        req += f'{header}: {v}{crlf}'

                else:
                    req += f'{header}: {value}{crlf}'

            req = (req + crlf).encode() + body

        else:
            req = start_str

        sock.sendall(req)

        encoding = kwargs.get('encoding', 'utf-8')

        self.log_out(req, encoding)

        if 'no_recv' in kwargs:
            return sock

        recvall_kwargs = {}

        if 'read_timeout' in kwargs:
            recvall_kwargs['read_timeout'] = kwargs['read_timeout']

        if 'read_buffer_size' in kwargs:
            recvall_kwargs['buff_size'] = kwargs['read_buffer_size']

        resp = self.recvall(sock, **recvall_kwargs).decode(
            encoding, errors='ignore'
        )

        self.log_in(resp)

        if 'raw_resp' not in kwargs:
            resp = self._resp_to_dict(resp)

            headers = resp.get('headers')
            if headers and headers.get('Transfer-Encoding') == 'chunked':
                resp['body'] = self._parse_chunked_body(resp['body']).decode(
                    encoding
                )

            if 'json' in kwargs:
                resp = self._parse_json(resp)

        if 'start' not in kwargs:
            sock.close()
            return resp

        return (resp, sock)

    def log_out(self, log, encoding):
        """Print the outgoing request when detailed logging is enabled."""
        if option.detailed:
            print('>>>')
            log = self.log_truncate(log)
            try:
                print(log.decode(encoding, 'ignore'))
            except UnicodeEncodeError:
                print(log)

    def log_in(self, log):
        """Print the incoming response when detailed logging is enabled."""
        if option.detailed:
            print('<<<')
            log = self.log_truncate(log)
            try:
                print(log)
            except UnicodeEncodeError:
                print(log.encode())

    def log_truncate(self, log, limit=1024):
        """Cut ``log`` to ``limit`` items and append a truncation note.

        The result has the same type as the input (``str`` stays ``str``,
        bytes stay bytes), so callers can still ``decode()`` a truncated
        request.  Input not longer than ``limit`` is returned unchanged.
        """
        len_log = len(log)
        if len_log > limit:
            appendix = f'(...logged {limit} of {len_log} bytes)'

            if isinstance(log, (bytes, bytearray)):
                appendix = appendix.encode()

            # Plain concatenation keeps the type; formatting bytes through an
            # f-string would embed their repr ("b'...'") instead.
            log = log[:limit] + appendix

        return log

    def delete(self, **kwargs):
        """Send a DELETE request; see :meth:`http` for the keywords."""
        return self.http('DELETE', **kwargs)

    def get(self, **kwargs):
        """Send a GET request; see :meth:`http` for the keywords."""
        return self.http('GET', **kwargs)

    def head(self, **kwargs):
        """Send a HEAD request; see :meth:`http` for the keywords."""
        return self.http('HEAD', **kwargs)

    def post(self, **kwargs):
        """Send a POST request; see :meth:`http` for the keywords."""
        return self.http('POST', **kwargs)

    def put(self, **kwargs):
        """Send a PUT request; see :meth:`http` for the keywords."""
        return self.http('PUT', **kwargs)

    def recvall(self, sock, **kwargs):
        """Read from ``sock`` until EOF, a receive error or a timeout.

        Keyword Args:
            read_timeout: seconds to wait for readability, default 60.
            buff_size: size passed to each ``recv`` call, default 4096.

        Returns:
            All bytes received so far.  Hitting the *default* timeout is
            reported with ``pytest.fail``; a custom timeout just ends the
            read.
        """
        timeout_default = 60

        timeout = kwargs.get('read_timeout', timeout_default)
        buff_size = kwargs.get('buff_size', 4096)

        parts = []
        while True:
            rlist = select.select([sock], [], [], timeout)[0]
            if not rlist:
                # For all current cases, if "read_timeout" was changed then
                # the test does not expect to get a response from the server.
                if timeout == timeout_default:
                    pytest.fail("Can't read response from server.")
                break

            try:
                part = sock.recv(buff_size)

            except Exception:
                # Best effort: a failed read ends the response.  Interrupts
                # and interpreter exit are not Exception subclasses and so
                # still propagate.
                break

            if not part:
                break

            parts.append(part)

        return b''.join(parts)

    def _resp_to_dict(self, resp):
        """Split response text into ``status``, ``headers`` and ``body``.

        Returns an empty dict when no header/body separator is found.
        A header that occurs more than once is collected into a list.
        """
        m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)

        if not m:
            return {}

        headers_text, body = m.group(1), m.group(2)
        headers_lines = re.findall('(.*?)\x0d\x0a?', headers_text, re.M | re.S)

        # The first line is the status line; the rest are header fields.
        status = re.search(
            r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
        ).group(1)

        headers = {}
        for line in headers_lines:
            m = re.search(r'(.*)\:\s(.*)', line)
            name, value = m.group(1), m.group(2)

            if name not in headers:
                headers[name] = value

            elif isinstance(headers[name], list):
                headers[name].append(value)

            else:
                headers[name] = [headers[name], value]

        return {'status': int(status), 'headers': headers, 'body': body}

    def _parse_chunked_body(self, raw_body):
        """Decode a chunked transfer-encoded body and return it as bytes.

        ``raw_body`` may be ``str`` (encoded first) or bytes.  Structural
        problems (missing final CRLF, missing or non-zero last chunk, bad
        size fields) are reported with ``pytest.fail``.
        """
        if isinstance(raw_body, str):
            raw_body = bytes(raw_body.encode())

        crlf = b'\r\n'
        chunks = raw_body.split(crlf)

        if len(chunks) < 3:
            pytest.fail('Invalid chunked body')

        if chunks.pop() != b'':
            pytest.fail('No CRLF at the end of the body')

        try:
            last_size = int(chunks[-2], 16)

        except ValueError:
            pytest.fail('Invalid zero size chunk')

        if last_size != 0 or chunks[-1] != b'':
            pytest.fail('Incomplete body')

        body = b''
        while len(chunks) >= 2:
            try:
                size = int(chunks.pop(0), 16)

            except ValueError:
                pytest.fail('Invalid chunk size')

            if size == 0:
                assert len(chunks) == 1, 'last zero size'
                break

            # Chunk data may itself contain CRLF, so rejoin the remainder and
            # slice by the declared size instead of trusting the split.
            temp_body = crlf.join(chunks)

            body += temp_body[:size]

            temp_body = temp_body[size + len(crlf) :]

            chunks = temp_body.split(crlf)

        return body

    def _parse_json(self, resp):
        """Replace ``resp['body']`` with its decoded JSON value.

        Asserts that the response Content-Type is ``application/json``.
        """
        headers = resp['headers']

        assert 'Content-Type' in headers
        assert headers['Content-Type'] == 'application/json'

        resp['body'] = json.loads(resp['body'])

        return resp

    def getjson(self, **kwargs):
        """Send a GET request and decode the response body as JSON."""
        return self.get(json=True, **kwargs)

    def form_encode(self, fields):
        """Encode ``fields`` as a form body.

        Multipart encoding is used as soon as any value is a dict (a file
        description); otherwise the fields are URL-form joined.

        Returns:
            ``(body_bytes, content_type)``.
        """
        if any(isinstance(value, dict) for value in fields.values()):
            return self.multipart_encode(fields)

        return self.form_url_encode(fields)

    def form_url_encode(self, fields):
        """Join ``fields`` as ``name=value`` pairs separated by ``&``.

        Names and values are inserted as given; no percent-encoding is
        applied.

        Returns:
            ``(body_bytes, 'application/x-www-form-urlencoded')``.
        """
        data = "&".join(
            f'{name}={value}' for name, value in fields.items()
        ).encode()
        return data, 'application/x-www-form-urlencoded'

    def multipart_encode(self, fields):
        """Encode ``fields`` as ``multipart/form-data`` with a random boundary.

        A ``str`` value becomes a plain part.  A dict value describes a file:
        ``filename`` and ``data`` (a stream, read in full) are required and
        ``type`` optionally overrides the default ``text/plain`` part type.
        Bytes read from a binary stream are written to the body unchanged.

        Returns:
            ``(body_bytes, content_type)`` where the content type carries
            the generated boundary.
        """
        boundary = binascii.hexlify(os.urandom(16)).decode('ascii')

        parts = []

        for field, value in fields.items():
            filename = ''
            datatype = ''

            if isinstance(value, dict):
                datatype = 'text/plain'
                filename = value['filename']

                if value.get('type'):
                    datatype = value['type']

                if not isinstance(value['data'], io.IOBase):
                    pytest.fail('multipart encoding of file requires a stream.')

                data = value['data'].read()

            elif isinstance(value, str):
                data = value

            else:
                pytest.fail('multipart requires a string or stream data')

            part_head = (
                f'--{boundary}\r\nContent-Disposition: form-data;'
                f'name="{field}"'
            )

            if filename != '':
                part_head += f'; filename="{filename}"'

            part_head += '\r\n'

            if datatype != '':
                part_head += f'Content-Type: {datatype}\r\n'

            part_head += '\r\n'

            # Binary payloads go out untouched; anything else is rendered as
            # text and encoded, exactly as the textual parts are.
            if not isinstance(data, (bytes, bytearray)):
                data = str(data).encode()

            parts.extend((part_head.encode(), data, b'\r\n'))

        parts.append(f'--{boundary}--\r\n'.encode())

        return b''.join(parts), f'multipart/form-data; boundary={boundary}'
360