http.py (1596:b7e2d4d92624) http.py (1608:283a8bcf8d2e)
1import binascii
2import io
3import json
4import os
5import pytest
6import re
7import select
8import socket
9import time
10
11from unit.main import TestUnit
12from conftest import option
13
14
15class TestHTTP(TestUnit):
16 def http(self, start_str, **kwargs):
1import binascii
2import io
3import json
4import os
5import pytest
6import re
7import select
8import socket
9import time
10
11from unit.main import TestUnit
12from conftest import option
13
14
15class TestHTTP(TestUnit):
16 def http(self, start_str, **kwargs):
17 sock_type = (
18 'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
19 )
20 port = 7080 if 'port' not in kwargs else kwargs['port']
21 url = '/' if 'url' not in kwargs else kwargs['url']
17 sock_type = kwargs.get('sock_type', 'ipv4')
18 port = kwargs.get('port', 7080)
19 url = kwargs.get('url', '/')
22 http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
23
20 http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
21
24 headers = (
25 {'Host': 'localhost', 'Connection': 'close'}
26 if 'headers' not in kwargs
27 else kwargs['headers']
28 )
22 headers = kwargs.get('headers',
23 {'Host': 'localhost', 'Connection': 'close'})
29
24
30 body = b'' if 'body' not in kwargs else kwargs['body']
25 body = kwargs.get('body', b'')
31 crlf = '\r\n'
32
33 if 'addr' not in kwargs:
34 addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
35 else:
36 addr = kwargs['addr']
37
38 sock_types = {

--- 48 unchanged lines hidden (view full) ---

87
88 req = (req + crlf).encode() + body
89
90 else:
91 req = start_str
92
93 sock.sendall(req)
94
26 crlf = '\r\n'
27
28 if 'addr' not in kwargs:
29 addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
30 else:
31 addr = kwargs['addr']
32
33 sock_types = {

--- 48 unchanged lines hidden (view full) ---

82
83 req = (req + crlf).encode() + body
84
85 else:
86 req = start_str
87
88 sock.sendall(req)
89
95 encoding = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
90 encoding = kwargs.get('encoding', 'utf-8')
96
97 self.log_out(req, encoding)
98
99 resp = ''
100
101 if 'no_recv' not in kwargs:
102 recvall_kwargs = {}
103

--- 69 unchanged lines hidden (view full) ---

173 return self.http('POST', **kwargs)
174
175 def put(self, **kwargs):
176 return self.http('PUT', **kwargs)
177
178 def recvall(self, sock, **kwargs):
179 timeout_default = 60
180
91
92 self.log_out(req, encoding)
93
94 resp = ''
95
96 if 'no_recv' not in kwargs:
97 recvall_kwargs = {}
98

--- 69 unchanged lines hidden (view full) ---

168 return self.http('POST', **kwargs)
169
170 def put(self, **kwargs):
171 return self.http('PUT', **kwargs)
172
173 def recvall(self, sock, **kwargs):
174 timeout_default = 60
175
181 timeout = (
182 timeout_default
183 if 'read_timeout' not in kwargs
184 else kwargs['read_timeout']
185 )
186 buff_size = 4096 if 'buff_size' not in kwargs else kwargs['buff_size']
176 timeout = kwargs.get('read_timeout', timeout_default)
177 buff_size = kwargs.get('buff_size', 4096)
187
188 data = b''
189 while True:
190 rlist = select.select([sock], [], [], timeout)[0]
191 if not rlist:
192 # For all current cases if the "read_timeout" was changed
193 # than test do not expect to get a response from server.
194 if timeout == timeout_default:

--- 182 unchanged lines hidden ---
178
179 data = b''
180 while True:
181 rlist = select.select([sock], [], [], timeout)[0]
182 if not rlist:
183 # For all current cases if the "read_timeout" was changed
184 # than test do not expect to get a response from server.
185 if timeout == timeout_default:

--- 182 unchanged lines hidden ---