xref: /unit/test/unit/http.py (revision 2167:e38ab67c7f63)
1import binascii
2import io
3import json
4import os
5import re
6import select
7import socket
8
9import pytest
10from unit.option import option
11
12
13class TestHTTP:
14    def http(self, start_str, **kwargs):
15        sock_type = kwargs.get('sock_type', 'ipv4')
16        port = kwargs.get('port', 7080)
17        url = kwargs.get('url', '/')
18        http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
19
20        headers = kwargs.get(
21            'headers', {'Host': 'localhost', 'Connection': 'close'}
22        )
23
24        body = kwargs.get('body', b'')
25        crlf = '\r\n'
26
27        if 'addr' not in kwargs:
28            addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
29        else:
30            addr = kwargs['addr']
31
32        sock_types = {
33            'ipv4': socket.AF_INET,
34            'ipv6': socket.AF_INET6,
35            'unix': socket.AF_UNIX,
36        }
37
38        if 'sock' not in kwargs:
39            sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
40
41            if (
42                sock_type == sock_types['ipv4']
43                or sock_type == sock_types['ipv6']
44            ):
45                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
46
47            if 'wrapper' in kwargs:
48                server_hostname = headers.get('Host', None)
49                sock = kwargs['wrapper'](sock, server_hostname=server_hostname)
50
51            connect_args = addr if sock_type == 'unix' else (addr, port)
52            try:
53                sock.connect(connect_args)
54            except ConnectionRefusedError:
55                sock.close()
56                pytest.fail('Client can\'t connect to the server.')
57
58        else:
59            sock = kwargs['sock']
60
61        if 'raw' not in kwargs:
62            req = ' '.join([start_str, url, http]) + crlf
63
64            if body != b'':
65                if isinstance(body, str):
66                    body = body.encode()
67                elif isinstance(body, dict):
68                    body, content_type = self.form_encode(body)
69
70                    headers['Content-Type'] = content_type
71
72                if 'Content-Length' not in headers:
73                    headers['Content-Length'] = len(body)
74
75            for header, value in headers.items():
76                if isinstance(value, list):
77                    for v in value:
78                        req += header + ': ' + str(v) + crlf
79
80                else:
81                    req += header + ': ' + str(value) + crlf
82
83            req = (req + crlf).encode() + body
84
85        else:
86            req = start_str
87
88        sock.sendall(req)
89
90        encoding = kwargs.get('encoding', 'utf-8')
91
92        self.log_out(req, encoding)
93
94        resp = ''
95
96        if 'no_recv' not in kwargs:
97            recvall_kwargs = {}
98
99            if 'read_timeout' in kwargs:
100                recvall_kwargs['read_timeout'] = kwargs['read_timeout']
101
102            if 'read_buffer_size' in kwargs:
103                recvall_kwargs['buff_size'] = kwargs['read_buffer_size']
104
105            resp = self.recvall(sock, **recvall_kwargs).decode(encoding)
106
107        self.log_in(resp)
108
109        if 'raw_resp' not in kwargs:
110            resp = self._resp_to_dict(resp)
111
112            headers = resp.get('headers')
113            if headers and headers.get('Transfer-Encoding') == 'chunked':
114                resp['body'] = self._parse_chunked_body(resp['body']).decode(
115                    encoding
116                )
117
118            if 'json' in kwargs:
119                resp = self._parse_json(resp)
120
121        if 'start' not in kwargs:
122            sock.close()
123            return resp
124
125        return (resp, sock)
126
127    def log_out(self, log, encoding):
128        if option.detailed:
129            print('>>>')
130            log = self.log_truncate(log)
131            try:
132                print(log.decode(encoding, 'ignore'))
133            except UnicodeEncodeError:
134                print(log)
135
136    def log_in(self, log):
137        if option.detailed:
138            print('<<<')
139            log = self.log_truncate(log)
140            try:
141                print(log)
142            except UnicodeEncodeError:
143                print(log.encode())
144
145    def log_truncate(self, log, limit=1024):
146        len_log = len(log)
147        if len_log > limit:
148            log = log[:limit]
149            appendix = '(...logged %s of %s bytes)' % (limit, len_log)
150
151            if isinstance(log, bytes):
152                appendix = appendix.encode()
153
154            log = log + appendix
155
156        return log
157
158    def delete(self, **kwargs):
159        return self.http('DELETE', **kwargs)
160
161    def get(self, **kwargs):
162        return self.http('GET', **kwargs)
163
164    def head(self, **kwargs):
165        return self.http('HEAD', **kwargs)
166
167    def post(self, **kwargs):
168        return self.http('POST', **kwargs)
169
170    def put(self, **kwargs):
171        return self.http('PUT', **kwargs)
172
173    def recvall(self, sock, **kwargs):
174        timeout_default = 60
175
176        timeout = kwargs.get('read_timeout', timeout_default)
177        buff_size = kwargs.get('buff_size', 4096)
178
179        data = b''
180        while True:
181            rlist = select.select([sock], [], [], timeout)[0]
182            if not rlist:
183                # For all current cases if the "read_timeout" was changed
184                # than test do not expect to get a response from server.
185                if timeout == timeout_default:
186                    pytest.fail('Can\'t read response from server.')
187                break
188
189            try:
190                part = sock.recv(buff_size)
191
192            except KeyboardInterrupt:
193                raise
194
195            except:
196                break
197
198            data += part
199
200            if not len(part):
201                break
202
203        return data
204
205    def _resp_to_dict(self, resp):
206        m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
207
208        if not m:
209            return {}
210
211        headers_text, body = m.group(1), m.group(2)
212        headers_lines = re.findall('(.*?)\x0d\x0a?', headers_text, re.M | re.S)
213
214        status = re.search(
215            r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
216        ).group(1)
217
218        headers = {}
219        for line in headers_lines:
220            m = re.search(r'(.*)\:\s(.*)', line)
221
222            if m.group(1) not in headers:
223                headers[m.group(1)] = m.group(2)
224
225            elif isinstance(headers[m.group(1)], list):
226                headers[m.group(1)].append(m.group(2))
227
228            else:
229                headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
230
231        return {'status': int(status), 'headers': headers, 'body': body}
232
233    def _parse_chunked_body(self, raw_body):
234        if isinstance(raw_body, str):
235            raw_body = bytes(raw_body.encode())
236
237        crlf = b'\r\n'
238        chunks = raw_body.split(crlf)
239
240        if len(chunks) < 3:
241            pytest.fail('Invalid chunked body')
242
243        if chunks.pop() != b'':
244            pytest.fail('No CRLF at the end of the body')
245
246        try:
247            last_size = int(chunks[-2], 16)
248
249        except ValueError:
250            pytest.fail('Invalid zero size chunk')
251
252        if last_size != 0 or chunks[-1] != b'':
253            pytest.fail('Incomplete body')
254
255        body = b''
256        while len(chunks) >= 2:
257            try:
258                size = int(chunks.pop(0), 16)
259
260            except ValueError:
261                pytest.fail('Invalid chunk size %s' % str(size))
262
263            if size == 0:
264                assert len(chunks) == 1, 'last zero size'
265                break
266
267            temp_body = crlf.join(chunks)
268
269            body += temp_body[:size]
270
271            temp_body = temp_body[size + len(crlf) :]
272
273            chunks = temp_body.split(crlf)
274
275        return body
276
277    def _parse_json(self, resp):
278        headers = resp['headers']
279
280        assert 'Content-Type' in headers
281        assert headers['Content-Type'] == 'application/json'
282
283        resp['body'] = json.loads(resp['body'])
284
285        return resp
286
287    def getjson(self, **kwargs):
288        return self.get(json=True, **kwargs)
289
290    def form_encode(self, fields):
291        is_multipart = False
292
293        for _, value in fields.items():
294            if isinstance(value, dict):
295                is_multipart = True
296                break
297
298        if is_multipart:
299            body, content_type = self.multipart_encode(fields)
300
301        else:
302            body, content_type = self.form_url_encode(fields)
303
304        return body, content_type
305
306    def form_url_encode(self, fields):
307        data = "&".join(
308            "%s=%s" % (name, value) for name, value in fields.items()
309        ).encode()
310        return data, 'application/x-www-form-urlencoded'
311
312    def multipart_encode(self, fields):
313        boundary = binascii.hexlify(os.urandom(16)).decode('ascii')
314
315        body = ''
316
317        for field, value in fields.items():
318            filename = ''
319            datatype = ''
320
321            if isinstance(value, dict):
322                datatype = 'text/plain'
323                filename = value['filename']
324
325                if value.get('type'):
326                    datatype = value['type']
327
328                if not isinstance(value['data'], io.IOBase):
329                    pytest.fail('multipart encoding of file requires a stream.')
330
331                data = value['data'].read()
332
333            elif isinstance(value, str):
334                data = value
335
336            else:
337                pytest.fail('multipart requires a string or stream data')
338
339            body += ("--%s\r\nContent-Disposition: form-data; name=\"%s\"") % (
340                boundary,
341                field,
342            )
343
344            if filename != '':
345                body += "; filename=\"%s\"" % filename
346
347            body += "\r\n"
348
349            if datatype != '':
350                body += "Content-Type: %s\r\n" % datatype
351
352            body += "\r\n%s\r\n" % data
353
354        body += "--%s--\r\n" % boundary
355
356        return body.encode(), "multipart/form-data; boundary=%s" % boundary
357