import re
import select
import socket
import time

from conftest import option
from conftest import run_process
from conftest import waitforsocket
from unit.applications.lang.python import TestApplicationPython


class TestProxyChunked(TestApplicationPython):
    """Proxy tests against a backend that answers with chunked encoding.

    Every test sends an HTTP/1.0 request through the proxy configured in
    ``setup_method``.  The request body is a script for the backend started
    by ``run_server``: the backend replays it (after expansion) as the tail
    of its own ``Transfer-Encoding: chunked`` response.
    """

    prerequisites = {'modules': {'python': 'any'}}

    # Port the scripted backend listens on (127.0.0.1 only).
    SERVER_PORT = 7999

    @staticmethod
    def run_server(server_port, temp_dir):
        """Run the scripted backend forever on 127.0.0.1:``server_port``.

        For each accepted connection the request is read until the peer
        goes quiet, and a response is built from the fixed prefix
        ``HTTP/1.1 200 OK\\r\\nTransfer-Encoding: chunked`` followed by every
        ``\\r\\n``-separated line of the request body, each terminated by
        ``\\r\\n``.  Two kinds of markup in the body are interpreted:

        * a line containing ``<text> X <count>`` is replaced by ``<text>``
          repeated ``<count>`` times;
        * ``@`` and ``#`` are removed from the output and split it into
          separate ``sendall`` calls; ``#`` additionally sleeps 0.1 s before
          the next piece is sent.

        ``temp_dir`` is not used; it is accepted because ``setup_method``
        passes it along with the port.
        """
        # Compiled once: both patterns are applied on every request.
        repeat_re = re.compile(r'(.*)\sX\s(\d+)')
        marker_re = re.compile(r'([@#])')

        def recvall(sock):
            """Return everything readable from ``sock``.

            Reading stops when the socket has been idle for 0.1 s or the
            peer has closed its side; there is no other framing.
            """
            buff_size = 4096 * 4096
            parts = []

            while True:
                rlist = select.select([sock], [], [], 0.1)

                if not rlist[0]:
                    break

                part = sock.recv(buff_size)

                if not part:
                    break

                parts.append(part)

            # Joined once: repeated bytes concatenation is quadratic for
            # the multi-megabyte bodies some tests send.
            return b''.join(parts)

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            sock.bind(('127.0.0.1', server_port))
            sock.listen(10)

            while True:
                connection, _ = sock.accept()

                # The context manager closes the connection even when
                # receiving, decoding or sending raises.
                with connection:
                    data = recvall(connection).decode()

                    response = [
                        """HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked"""
                    ]

                    # Everything after the first blank line is the script.
                    _, separator, body = data.partition('\r\n\r\n')
                    if separator:
                        for line in body.split('\r\n'):
                            repeat = repeat_re.search(line)

                            if repeat is not None:
                                line = repeat.group(1) * int(repeat.group(2))

                            response.append(line + '\r\n')

                    for chunk in marker_re.split(''.join(response)):
                        if chunk in ('@', '#'):
                            if chunk == '#':
                                time.sleep(0.1)
                            continue

                        connection.sendall(chunk.encode())

    def chunks(self, chunks):
        """Build a request body scripting a complete chunked response.

        ``chunks`` is an iterable of ``(size, data)`` string pairs.  The
        result starts with ``\\r\\n\\r\\n``, contains ``size\\r\\ndata\\r\\n`` for
        every pair and ends with the terminating ``0\\r\\n\\r\\n`` chunk.
        """
        encoded = ''.join(
            size + '\r\n' + data + '\r\n' for size, data in chunks
        )

        return '\r\n\r\n' + encoded + '0\r\n\r\n'

    def get_http10(self, *args, **kwargs):
        """Call ``self.get`` with ``http_10=True`` added to the arguments."""
        return self.get(*args, http_10=True, **kwargs)

    def setup_method(self):
        """Start the backend and configure a route proxying to it."""
        run_process(self.run_server, self.SERVER_PORT, option.temp_dir)
        waitforsocket(self.SERVER_PORT)

        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "routes"},},
                "routes": [
                    {
                        "action": {
                            "proxy": "http://127.0.0.1:"
                            + str(self.SERVER_PORT)
                        }
                    }
                ],
            }
        ), 'proxy initial configuration'

    def test_proxy_chunked(self):
        """A response holding only the zero chunk yields 200, repeatedly."""
        for _ in range(10):
            assert self.get_http10(body='\r\n\r\n0\r\n\r\n')['status'] == 200

    def test_proxy_chunked_body(self):
        """One and two chunks of 4 KiB, 1 MiB and 16 MiB are reassembled."""
        part = '0123456789abcdef'

        # Single chunk; the declared hex size equals len(part) * count.
        assert (
            self.get_http10(body=self.chunks([('1000', part + ' X 256')]))[
                'body'
            ]
            == part * 256
        )
        assert (
            self.get_http10(body=self.chunks([('100000', part + ' X 65536')]))[
                'body'
            ]
            == part * 65536
        )
        assert (
            self.get_http10(
                body=self.chunks([('1000000', part + ' X 1048576')]),
                read_buffer_size=4096 * 4096,
            )['body']
            == part * 1048576
        )

        # Two consecutive chunks of each size.
        assert (
            self.get_http10(
                body=self.chunks(
                    [('1000', part + ' X 256'), ('1000', part + ' X 256')]
                )
            )['body']
            == part * 256 * 2
        )
        assert (
            self.get_http10(
                body=self.chunks(
                    [
                        ('100000', part + ' X 65536'),
                        ('100000', part + ' X 65536'),
                    ]
                )
            )['body']
            == part * 65536 * 2
        )
        assert (
            self.get_http10(
                body=self.chunks(
                    [
                        ('1000000', part + ' X 1048576'),
                        ('1000000', part + ' X 1048576'),
                    ]
                ),
                read_buffer_size=4096 * 4096,
            )['body']
            == part * 1048576 * 2
        )

    def test_proxy_chunked_fragmented(self):
        """4096 one-byte chunks are reassembled into a single body."""
        part = '0123456789abcdef'

        assert (
            self.get_http10(
                body=self.chunks(
                    [('1', hex(i % 16)[2:]) for i in range(4096)]
                ),
            )['body']
            == part * 256
        )

    def test_proxy_chunked_send(self):
        """Responses delivered in several sends are parsed correctly.

        ``@`` splits the backend output into separate sends and ``#`` also
        delays the next send; see ``run_server``.
        """
        assert self.get_http10(body='\r\n\r\n@0@\r\n\r\n')['status'] == 200
        assert (
            self.get_http10(
                body='\r@\n\r\n2\r@\na@b\r\n2\r\ncd@\r\n0\r@\n\r\n'
            )['body']
            == 'abcd'
        )
        assert (
            self.get_http10(
                body='\r\n\r\n2\r#\na#b\r\n##2\r\n#cd\r\n0\r\n#\r#\n'
            )['body']
            == 'abcd'
        )

    def test_proxy_chunked_invalid(self):
        """Malformed or truncated chunked responses are handled."""

        def check_invalid(body):
            assert self.get_http10(body=body)['status'] != 200

        check_invalid('\r\n\r0')
        check_invalid('\r\n\r\n\r0')
        check_invalid('\r\n\r\n\r\n0')
        check_invalid('\r\nContent-Length: 5\r\n\r\n0\r\n\r\n')
        check_invalid('\r\n\r\n1\r\nXX\r\n0\r\n\r\n')
        check_invalid('\r\n\r\n2\r\nX\r\n0\r\n\r\n')
        check_invalid('\r\n\r\nH\r\nXX\r\n0\r\n\r\n')
        check_invalid('\r\n\r\n0\r\nX')

        # Declared size 0x65 (101) with only 100 data bytes scripted.
        resp = self.get_http10(body='\r\n\r\n65#\r\nA X 100')
        assert resp['status'] == 200, 'incomplete chunk status'
        assert resp['body'][-5:] != '0\r\n\r\n', 'incomplete chunk'

        # Declared size 0x64 (100) matches the data, but no zero chunk.
        resp = self.get_http10(body='\r\n\r\n64#\r\nA X 100')
        assert resp['status'] == 200, 'no zero chunk status'
        assert resp['body'][-5:] != '0\r\n\r\n', 'no zero chunk'

        # A declared size far larger than the data still yields 200 ...
        assert (
            self.get_http10(body='\r\n\r\n80000000\r\nA X 100')['status']
            == 200
        )
        # ... whereas a 17-hex-digit size is answered with 502.
        assert (
            self.get_http10(body='\r\n\r\n10000000000000000\r\nA X 100')[
                'status'
            ]
            == 502
        )

        # A valid 16 MiB chunk followed by a truncated or malformed one:
        # at least the first chunk's worth of body is expected.
        assert (
            len(
                self.get_http10(
                    body='\r\n\r\n1000000\r\nA X 1048576\r\n1000000\r\nA X 100',
                    read_buffer_size=4096 * 4096,
                )['body']
            )
            >= 1048576
        )
        assert (
            len(
                self.get_http10(
                    body='\r\n\r\n1000000\r\nA X 1048576\r\nXXX\r\nA X 100',
                    read_buffer_size=4096 * 4096,
                )['body']
            )
            >= 1048576
        )