Deleted Added
1import re
2import time
3import json
4import socket
5import select
6from unit.main import TestUnit
7
8

--- 108 unchanged lines hidden (view full) ---

117 resp = self._resp_to_dict(resp)
118
119 headers = resp.get('headers')
120 if headers and headers.get('Transfer-Encoding') == 'chunked':
121 resp['body'] = self._parse_chunked_body(resp['body']).decode(
122 encoding
123 )
124
125 if 'start' not in kwargs:
126 sock.close()
127 return resp
128
129 return (resp, sock)
130
131 def delete(self, **kwargs):
132 return self.http('DELETE', **kwargs)

--- 92 unchanged lines hidden (view full) ---

225 body += temp_body[:size]
226
227 temp_body = temp_body[size + len(crlf) :]
228
229 chunks = temp_body.split(crlf)
230
231 return body
232
233 def waitforsocket(self, port):
234 ret = False
235
236 for i in range(50):
237 try:
238 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
239 sock.connect(('127.0.0.1', port))
240 ret = True
241 break
242 except:
243 sock.close()
244 time.sleep(0.1)
245
246 sock.close()
247
248 self.assertTrue(ret, 'socket connected')