xref: /unit/test/unit/http.py (revision 1256:60e5df142860)
1import re
2import socket
3import select
4from unit.main import TestUnit
5
6
7class TestHTTP(TestUnit):
8    def http(self, start_str, **kwargs):
9        sock_type = (
10            'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
11        )
12        port = 7080 if 'port' not in kwargs else kwargs['port']
13        url = '/' if 'url' not in kwargs else kwargs['url']
14        http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
15        read_buffer_size = (
16            4096
17            if 'read_buffer_size' not in kwargs
18            else kwargs['read_buffer_size']
19        )
20
21        headers = (
22            {'Host': 'localhost', 'Connection': 'close'}
23            if 'headers' not in kwargs
24            else kwargs['headers']
25        )
26
27        body = b'' if 'body' not in kwargs else kwargs['body']
28        crlf = '\r\n'
29
30        if 'addr' not in kwargs:
31            addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
32        else:
33            addr = kwargs['addr']
34
35        sock_types = {
36            'ipv4': socket.AF_INET,
37            'ipv6': socket.AF_INET6,
38            'unix': socket.AF_UNIX,
39        }
40
41        if 'sock' not in kwargs:
42            sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
43
44            if (
45                sock_type == sock_types['ipv4']
46                or sock_type == sock_types['ipv6']
47            ):
48                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
49
50            if 'wrapper' in kwargs:
51                sock = kwargs['wrapper'](sock)
52
53            connect_args = addr if sock_type == 'unix' else (addr, port)
54            try:
55                sock.connect(connect_args)
56            except ConnectionRefusedError:
57                sock.close()
58                return None
59
60        else:
61            sock = kwargs['sock']
62
63        if 'raw' not in kwargs:
64            req = ' '.join([start_str, url, http]) + crlf
65
66            if body != b'':
67                if isinstance(body, str):
68                    body = body.encode()
69
70                if 'Content-Length' not in headers:
71                    headers['Content-Length'] = len(body)
72
73            for header, value in headers.items():
74                if isinstance(value, list):
75                    for v in value:
76                        req += header + ': ' + str(v) + crlf
77
78                else:
79                    req += header + ': ' + str(value) + crlf
80
81            req = (req + crlf).encode() + body
82
83        else:
84            req = start_str
85
86        sock.sendall(req)
87
88        if TestUnit.detailed:
89            print('>>>')
90            try:
91                print(req.decode('utf-8', 'ignore'))
92            except UnicodeEncodeError:
93                print(req)
94
95        resp = ''
96
97        if 'no_recv' not in kwargs:
98            enc = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
99            read_timeout = (
100                30 if 'read_timeout' not in kwargs else kwargs['read_timeout']
101            )
102            resp = self.recvall(
103                sock, read_timeout=read_timeout, buff_size=read_buffer_size
104            ).decode(enc)
105
106        if TestUnit.detailed:
107            print('<<<')
108            try:
109                print(resp)
110            except UnicodeEncodeError:
111                print(resp.encode())
112
113        if 'raw_resp' not in kwargs:
114            resp = self._resp_to_dict(resp)
115
116        if 'start' not in kwargs:
117            sock.close()
118            return resp
119
120        return (resp, sock)
121
122    def delete(self, **kwargs):
123        return self.http('DELETE', **kwargs)
124
125    def get(self, **kwargs):
126        return self.http('GET', **kwargs)
127
128    def head(self, **kwargs):
129        return self.http('HEAD', **kwargs)
130
131    def post(self, **kwargs):
132        return self.http('POST', **kwargs)
133
134    def put(self, **kwargs):
135        return self.http('PUT', **kwargs)
136
137    def recvall(self, sock, read_timeout=30, buff_size=4096):
138        data = b''
139        while select.select([sock], [], [], read_timeout)[0]:
140            try:
141                part = sock.recv(buff_size)
142            except:
143                break
144
145            data += part
146
147            if not len(part):
148                break
149
150        return data
151
152    def _resp_to_dict(self, resp):
153        m = re.search('(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
154
155        if not m:
156            return {}
157
158        headers_text, body = m.group(1), m.group(2)
159
160        p = re.compile('(.*?)\x0d\x0a?', re.M | re.S)
161        headers_lines = p.findall(headers_text)
162
163        status = re.search(
164            '^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
165        ).group(1)
166
167        headers = {}
168        for line in headers_lines:
169            m = re.search('(.*)\:\s(.*)', line)
170
171            if m.group(1) not in headers:
172                headers[m.group(1)] = m.group(2)
173
174            elif isinstance(headers[m.group(1)], list):
175                headers[m.group(1)].append(m.group(2))
176
177            else:
178                headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
179
180        return {'status': int(status), 'headers': headers, 'body': body}
181