http.py (1444:8f7f7970c07a) http.py (1453:71af60a59338)
1import binascii
2import io
3import os
4import re
5import time
6import json
7import socket
8import select
9from unit.main import TestUnit
10
11
12class TestHTTP(TestUnit):
13 def http(self, start_str, **kwargs):
14 sock_type = (
15 'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
16 )
17 port = 7080 if 'port' not in kwargs else kwargs['port']
18 url = '/' if 'url' not in kwargs else kwargs['url']
19 http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
20
21 headers = (
22 {'Host': 'localhost', 'Connection': 'close'}
23 if 'headers' not in kwargs
24 else kwargs['headers']
25 )
26
27 body = b'' if 'body' not in kwargs else kwargs['body']
28 crlf = '\r\n'
29
30 if 'addr' not in kwargs:
31 addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
32 else:
33 addr = kwargs['addr']
34
35 sock_types = {
36 'ipv4': socket.AF_INET,
37 'ipv6': socket.AF_INET6,
38 'unix': socket.AF_UNIX,
39 }
40
41 if 'sock' not in kwargs:
42 sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
43
44 if (
45 sock_type == sock_types['ipv4']
46 or sock_type == sock_types['ipv6']
47 ):
48 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
49
50 if 'wrapper' in kwargs:
51 sock = kwargs['wrapper'](sock)
52
53 connect_args = addr if sock_type == 'unix' else (addr, port)
54 try:
55 sock.connect(connect_args)
56 except ConnectionRefusedError:
57 sock.close()
58 self.fail('Client can\'t connect to the server.')
59
60 else:
61 sock = kwargs['sock']
62
63 if 'raw' not in kwargs:
64 req = ' '.join([start_str, url, http]) + crlf
65
66 if body != b'':
67 if isinstance(body, str):
68 body = body.encode()
69 elif isinstance(body, dict):
70 body, content_type = self.form_encode(body)
71
72 headers['Content-Type'] = content_type
73
74 if 'Content-Length' not in headers:
75 headers['Content-Length'] = len(body)
76
77 for header, value in headers.items():
78 if isinstance(value, list):
79 for v in value:
80 req += header + ': ' + str(v) + crlf
81
82 else:
83 req += header + ': ' + str(value) + crlf
84
85 req = (req + crlf).encode() + body
86
87 else:
88 req = start_str
89
90 sock.sendall(req)
91
92 encoding = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
93
94 self.log_out(req, encoding)
95
96 resp = ''
97
98 if 'no_recv' not in kwargs:
99 recvall_kwargs = {}
100
101 if 'read_timeout' in kwargs:
102 recvall_kwargs['read_timeout'] = kwargs['read_timeout']
103
104 if 'read_buffer_size' in kwargs:
105 recvall_kwargs['buff_size'] = kwargs['read_buffer_size']
106
107 resp = self.recvall(sock, **recvall_kwargs).decode(encoding)
108
109 self.log_in(resp)
110
111 if 'raw_resp' not in kwargs:
112 resp = self._resp_to_dict(resp)
113
114 headers = resp.get('headers')
115 if headers and headers.get('Transfer-Encoding') == 'chunked':
116 resp['body'] = self._parse_chunked_body(resp['body']).decode(
117 encoding
118 )
119
120 if 'json' in kwargs:
121 resp = self._parse_json(resp)
122
123 if 'start' not in kwargs:
124 sock.close()
125 return resp
126
127 return (resp, sock)
128
129 def log_out(self, log, encoding):
130 if TestUnit.detailed:
131 print('>>>')
132 log = self.log_truncate(log)
133 try:
134 print(log.decode(encoding, 'ignore'))
135 except UnicodeEncodeError:
136 print(log)
137
138 def log_in(self, log):
139 if TestUnit.detailed:
140 print('<<<')
141 log = self.log_truncate(log)
142 try:
143 print(log)
144 except UnicodeEncodeError:
145 print(log.encode())
146
147 def log_truncate(self, log, limit=1024):
148 len_log = len(log)
149 if len_log > limit:
150 log = log[:limit]
151 appendix = '(...logged %s of %s bytes)' % (limit, len_log)
152
153 if isinstance(log, bytes):
154 appendix = appendix.encode()
155
156 log = log + appendix
157
158 return log
159
160 def delete(self, **kwargs):
161 return self.http('DELETE', **kwargs)
162
163 def get(self, **kwargs):
164 return self.http('GET', **kwargs)
165
166 def head(self, **kwargs):
167 return self.http('HEAD', **kwargs)
168
169 def post(self, **kwargs):
170 return self.http('POST', **kwargs)
171
172 def put(self, **kwargs):
173 return self.http('PUT', **kwargs)
174
175 def recvall(self, sock, **kwargs):
176 timeout_default = 60
177
178 timeout = (
179 timeout_default
180 if 'read_timeout' not in kwargs
181 else kwargs['read_timeout']
182 )
183 buff_size = 4096 if 'buff_size' not in kwargs else kwargs['buff_size']
184
185 data = b''
186 while True:
187 rlist = select.select([sock], [], [], timeout)[0]
188 if not rlist:
189 # For all current cases if the "read_timeout" was changed
190 # than test do not expect to get a response from server.
191 if timeout == timeout_default:
192 self.fail('Can\'t read response from server.')
193 break
194
195 try:
196 part = sock.recv(buff_size)
197 except:
198 break
199
200 data += part
201
202 if not len(part):
203 break
204
205 return data
206
207 def _resp_to_dict(self, resp):
208 m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
209
210 if not m:
211 return {}
212
213 headers_text, body = m.group(1), m.group(2)
214
215 p = re.compile('(.*?)\x0d\x0a?', re.M | re.S)
216 headers_lines = p.findall(headers_text)
217
218 status = re.search(
219 r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
220 ).group(1)
221
222 headers = {}
223 for line in headers_lines:
224 m = re.search(r'(.*)\:\s(.*)', line)
225
226 if m.group(1) not in headers:
227 headers[m.group(1)] = m.group(2)
228
229 elif isinstance(headers[m.group(1)], list):
230 headers[m.group(1)].append(m.group(2))
231
232 else:
233 headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
234
235 return {'status': int(status), 'headers': headers, 'body': body}
236
237 def _parse_chunked_body(self, raw_body):
238 if isinstance(raw_body, str):
239 raw_body = bytes(raw_body.encode())
240
241 crlf = b'\r\n'
242 chunks = raw_body.split(crlf)
243
244 if len(chunks) < 3:
245 self.fail('Invalid chunked body')
246
247 if chunks.pop() != b'':
248 self.fail('No CRLF at the end of the body')
249
250 try:
251 last_size = int(chunks[-2], 16)
252 except:
253 self.fail('Invalid zero size chunk')
254
255 if last_size != 0 or chunks[-1] != b'':
256 self.fail('Incomplete body')
257
258 body = b''
259 while len(chunks) >= 2:
260 try:
261 size = int(chunks.pop(0), 16)
262 except:
263 self.fail('Invalid chunk size %s' % str(size))
264
265 if size == 0:
266 self.assertEqual(len(chunks), 1, 'last zero size')
267 break
268
269 temp_body = crlf.join(chunks)
270
271 body += temp_body[:size]
272
273 temp_body = temp_body[size + len(crlf) :]
274
275 chunks = temp_body.split(crlf)
276
277 return body
278
279 def _parse_json(self, resp):
280 headers = resp['headers']
281
1import binascii
2import io
3import os
4import re
5import time
6import json
7import socket
8import select
9from unit.main import TestUnit
10
11
12class TestHTTP(TestUnit):
13 def http(self, start_str, **kwargs):
14 sock_type = (
15 'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
16 )
17 port = 7080 if 'port' not in kwargs else kwargs['port']
18 url = '/' if 'url' not in kwargs else kwargs['url']
19 http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
20
21 headers = (
22 {'Host': 'localhost', 'Connection': 'close'}
23 if 'headers' not in kwargs
24 else kwargs['headers']
25 )
26
27 body = b'' if 'body' not in kwargs else kwargs['body']
28 crlf = '\r\n'
29
30 if 'addr' not in kwargs:
31 addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
32 else:
33 addr = kwargs['addr']
34
35 sock_types = {
36 'ipv4': socket.AF_INET,
37 'ipv6': socket.AF_INET6,
38 'unix': socket.AF_UNIX,
39 }
40
41 if 'sock' not in kwargs:
42 sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
43
44 if (
45 sock_type == sock_types['ipv4']
46 or sock_type == sock_types['ipv6']
47 ):
48 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
49
50 if 'wrapper' in kwargs:
51 sock = kwargs['wrapper'](sock)
52
53 connect_args = addr if sock_type == 'unix' else (addr, port)
54 try:
55 sock.connect(connect_args)
56 except ConnectionRefusedError:
57 sock.close()
58 self.fail('Client can\'t connect to the server.')
59
60 else:
61 sock = kwargs['sock']
62
63 if 'raw' not in kwargs:
64 req = ' '.join([start_str, url, http]) + crlf
65
66 if body != b'':
67 if isinstance(body, str):
68 body = body.encode()
69 elif isinstance(body, dict):
70 body, content_type = self.form_encode(body)
71
72 headers['Content-Type'] = content_type
73
74 if 'Content-Length' not in headers:
75 headers['Content-Length'] = len(body)
76
77 for header, value in headers.items():
78 if isinstance(value, list):
79 for v in value:
80 req += header + ': ' + str(v) + crlf
81
82 else:
83 req += header + ': ' + str(value) + crlf
84
85 req = (req + crlf).encode() + body
86
87 else:
88 req = start_str
89
90 sock.sendall(req)
91
92 encoding = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
93
94 self.log_out(req, encoding)
95
96 resp = ''
97
98 if 'no_recv' not in kwargs:
99 recvall_kwargs = {}
100
101 if 'read_timeout' in kwargs:
102 recvall_kwargs['read_timeout'] = kwargs['read_timeout']
103
104 if 'read_buffer_size' in kwargs:
105 recvall_kwargs['buff_size'] = kwargs['read_buffer_size']
106
107 resp = self.recvall(sock, **recvall_kwargs).decode(encoding)
108
109 self.log_in(resp)
110
111 if 'raw_resp' not in kwargs:
112 resp = self._resp_to_dict(resp)
113
114 headers = resp.get('headers')
115 if headers and headers.get('Transfer-Encoding') == 'chunked':
116 resp['body'] = self._parse_chunked_body(resp['body']).decode(
117 encoding
118 )
119
120 if 'json' in kwargs:
121 resp = self._parse_json(resp)
122
123 if 'start' not in kwargs:
124 sock.close()
125 return resp
126
127 return (resp, sock)
128
129 def log_out(self, log, encoding):
130 if TestUnit.detailed:
131 print('>>>')
132 log = self.log_truncate(log)
133 try:
134 print(log.decode(encoding, 'ignore'))
135 except UnicodeEncodeError:
136 print(log)
137
138 def log_in(self, log):
139 if TestUnit.detailed:
140 print('<<<')
141 log = self.log_truncate(log)
142 try:
143 print(log)
144 except UnicodeEncodeError:
145 print(log.encode())
146
147 def log_truncate(self, log, limit=1024):
148 len_log = len(log)
149 if len_log > limit:
150 log = log[:limit]
151 appendix = '(...logged %s of %s bytes)' % (limit, len_log)
152
153 if isinstance(log, bytes):
154 appendix = appendix.encode()
155
156 log = log + appendix
157
158 return log
159
160 def delete(self, **kwargs):
161 return self.http('DELETE', **kwargs)
162
163 def get(self, **kwargs):
164 return self.http('GET', **kwargs)
165
166 def head(self, **kwargs):
167 return self.http('HEAD', **kwargs)
168
169 def post(self, **kwargs):
170 return self.http('POST', **kwargs)
171
172 def put(self, **kwargs):
173 return self.http('PUT', **kwargs)
174
175 def recvall(self, sock, **kwargs):
176 timeout_default = 60
177
178 timeout = (
179 timeout_default
180 if 'read_timeout' not in kwargs
181 else kwargs['read_timeout']
182 )
183 buff_size = 4096 if 'buff_size' not in kwargs else kwargs['buff_size']
184
185 data = b''
186 while True:
187 rlist = select.select([sock], [], [], timeout)[0]
188 if not rlist:
189 # For all current cases if the "read_timeout" was changed
190 # than test do not expect to get a response from server.
191 if timeout == timeout_default:
192 self.fail('Can\'t read response from server.')
193 break
194
195 try:
196 part = sock.recv(buff_size)
197 except:
198 break
199
200 data += part
201
202 if not len(part):
203 break
204
205 return data
206
207 def _resp_to_dict(self, resp):
208 m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
209
210 if not m:
211 return {}
212
213 headers_text, body = m.group(1), m.group(2)
214
215 p = re.compile('(.*?)\x0d\x0a?', re.M | re.S)
216 headers_lines = p.findall(headers_text)
217
218 status = re.search(
219 r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
220 ).group(1)
221
222 headers = {}
223 for line in headers_lines:
224 m = re.search(r'(.*)\:\s(.*)', line)
225
226 if m.group(1) not in headers:
227 headers[m.group(1)] = m.group(2)
228
229 elif isinstance(headers[m.group(1)], list):
230 headers[m.group(1)].append(m.group(2))
231
232 else:
233 headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
234
235 return {'status': int(status), 'headers': headers, 'body': body}
236
237 def _parse_chunked_body(self, raw_body):
238 if isinstance(raw_body, str):
239 raw_body = bytes(raw_body.encode())
240
241 crlf = b'\r\n'
242 chunks = raw_body.split(crlf)
243
244 if len(chunks) < 3:
245 self.fail('Invalid chunked body')
246
247 if chunks.pop() != b'':
248 self.fail('No CRLF at the end of the body')
249
250 try:
251 last_size = int(chunks[-2], 16)
252 except:
253 self.fail('Invalid zero size chunk')
254
255 if last_size != 0 or chunks[-1] != b'':
256 self.fail('Incomplete body')
257
258 body = b''
259 while len(chunks) >= 2:
260 try:
261 size = int(chunks.pop(0), 16)
262 except:
263 self.fail('Invalid chunk size %s' % str(size))
264
265 if size == 0:
266 self.assertEqual(len(chunks), 1, 'last zero size')
267 break
268
269 temp_body = crlf.join(chunks)
270
271 body += temp_body[:size]
272
273 temp_body = temp_body[size + len(crlf) :]
274
275 chunks = temp_body.split(crlf)
276
277 return body
278
279 def _parse_json(self, resp):
280 headers = resp['headers']
281
282 self.assertIn('Content-Type', headers, 'Content-Type header set')
283 self.assertEqual(
284 headers['Content-Type'],
285 'application/json',
286 'Content-Type header is application/json',
287 )
282 self.assertIn('Content-Type', headers)
283 self.assertEqual(headers['Content-Type'], 'application/json')
288
289 resp['body'] = json.loads(resp['body'])
290
291 return resp
292
293 def getjson(self, **kwargs):
294 return self.get(json=True, **kwargs)
295
296 def waitforsocket(self, port):
297 ret = False
298
299 for i in range(50):
300 try:
301 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
302 sock.connect(('127.0.0.1', port))
303 ret = True
304 break
305 except:
306 sock.close()
307 time.sleep(0.1)
308
309 sock.close()
310
311 self.assertTrue(ret, 'socket connected')
312
313 def form_encode(self, fields):
314 is_multipart = False
315
316 for _, value in fields.items():
317 if isinstance(value, dict):
318 is_multipart = True
319 break
320
321 if is_multipart:
322 body, content_type = self.multipart_encode(fields)
323
324 else:
325 body, content_type = self.form_url_encode(fields)
326
327 return body, content_type
328
329 def form_url_encode(self, fields):
330 data = "&".join("%s=%s" % (name, value)
331 for name, value in fields.items()).encode()
332 return data, 'application/x-www-form-urlencoded'
333
334 def multipart_encode(self, fields):
335 boundary = binascii.hexlify(os.urandom(16)).decode('ascii')
336
337 body = ''
338
339 for field, value in fields.items():
340 filename = ''
341 datatype = ''
342
343 if isinstance(value, dict):
344 datatype = 'text/plain'
345 filename = value['filename']
346
347 if value.get('type'):
348 datatype = value['type']
349
350 if not isinstance(value['data'], io.IOBase):
351 self.fail('multipart encoding of file requires a stream.')
352
353 data = value['data'].read()
354
355 elif isinstance(value, str):
356 data = value
357
358 else:
359 self.fail('multipart requires a string or stream data')
360
361 body += (
362 "--%s\r\nContent-Disposition: form-data; name=\"%s\""
363 ) % (boundary, field)
364
365 if filename != '':
366 body += "; filename=\"%s\"" % filename
367
368 body += "\r\n"
369
370 if datatype != '':
371 body += "Content-Type: %s\r\n" % datatype
372
373 body += "\r\n%s\r\n" % data
374
375 body += "--%s--\r\n" % boundary
376
377 return body.encode(), "multipart/form-data; boundary=%s" % boundary
284
285 resp['body'] = json.loads(resp['body'])
286
287 return resp
288
289 def getjson(self, **kwargs):
290 return self.get(json=True, **kwargs)
291
292 def waitforsocket(self, port):
293 ret = False
294
295 for i in range(50):
296 try:
297 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
298 sock.connect(('127.0.0.1', port))
299 ret = True
300 break
301 except:
302 sock.close()
303 time.sleep(0.1)
304
305 sock.close()
306
307 self.assertTrue(ret, 'socket connected')
308
309 def form_encode(self, fields):
310 is_multipart = False
311
312 for _, value in fields.items():
313 if isinstance(value, dict):
314 is_multipart = True
315 break
316
317 if is_multipart:
318 body, content_type = self.multipart_encode(fields)
319
320 else:
321 body, content_type = self.form_url_encode(fields)
322
323 return body, content_type
324
325 def form_url_encode(self, fields):
326 data = "&".join("%s=%s" % (name, value)
327 for name, value in fields.items()).encode()
328 return data, 'application/x-www-form-urlencoded'
329
330 def multipart_encode(self, fields):
331 boundary = binascii.hexlify(os.urandom(16)).decode('ascii')
332
333 body = ''
334
335 for field, value in fields.items():
336 filename = ''
337 datatype = ''
338
339 if isinstance(value, dict):
340 datatype = 'text/plain'
341 filename = value['filename']
342
343 if value.get('type'):
344 datatype = value['type']
345
346 if not isinstance(value['data'], io.IOBase):
347 self.fail('multipart encoding of file requires a stream.')
348
349 data = value['data'].read()
350
351 elif isinstance(value, str):
352 data = value
353
354 else:
355 self.fail('multipart requires a string or stream data')
356
357 body += (
358 "--%s\r\nContent-Disposition: form-data; name=\"%s\""
359 ) % (boundary, field)
360
361 if filename != '':
362 body += "; filename=\"%s\"" % filename
363
364 body += "\r\n"
365
366 if datatype != '':
367 body += "Content-Type: %s\r\n" % datatype
368
369 body += "\r\n%s\r\n" % data
370
371 body += "--%s--\r\n" % boundary
372
373 return body.encode(), "multipart/form-data; boundary=%s" % boundary