import re
import select
import socket

from unit.main import TestUnit


class TestHTTP(TestUnit):
    """Minimal raw-socket HTTP client used by the tests."""

    def http(self, start_str, **kwargs):
        """Send one HTTP request and (optionally) read the response.

        ``start_str`` is the request method, or the complete request bytes
        when the ``raw`` option is given.

        Recognised keyword options (most are presence-based flags):

        sock_type         'ipv4' (default), 'ipv6' or 'unix'.
        addr              Address to connect to; defaults to '127.0.0.1',
                          or '::1' for 'ipv6'.
        port              TCP port, default 7080 (unused for 'unix').
        url               Request target, default '/'.
        http_10           If present, use 'HTTP/1.0' instead of 'HTTP/1.1'.
        headers           Dict of headers; a list value emits the header
                          once per item.  Defaults to Host/Connection: close.
        body              Request body (str or bytes).  A Content-Length
                          header is added when one is not supplied.
        raw               If present, ``start_str`` is sent as is.
        sock              Already connected socket to reuse.
        wrapper           Callable applied to a newly created socket before
                          connecting.
        no_recv           If present, do not read a response.
        read_timeout      Seconds to wait for data, default 30.
        read_buffer_size  recv() chunk size, default 4096.
        encoding          Response encoding, default 'utf-8'.
        raw_resp          If present, return the response text unparsed.
        start             If present, keep the socket open and return
                          ``(resp, sock)``.

        Returns the parsed response dict (see ``_resp_to_dict``), the raw
        response text with ``raw_resp``, a ``(resp, sock)`` tuple with
        ``start``, or None when the connection is refused.
        """
        sock_type = kwargs.get('sock_type', 'ipv4')
        port = kwargs.get('port', 7080)
        url = kwargs.get('url', '/')
        version = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
        read_buffer_size = kwargs.get('read_buffer_size', 4096)

        if 'headers' in kwargs:
            headers = kwargs['headers']
        else:
            headers = {'Host': 'localhost', 'Connection': 'close'}

        body = kwargs.get('body', b'')
        crlf = '\r\n'

        if 'addr' in kwargs:
            addr = kwargs['addr']
        else:
            addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'

        sock_types = {
            'ipv4': socket.AF_INET,
            'ipv6': socket.AF_INET6,
            'unix': socket.AF_UNIX,
        }

        if 'sock' in kwargs:
            sock = kwargs['sock']

        else:
            sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)

            # Compare the type *name*: comparing it with the AF_* constants
            # never matched, so TCP_NODELAY was silently never enabled.
            if sock_type in ('ipv4', 'ipv6'):
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            if 'wrapper' in kwargs:
                sock = kwargs['wrapper'](sock)

            connect_args = addr if sock_type == 'unix' else (addr, port)
            try:
                sock.connect(connect_args)
            except ConnectionRefusedError:
                sock.close()
                return None

        if 'raw' in kwargs:
            req = start_str

        else:
            if body != b'':
                if isinstance(body, str):
                    body = body.encode()

                if 'Content-Length' not in headers:
                    # Work on a copy so the caller's dict is not modified
                    # (a reused dict would carry a stale Content-Length).
                    headers = dict(headers)
                    headers['Content-Length'] = len(body)

            lines = [' '.join([start_str, url, version])]
            for header, value in headers.items():
                values = value if isinstance(value, list) else [value]
                for v in values:
                    lines.append(header + ': ' + str(v))

            # Each line is CRLF-terminated; an empty line ends the headers.
            req = (crlf.join(lines) + crlf + crlf).encode() + body

        sock.sendall(req)

        if TestUnit.detailed:
            print('>>>')
            try:
                print(req.decode('utf-8', 'ignore'))
            except UnicodeEncodeError:
                print(req)

        resp = ''

        if 'no_recv' not in kwargs:
            enc = kwargs.get('encoding', 'utf-8')
            read_timeout = kwargs.get('read_timeout', 30)
            resp = self.recvall(
                sock, read_timeout=read_timeout, buff_size=read_buffer_size
            ).decode(enc)

        if TestUnit.detailed:
            print('<<<')
            try:
                print(resp)
            except UnicodeEncodeError:
                print(resp.encode())

        if 'raw_resp' not in kwargs:
            resp = self._resp_to_dict(resp)

        if 'start' not in kwargs:
            sock.close()
            return resp

        return (resp, sock)

    def delete(self, **kwargs):
        """Send a DELETE request; see ``http`` for the options."""
        return self.http('DELETE', **kwargs)

    def get(self, **kwargs):
        """Send a GET request; see ``http`` for the options."""
        return self.http('GET', **kwargs)

    def head(self, **kwargs):
        """Send a HEAD request; see ``http`` for the options."""
        return self.http('HEAD', **kwargs)

    def post(self, **kwargs):
        """Send a POST request; see ``http`` for the options."""
        return self.http('POST', **kwargs)

    def put(self, **kwargs):
        """Send a PUT request; see ``http`` for the options."""
        return self.http('PUT', **kwargs)

    def recvall(self, sock, read_timeout=30, buff_size=4096):
        """Read from ``sock`` until EOF, a socket error or a read timeout.

        Waits up to ``read_timeout`` seconds for each chunk and reads at
        most ``buff_size`` bytes at a time.  Returns the bytes received so
        far (possibly empty).
        """
        chunks = []
        while select.select([sock], [], [], read_timeout)[0]:
            try:
                part = sock.recv(buff_size)
            except OSError:
                # Best effort: keep what was received before the failure.
                # Only socket-level errors are swallowed, so
                # KeyboardInterrupt/SystemExit still propagate.
                break

            if not part:
                break

            chunks.append(part)

        return b''.join(chunks)

    def _resp_to_dict(self, resp):
        """Parse response text into ``{'status', 'headers', 'body'}``.

        Returns an empty dict when ``resp`` has no CR-terminated line at
        all.  ``status`` is an int.  A header that occurs several times is
        stored as a list of its values, otherwise as a single string.

        Raises TypeError (from ``int(None)``) when the first line is not an
        ``HTTP/x.y <code>`` status line.
        """
        m = re.search(r'(.*?\r\n?)\r\n?(.*)', resp, re.M | re.S)

        if not m:
            return {}

        headers_text, body = m.group(1), m.group(2)

        headers_lines = re.findall(r'(.*?)\r\n?', headers_text, re.M | re.S)

        # The '|$' alternative makes the search always succeed; group(1) is
        # None when the line is not a status line.
        status = re.search(
            r'^HTTP/\d\.\d\s(\d+)|$', headers_lines.pop(0)
        ).group(1)

        headers = {}
        for line in headers_lines:
            # Split on the FIRST colon so that values containing ': ' stay
            # intact; the whitespace is optional so 'Name:' and 'Name:value'
            # are accepted too.
            m = re.search(r'(.*?):\s?(.*)', line)

            if not m:
                # No colon at all: not a header field, nothing to record.
                continue

            name, value = m.group(1), m.group(2)

            if name not in headers:
                headers[name] = value

            elif isinstance(headers[name], list):
                headers[name].append(value)

            else:
                headers[name] = [headers[name], value]

        return {'status': int(status), 'headers': headers, 'body': body}