xref: /unit/test/unit/http.py (revision 2073:bc6ad31ce286)
1import binascii
2import io
3import json
4import os
5import re
6import select
7import socket
8
9import pytest
10from unit.option import option
11
12
13class TestHTTP:
14    def http(self, start_str, **kwargs):
15        sock_type = kwargs.get('sock_type', 'ipv4')
16        port = kwargs.get('port', 7080)
17        url = kwargs.get('url', '/')
18        http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
19
20        headers = kwargs.get(
21            'headers', {'Host': 'localhost', 'Connection': 'close'}
22        )
23
24        body = kwargs.get('body', b'')
25        crlf = '\r\n'
26
27        if 'addr' not in kwargs:
28            addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
29        else:
30            addr = kwargs['addr']
31
32        sock_types = {
33            'ipv4': socket.AF_INET,
34            'ipv6': socket.AF_INET6,
35            'unix': socket.AF_UNIX,
36        }
37
38        if 'sock' not in kwargs:
39            sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
40
41            if (
42                sock_type == sock_types['ipv4']
43                or sock_type == sock_types['ipv6']
44            ):
45                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
46
47            if 'wrapper' in kwargs:
48                server_hostname = headers.get('Host', None)
49                sock = kwargs['wrapper'](sock, server_hostname=server_hostname)
50
51            connect_args = addr if sock_type == 'unix' else (addr, port)
52            try:
53                sock.connect(connect_args)
54            except ConnectionRefusedError:
55                sock.close()
56                pytest.fail('Client can\'t connect to the server.')
57
58        else:
59            sock = kwargs['sock']
60
61        if 'raw' not in kwargs:
62            req = ' '.join([start_str, url, http]) + crlf
63
64            if body != b'':
65                if isinstance(body, str):
66                    body = body.encode()
67                elif isinstance(body, dict):
68                    body, content_type = self.form_encode(body)
69
70                    headers['Content-Type'] = content_type
71
72                if 'Content-Length' not in headers:
73                    headers['Content-Length'] = len(body)
74
75            for header, value in headers.items():
76                if isinstance(value, list):
77                    for v in value:
78                        req += header + ': ' + str(v) + crlf
79
80                else:
81                    req += header + ': ' + str(value) + crlf
82
83            req = (req + crlf).encode() + body
84
85        else:
86            req = start_str
87
88        sock.sendall(req)
89
90        encoding = kwargs.get('encoding', 'utf-8')
91
92        self.log_out(req, encoding)
93
94        resp = ''
95
96        if 'no_recv' not in kwargs:
97            recvall_kwargs = {}
98
99            if 'read_timeout' in kwargs:
100                recvall_kwargs['read_timeout'] = kwargs['read_timeout']
101
102            if 'read_buffer_size' in kwargs:
103                recvall_kwargs['buff_size'] = kwargs['read_buffer_size']
104
105            resp = self.recvall(sock, **recvall_kwargs).decode(encoding)
106
107        self.log_in(resp)
108
109        if 'raw_resp' not in kwargs:
110            resp = self._resp_to_dict(resp)
111
112            headers = resp.get('headers')
113            if headers and headers.get('Transfer-Encoding') == 'chunked':
114                resp['body'] = self._parse_chunked_body(resp['body']).decode(
115                    encoding
116                )
117
118            if 'json' in kwargs:
119                resp = self._parse_json(resp)
120
121        if 'start' not in kwargs:
122            sock.close()
123            return resp
124
125        return (resp, sock)
126
127    def log_out(self, log, encoding):
128        if option.detailed:
129            print('>>>')
130            log = self.log_truncate(log)
131            try:
132                print(log.decode(encoding, 'ignore'))
133            except UnicodeEncodeError:
134                print(log)
135
136    def log_in(self, log):
137        if option.detailed:
138            print('<<<')
139            log = self.log_truncate(log)
140            try:
141                print(log)
142            except UnicodeEncodeError:
143                print(log.encode())
144
145    def log_truncate(self, log, limit=1024):
146        len_log = len(log)
147        if len_log > limit:
148            log = log[:limit]
149            appendix = '(...logged %s of %s bytes)' % (limit, len_log)
150
151            if isinstance(log, bytes):
152                appendix = appendix.encode()
153
154            log = log + appendix
155
156        return log
157
158    def delete(self, **kwargs):
159        return self.http('DELETE', **kwargs)
160
161    def get(self, **kwargs):
162        return self.http('GET', **kwargs)
163
164    def head(self, **kwargs):
165        return self.http('HEAD', **kwargs)
166
167    def post(self, **kwargs):
168        return self.http('POST', **kwargs)
169
170    def put(self, **kwargs):
171        return self.http('PUT', **kwargs)
172
173    def recvall(self, sock, **kwargs):
174        timeout_default = 60
175
176        timeout = kwargs.get('read_timeout', timeout_default)
177        buff_size = kwargs.get('buff_size', 4096)
178
179        data = b''
180        while True:
181            rlist = select.select([sock], [], [], timeout)[0]
182            if not rlist:
183                # For all current cases if the "read_timeout" was changed
184                # than test do not expect to get a response from server.
185                if timeout == timeout_default:
186                    pytest.fail('Can\'t read response from server.')
187                break
188
189            try:
190                part = sock.recv(buff_size)
191
192            except KeyboardInterrupt:
193                raise
194
195            except:
196                break
197
198            data += part
199
200            if not len(part):
201                break
202
203        return data
204
205    def _resp_to_dict(self, resp):
206        m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
207
208        if not m:
209            return {}
210
211        headers_text, body = m.group(1), m.group(2)
212
213        p = re.compile('(.*?)\x0d\x0a?', re.M | re.S)
214        headers_lines = p.findall(headers_text)
215
216        status = re.search(
217            r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
218        ).group(1)
219
220        headers = {}
221        for line in headers_lines:
222            m = re.search(r'(.*)\:\s(.*)', line)
223
224            if m.group(1) not in headers:
225                headers[m.group(1)] = m.group(2)
226
227            elif isinstance(headers[m.group(1)], list):
228                headers[m.group(1)].append(m.group(2))
229
230            else:
231                headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
232
233        return {'status': int(status), 'headers': headers, 'body': body}
234
235    def _parse_chunked_body(self, raw_body):
236        if isinstance(raw_body, str):
237            raw_body = bytes(raw_body.encode())
238
239        crlf = b'\r\n'
240        chunks = raw_body.split(crlf)
241
242        if len(chunks) < 3:
243            pytest.fail('Invalid chunked body')
244
245        if chunks.pop() != b'':
246            pytest.fail('No CRLF at the end of the body')
247
248        try:
249            last_size = int(chunks[-2], 16)
250
251        except ValueError:
252            pytest.fail('Invalid zero size chunk')
253
254        if last_size != 0 or chunks[-1] != b'':
255            pytest.fail('Incomplete body')
256
257        body = b''
258        while len(chunks) >= 2:
259            try:
260                size = int(chunks.pop(0), 16)
261
262            except ValueError:
263                pytest.fail('Invalid chunk size %s' % str(size))
264
265            if size == 0:
266                assert len(chunks) == 1, 'last zero size'
267                break
268
269            temp_body = crlf.join(chunks)
270
271            body += temp_body[:size]
272
273            temp_body = temp_body[size + len(crlf) :]
274
275            chunks = temp_body.split(crlf)
276
277        return body
278
279    def _parse_json(self, resp):
280        headers = resp['headers']
281
282        assert 'Content-Type' in headers
283        assert headers['Content-Type'] == 'application/json'
284
285        resp['body'] = json.loads(resp['body'])
286
287        return resp
288
289    def getjson(self, **kwargs):
290        return self.get(json=True, **kwargs)
291
292    def form_encode(self, fields):
293        is_multipart = False
294
295        for _, value in fields.items():
296            if isinstance(value, dict):
297                is_multipart = True
298                break
299
300        if is_multipart:
301            body, content_type = self.multipart_encode(fields)
302
303        else:
304            body, content_type = self.form_url_encode(fields)
305
306        return body, content_type
307
308    def form_url_encode(self, fields):
309        data = "&".join(
310            "%s=%s" % (name, value) for name, value in fields.items()
311        ).encode()
312        return data, 'application/x-www-form-urlencoded'
313
314    def multipart_encode(self, fields):
315        boundary = binascii.hexlify(os.urandom(16)).decode('ascii')
316
317        body = ''
318
319        for field, value in fields.items():
320            filename = ''
321            datatype = ''
322
323            if isinstance(value, dict):
324                datatype = 'text/plain'
325                filename = value['filename']
326
327                if value.get('type'):
328                    datatype = value['type']
329
330                if not isinstance(value['data'], io.IOBase):
331                    pytest.fail('multipart encoding of file requires a stream.')
332
333                data = value['data'].read()
334
335            elif isinstance(value, str):
336                data = value
337
338            else:
339                pytest.fail('multipart requires a string or stream data')
340
341            body += ("--%s\r\nContent-Disposition: form-data; name=\"%s\"") % (
342                boundary,
343                field,
344            )
345
346            if filename != '':
347                body += "; filename=\"%s\"" % filename
348
349            body += "\r\n"
350
351            if datatype != '':
352                body += "Content-Type: %s\r\n" % datatype
353
354            body += "\r\n%s\r\n" % data
355
356        body += "--%s--\r\n" % boundary
357
358        return body.encode(), "multipart/form-data; boundary=%s" % boundary
359