"""Functional tests for the "proxy" route action."""

import re
import socket
import time

import pytest

from conftest import run_process
from unit.applications.lang.python import TestApplicationPython
from unit.option import option
from unit.utils import waitforsocket


def _python_app(name):
    """Build the application object for the WSGI test app called *name*.

    Both "path" and "working_directory" point at
    ``<option.test_dir>/python/<name>`` and the module is always "wsgi".
    """
    app_dir = option.test_dir + '/python/' + name

    return {
        "type": "python",
        "processes": {"spare": 0},
        "path": app_dir,
        "working_directory": app_dir,
        "module": "wsgi",
    }


class TestProxy(TestApplicationPython):
    """Proxy tests: requests enter on *:7080 and are proxied onwards."""

    prerequisites = {'modules': {'python': 'any'}}

    # Port of the stand-alone backend started by run_server().
    SERVER_PORT = 7999

    @staticmethod
    def run_server(server_port):
        """Serve a fixed HTTP response on *server_port* forever.

        Every accepted connection is answered with a "200 OK" head that
        declares ``Content-Length: 10``.  If the request contains
        ``X-Len: <n>``, the head is followed by ``n`` bytes of ``X``
        (which may be fewer or more than the declared ten); otherwise no
        body is sent.  The function never returns.
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(('', server_port))
        listener.listen(5)

        def recvall(conn):
            # Read until a short (or empty) read; a request that is an
            # exact multiple of buff_size would block on the next recv().
            buff_size = 4096
            chunks = []
            while True:
                part = conn.recv(buff_size)
                chunks.append(part)
                if len(part) < buff_size:
                    break
            return b''.join(chunks)

        # Lines are terminated by a bare LF, exactly as before.
        response_head = b'HTTP/1.1 200 OK\nContent-Length: 10\n\n'

        while True:
            connection, _ = listener.accept()

            # The context manager closes the connection even when
            # handling the request raises.
            with connection:
                data = recvall(connection).decode()

                to_send = response_head

                m = re.search(r'X-Len: (\d+)', data)
                if m:
                    to_send += b'X' * int(m.group(1))

                connection.sendall(to_send)

    def get_http10(self, *args, **kwargs):
        """Call ``self.get`` with ``http_10=True`` added."""
        return self.get(*args, http_10=True, **kwargs)

    def post_http10(self, *args, **kwargs):
        """Call ``self.post`` with ``http_10=True`` added."""
        return self.post(*args, http_10=True, **kwargs)

    def setup_method(self):
        """Start the backend and load the base proxy configuration.

        *:7080 passes to a route that proxies to 127.0.0.1:7081, and
        *:7081 passes to the "mirror" application.  The "custom_header"
        and "delayed" applications are defined so that individual tests
        can re-point *:7081 at them.
        """
        run_process(self.run_server, self.SERVER_PORT)
        waitforsocket(self.SERVER_PORT)

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "routes"},
                    "*:7081": {"pass": "applications/mirror"},
                },
                "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
                "applications": {
                    name: _python_app(name)
                    for name in ('mirror', 'custom_header', 'delayed')
                },
            }
        ), 'proxy initial configuration'

    def test_proxy_http10(self):
        """Ten consecutive HTTP/1.0 GETs through the proxy return 200."""
        for _ in range(10):
            assert self.get_http10()['status'] == 200, 'status'

    def test_proxy_chain(self):
        """A request survives five chained proxy hops (7080 -> 7085)."""
        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "routes/first"},
                    "*:7081": {"pass": "routes/second"},
                    "*:7082": {"pass": "routes/third"},
                    "*:7083": {"pass": "routes/fourth"},
                    "*:7084": {"pass": "routes/fifth"},
                    "*:7085": {"pass": "applications/mirror"},
                },
                "routes": {
                    "first": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
                    "second": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
                    "third": [{"action": {"proxy": "http://127.0.0.1:7083"}}],
                    "fourth": [{"action": {"proxy": "http://127.0.0.1:7084"}}],
                    "fifth": [{"action": {"proxy": "http://127.0.0.1:7085"}}],
                },
                "applications": {"mirror": _python_app('mirror')},
            }
        ), 'proxy chain configuration'

        assert self.get_http10()['status'] == 200, 'status'

    def test_proxy_body(self):
        """Request bodies of various sizes come back unchanged."""
        for payload in ('0123456789', 'X' * 4096, 'X' * 4097):
            for _ in range(10):
                resp = self.post_http10(body=payload)

                assert resp['status'] == 200, 'status'
                assert resp['body'] == payload, 'body'

        for payload in ('X' * 4096 * 256, 'X' * 4096 * 257):
            for _ in range(10):
                resp = self.post_http10(
                    body=payload, read_buffer_size=4096 * 128
                )

                assert resp['status'] == 200, 'status'
                assert resp['body'] == payload, 'body'

        # The last payload is exactly 32 MiB, so the limit must really be
        # in place; fail here rather than on a confusing status below.
        assert 'success' in self.conf(
            {'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings'
        ), 'max_body_size configure'

        payload = '0123456789abcdef' * 32 * 64 * 1024
        resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024)
        assert resp['status'] == 200, 'status'
        assert resp['body'] == payload, 'body'

    def test_proxy_parallel(self):
        """Ten requests sent before any response is read all succeed."""
        payload = 'X' * 4096 * 257
        buff_size = 4096 * 258

        socks = []
        try:
            for i in range(10):
                _, sock = self.post_http10(
                    body=payload + str(i),
                    start=True,
                    no_recv=True,
                    read_buffer_size=buff_size,
                )
                socks.append(sock)

            for i, sock in enumerate(socks):
                raw = self.recvall(sock, buff_size=buff_size).decode()
                resp = self._resp_to_dict(raw)

                assert resp['status'] == 200, 'status'
                assert resp['body'] == payload + str(i), 'body'

        finally:
            # Release every connection even if an assertion fails midway.
            for sock in socks:
                sock.close()

    def test_proxy_header(self):
        """Custom header values pass through; an 8192-byte value gets 431."""
        assert 'success' in self.conf(
            {"pass": "applications/custom_header"}, 'listeners/*:7081'
        ), 'custom_header configure'

        def custom_header(value):
            return self.get_http10(
                headers={'Host': 'localhost', 'Custom-Header': value}
            )

        echoed = (
            ('blah', 'custom header'),
            (r'(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~', 'custom header 2'),
            ('X' * 4096, 'custom header 3'),
            ('X' * 8191, 'custom header 4'),
        )
        for header_value, msg in echoed:
            resp = custom_header(header_value)
            assert resp['headers']['Custom-Header'] == header_value, msg

        assert custom_header('X' * 8192)['status'] == 431, 'custom header 5'

    def test_proxy_fragmented(self):
        """A request head delivered in three delayed pieces still gets 200."""
        _, sock = self.http(b"GET / HTT", raw=True, start=True, no_recv=True)

        try:
            time.sleep(1)

            sock.sendall(b"P/1.0\r\nHost: localhos")

            time.sleep(1)

            sock.sendall(b"t\r\n\r\n")

            resp = self.recvall(sock).decode()
        finally:
            sock.close()

        assert re.search('200 OK', resp), 'fragmented send'

    def test_proxy_fragmented_close(self):
        """Close the client connection in the middle of the request head."""
        _, sock = self.http(b"GET / HTT", raw=True, start=True, no_recv=True)

        try:
            time.sleep(1)

            sock.sendall(b"P/1.0\r\nHo")
        finally:
            sock.close()

    def test_proxy_fragmented_body(self):
        """A 30000-byte body sent in three delayed chunks is echoed back."""
        chunk = b"X" * 10000

        _, sock = self.http(b"GET / HTT", raw=True, start=True, no_recv=True)

        try:
            time.sleep(1)

            sock.sendall(b"P/1.0\r\nHost: localhost\r\n")
            sock.sendall(b"Content-Length: 30000\r\n")

            time.sleep(1)

            sock.sendall(b"\r\n")
            sock.sendall(chunk)

            time.sleep(1)

            sock.sendall(chunk)

            time.sleep(1)

            sock.sendall(chunk)

            resp = self._resp_to_dict(self.recvall(sock).decode())
        finally:
            sock.close()

        assert resp['status'] == 200, 'status'
        assert resp['body'] == "X" * 30000, 'body'

    def test_proxy_fragmented_body_close(self):
        """Close the client connection after a third of the declared body."""
        _, sock = self.http(b"GET / HTT", raw=True, start=True, no_recv=True)

        try:
            time.sleep(1)

            sock.sendall(b"P/1.0\r\nHost: localhost\r\n")
            sock.sendall(b"Content-Length: 30000\r\n")

            time.sleep(1)

            sock.sendall(b"\r\n")
            sock.sendall(b"X" * 10000)
        finally:
            sock.close()

    def test_proxy_nowhere(self):
        """Proxying to port 7082, which has no listener here, yields 502."""
        assert 'success' in self.conf(
            [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes'
        ), 'proxy path changed'

        assert self.get_http10()['status'] == 502, 'status'

    def test_proxy_ipv6(self):
        """The proxy target may be an IPv6 address ([::1]:7081)."""
        assert 'success' in self.conf(
            {
                "*:7080": {"pass": "routes"},
                "[::1]:7081": {'application': 'mirror'},
            },
            'listeners',
        ), 'add ipv6 listener configure'

        assert 'success' in self.conf(
            [{"action": {"proxy": "http://[::1]:7081"}}], 'routes'
        ), 'proxy ipv6 configure'

        assert self.get_http10()['status'] == 200, 'status'

    def test_proxy_unix(self, temp_dir):
        """The proxy target may be a Unix domain socket under *temp_dir*."""
        addr = temp_dir + '/sock'

        assert 'success' in self.conf(
            {
                "*:7080": {"pass": "routes"},
                "unix:" + addr: {'application': 'mirror'},
            },
            'listeners',
        ), 'add unix listener configure'

        assert 'success' in self.conf(
            [{"action": {"proxy": 'http://unix:' + addr}}], 'routes'
        ), 'proxy unix configure'

        assert self.get_http10()['status'] == 200, 'status'

    def test_proxy_delayed(self):
        """Two identical requests to the "delayed" app both return the body.

        The requests carry ``X-Parts: 2`` and ``X-Delay: 1``; how the
        application interprets them is defined outside this file.
        """
        assert 'success' in self.conf(
            {"pass": "applications/delayed"}, 'listeners/*:7081'
        ), 'delayed configure'

        body = '0123456789' * 1000

        for _ in range(2):
            resp = self.post_http10(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Content-Length': str(len(body)),
                    'X-Parts': '2',
                    'X-Delay': '1',
                },
                body=body,
            )

            assert resp['status'] == 200, 'status'
            assert resp['body'] == body, 'body'

    def test_proxy_delayed_close(self):
        """Close the client after the first 100 bytes of a delayed reply.

        Done twice in a row; each time the start of the response must
        contain "200 OK".
        """
        assert 'success' in self.conf(
            {"pass": "applications/delayed"}, 'listeners/*:7081'
        ), 'delayed configure'

        for attempt in ('first', 'second'):
            _, sock = self.post_http10(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Content-Length': '10000',
                    'X-Parts': '3',
                    'X-Delay': '1',
                },
                body='0123456789' * 1000,
                start=True,
                no_recv=True,
            )

            try:
                head = sock.recv(100).decode()
            finally:
                sock.close()

            assert re.search('200 OK', head), attempt

    @pytest.mark.skip('not yet')
    def test_proxy_content_length(self):
        """Body length is bounded by the backend's Content-Length of 10.

        Uses the run_server() backend, which sends ``X-Len`` body bytes
        regardless of the ten it declares.
        """
        assert 'success' in self.conf(
            [
                {
                    "action": {
                        "proxy": "http://127.0.0.1:" + str(self.SERVER_PORT)
                    }
                }
            ],
            'routes',
        ), 'proxy backend configure'

        resp = self.get_http10()
        assert len(resp['body']) == 0, 'body lt Content-Length 0'

        cases = (
            ('5', 5, 'body lt Content-Length 5'),
            ('9', 9, 'body lt Content-Length 9'),
            ('11', 10, 'body gt Content-Length 11'),
            ('15', 10, 'body gt Content-Length 15'),
        )
        for x_len, expected, msg in cases:
            resp = self.get_http10(
                headers={'Host': 'localhost', 'X-Len': x_len}
            )
            assert len(resp['body']) == expected, msg

    def test_proxy_invalid(self):
        """Malformed proxy addresses are rejected by the configuration."""

        def check_proxy(proxy):
            assert 'error' in self.conf(
                [{"action": {"proxy": proxy}}], 'routes'
            ), 'proxy invalid'

        invalid_values = (
            'blah',
            '/blah',
            'unix:/blah',
            'http://blah',
            'http://127.0.0.1',
            'http://127.0.0.1:',
            'http://127.0.0.1:blah',
            'http://127.0.0.1:-1',
            'http://127.0.0.1:7080b',
            'http://[]',
            'http://[]:7080',
            'http://[:]:7080',
            'http://[::7080',
        )
        for proxy in invalid_values:
            check_proxy(proxy)

    def test_proxy_loop(self, skip_alert):
        """Exercise a route on *:7082 that proxies back to *:7082.

        The socket/accept alerts registered with skip_alert are expected
        here; no response status is asserted.
        """
        skip_alert(
            r'socket.*failed',
            r'accept.*failed',
            r'new connections are not accepted',
        )

        # Make sure the looping configuration was really applied, as
        # every other test in this class does for its configuration.
        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "routes"},
                    "*:7081": {"pass": "applications/mirror"},
                    "*:7082": {"pass": "routes"},
                },
                "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
                "applications": {"mirror": _python_app('mirror')},
            }
        ), 'proxy loop configuration'

        self.get_http10(no_recv=True)
        self.get_http10(read_timeout=1)