Deleted Added
1import binascii
2import io
3import os
4import re
5import time
6import json
7import socket
8import select
9from unit.main import TestUnit
10
11
12class TestHTTP(TestUnit):
13 def http(self, start_str, **kwargs):
14 sock_type = (
15 'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
16 )
17 port = 7080 if 'port' not in kwargs else kwargs['port']
18 url = '/' if 'url' not in kwargs else kwargs['url']
19 http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
20 read_buffer_size = (
21 4096
22 if 'read_buffer_size' not in kwargs
23 else kwargs['read_buffer_size']
24 )
25
26 headers = (
27 {'Host': 'localhost', 'Connection': 'close'}
28 if 'headers' not in kwargs
29 else kwargs['headers']
30 )
31
32 body = b'' if 'body' not in kwargs else kwargs['body']

--- 63 unchanged lines hidden (view full) ---

96
97 encoding = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
98
99 self.log_out(req, encoding)
100
101 resp = ''
102
103 if 'no_recv' not in kwargs:
104 read_timeout = (
105 30 if 'read_timeout' not in kwargs else kwargs['read_timeout']
106 )
107 resp = self.recvall(
108 sock, read_timeout=read_timeout, buff_size=read_buffer_size
109 ).decode(encoding)
110
111 self.log_in(resp)
112
113 if 'raw_resp' not in kwargs:
114 resp = self._resp_to_dict(resp)
115
116 headers = resp.get('headers')
117 if headers and headers.get('Transfer-Encoding') == 'chunked':
118 resp['body'] = self._parse_chunked_body(resp['body']).decode(

--- 50 unchanged lines hidden (view full) ---

169 return self.http('HEAD', **kwargs)
170
171 def post(self, **kwargs):
172 return self.http('POST', **kwargs)
173
174 def put(self, **kwargs):
175 return self.http('PUT', **kwargs)
176
177 def recvall(self, sock, read_timeout=30, buff_size=4096):
178 data = b''
179 while select.select([sock], [], [], read_timeout)[0]:
180 try:
181 part = sock.recv(buff_size)
182 except:
183 break
184
185 data += part
186
187 if not len(part):

--- 175 unchanged lines hidden ---