1import re 2import time 3import json 4import socket 5import select 6from unit.main import TestUnit 7 8 9class TestHTTP(TestUnit): 10 def http(self, start_str, **kwargs): 11 sock_type = ( 12 'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type'] 13 ) 14 port = 7080 if 'port' not in kwargs else kwargs['port'] 15 url = '/' if 'url' not in kwargs else kwargs['url'] 16 http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1' 17 read_buffer_size = ( 18 4096 19 if 'read_buffer_size' not in kwargs 20 else kwargs['read_buffer_size'] 21 ) 22 23 headers = ( 24 {'Host': 'localhost', 'Connection': 'close'} 25 if 'headers' not in kwargs 26 else kwargs['headers'] 27 ) 28 29 body = b'' if 'body' not in kwargs else kwargs['body'] 30 crlf = '\r\n' 31 32 if 'addr' not in kwargs: 33 addr = '::1' if sock_type == 'ipv6' else '127.0.0.1' 34 else: 35 addr = kwargs['addr'] 36 37 sock_types = { 38 'ipv4': socket.AF_INET, 39 'ipv6': socket.AF_INET6, 40 'unix': socket.AF_UNIX, 41 } 42 43 if 'sock' not in kwargs: 44 sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM) 45 46 if ( 47 sock_type == sock_types['ipv4'] 48 or sock_type == sock_types['ipv6'] 49 ): 50 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 51 52 if 'wrapper' in kwargs: 53 sock = kwargs['wrapper'](sock) 54 55 connect_args = addr if sock_type == 'unix' else (addr, port) 56 try: 57 sock.connect(connect_args) 58 except ConnectionRefusedError: 59 sock.close() 60 return None 61 62 else: 63 sock = kwargs['sock'] 64 65 if 'raw' not in kwargs: 66 req = ' '.join([start_str, url, http]) + crlf 67 68 if body != b'': 69 if isinstance(body, str): 70 body = body.encode() 71 72 if 'Content-Length' not in headers: 73 headers['Content-Length'] = len(body) 74 75 for header, value in headers.items(): 76 if isinstance(value, list): 77 for v in value: 78 req += header + ': ' + str(v) + crlf 79 80 else: 81 req += header + ': ' + str(value) + crlf 82 83 req = (req + crlf).encode() + body 84 85 else: 86 req = start_str 87 88 sock.sendall(req) 89 90 encoding = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding'] 91 92 if TestUnit.detailed: 93 print('>>>') 94 try: 95 print(req.decode(encoding, 'ignore')) 96 except UnicodeEncodeError: 97 print(req) 98 99 resp = '' 100 101 if 'no_recv' not in kwargs: 102 read_timeout = ( 103 30 if 'read_timeout' not in kwargs else kwargs['read_timeout'] 104 ) 105 resp = self.recvall( 106 sock, read_timeout=read_timeout, buff_size=read_buffer_size 107 ).decode(encoding) 108 109 if TestUnit.detailed: 110 print('<<<') 111 try: 112 print(resp) 113 except UnicodeEncodeError: 114 print(resp.encode()) 115 116 if 'raw_resp' not in kwargs: 117 resp = self._resp_to_dict(resp) 118 119 headers = resp.get('headers') 120 if headers and headers.get('Transfer-Encoding') == 'chunked': 121 resp['body'] = self._parse_chunked_body(resp['body']).decode( 122 encoding 123 ) 124 125 if 'start' not in kwargs: 126 sock.close() 127 return resp 128 129 return (resp, sock) 130 131 def delete(self, **kwargs): 132 return self.http('DELETE', **kwargs) 133 134 def get(self, **kwargs): 135 return self.http('GET', **kwargs) 136 137 def head(self, **kwargs): 138 return self.http('HEAD', **kwargs) 139 140 def post(self, **kwargs): 141 return self.http('POST', **kwargs) 142 143 def put(self, **kwargs): 144 return self.http('PUT', **kwargs) 145 146 def recvall(self, sock, read_timeout=30, buff_size=4096): 147 data = b'' 148 while select.select([sock], [], [], read_timeout)[0]: 149 try: 150 part = sock.recv(buff_size) 151 except: 152 break 153 154 data += part 155 156 if not len(part): 157 break 158 159 return data 160 161 def _resp_to_dict(self, resp): 162 m = re.search(r'(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S) 163 164 if not m: 165 return {} 166 167 headers_text, body = m.group(1), m.group(2) 168 169 p = re.compile('(.*?)\x0d\x0a?', re.M | re.S) 170 headers_lines = p.findall(headers_text) 171 172 status = re.search( 173 r'^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0) 174 ).group(1) 175 176 headers = {} 177 for line in headers_lines: 178 m = re.search(r'(.*)\:\s(.*)', line) 179 180 if m.group(1) not in headers: 181 headers[m.group(1)] = m.group(2) 182 183 elif isinstance(headers[m.group(1)], list): 184 headers[m.group(1)].append(m.group(2)) 185 186 else: 187 headers[m.group(1)] = [headers[m.group(1)], m.group(2)] 188 189 return {'status': int(status), 'headers': headers, 'body': body} 190 191 def _parse_chunked_body(self, raw_body): 192 if isinstance(raw_body, str): 193 raw_body = bytes(raw_body.encode()) 194 195 crlf = b'\r\n' 196 chunks = raw_body.split(crlf) 197 198 if len(chunks) < 3: 199 self.fail('Invalid chunked body') 200 201 if chunks.pop() != b'': 202 self.fail('No CRLF at the end of the body') 203 204 try: 205 last_size = int(chunks[-2], 16) 206 except: 207 self.fail('Invalid zero size chunk') 208 209 if last_size != 0 or chunks[-1] != b'': 210 self.fail('Incomplete body') 211 212 body = b'' 213 while len(chunks) >= 2: 214 try: 215 size = int(chunks.pop(0), 16) 216 except: 217 self.fail('Invalid chunk size %s' % str(size)) 218 219 if size == 0: 220 self.assertEqual(len(chunks), 1, 'last zero size') 221 break 222 223 temp_body = crlf.join(chunks) 224 225 body += temp_body[:size] 226 227 temp_body = temp_body[size + len(crlf) :] 228 229 chunks = temp_body.split(crlf) 230 231 return body 232 233 def waitforsocket(self, port): 234 ret = False 235 236 for i in range(50): 237 try: 238 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 239 sock.connect(('127.0.0.1', port)) 240 ret = True 241 break 242 except: 243 sock.close() 244 time.sleep(0.1) 245 246 sock.close() 247 248 self.assertTrue(ret, 'socket connected') 249