1import re 2import socket 3import time 4 5import pytest 6from conftest import run_process 7from unit.applications.lang.python import TestApplicationPython 8from unit.option import option 9from unit.utils import waitforsocket 10 11 12class TestProxy(TestApplicationPython): 13 prerequisites = {'modules': {'python': 'any'}} 14 15 SERVER_PORT = 7999 16 17 @staticmethod 18 def run_server(server_port): 19 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 20 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 21 22 server_address = ('', server_port) 23 sock.bind(server_address) 24 sock.listen(5) 25 26 def recvall(sock): 27 buff_size = 4096 28 data = b'' 29 while True: 30 part = sock.recv(buff_size) 31 data += part 32 if len(part) < buff_size: 33 break 34 return data 35 36 req = b"""HTTP/1.1 200 OK 37Content-Length: 10 38 39""" 40 41 while True: 42 connection, client_address = sock.accept() 43 44 data = recvall(connection).decode() 45 46 to_send = req 47 48 m = re.search(r'X-Len: (\d+)', data) 49 if m: 50 to_send += b'X' * int(m.group(1)) 51 52 connection.sendall(to_send) 53 54 connection.close() 55 56 def get_http10(self, *args, **kwargs): 57 return self.get(*args, http_10=True, **kwargs) 58 59 def post_http10(self, *args, **kwargs): 60 return self.post(*args, http_10=True, **kwargs) 61 62 def setup_method(self): 63 run_process(self.run_server, self.SERVER_PORT) 64 waitforsocket(self.SERVER_PORT) 65 66 assert 'success' in self.conf( 67 { 68 "listeners": { 69 "*:7080": {"pass": "routes"}, 70 "*:7081": {"pass": "applications/mirror"}, 71 }, 72 "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}], 73 "applications": { 74 "mirror": { 75 "type": self.get_application_type(), 76 "processes": {"spare": 0}, 77 "path": option.test_dir + "/python/mirror", 78 "working_directory": option.test_dir + "/python/mirror", 79 "module": "wsgi", 80 }, 81 "custom_header": { 82 "type": self.get_application_type(), 83 "processes": {"spare": 0}, 84 "path": option.test_dir + "/python/custom_header", 85 "working_directory": option.test_dir 86 + "/python/custom_header", 87 "module": "wsgi", 88 }, 89 "delayed": { 90 "type": self.get_application_type(), 91 "processes": {"spare": 0}, 92 "path": option.test_dir + "/python/delayed", 93 "working_directory": option.test_dir 94 + "/python/delayed", 95 "module": "wsgi", 96 }, 97 }, 98 } 99 ), 'proxy initial configuration' 100 101 def test_proxy_http10(self): 102 for _ in range(10): 103 assert self.get_http10()['status'] == 200, 'status' 104 105 def test_proxy_chain(self): 106 assert 'success' in self.conf( 107 { 108 "listeners": { 109 "*:7080": {"pass": "routes/first"}, 110 "*:7081": {"pass": "routes/second"}, 111 "*:7082": {"pass": "routes/third"}, 112 "*:7083": {"pass": "routes/fourth"}, 113 "*:7084": {"pass": "routes/fifth"}, 114 "*:7085": {"pass": "applications/mirror"}, 115 }, 116 "routes": { 117 "first": [{"action": {"proxy": "http://127.0.0.1:7081"}}], 118 "second": [{"action": {"proxy": "http://127.0.0.1:7082"}}], 119 "third": [{"action": {"proxy": "http://127.0.0.1:7083"}}], 120 "fourth": [{"action": {"proxy": "http://127.0.0.1:7084"}}], 121 "fifth": [{"action": {"proxy": "http://127.0.0.1:7085"}}], 122 }, 123 "applications": { 124 "mirror": { 125 "type": self.get_application_type(), 126 "processes": {"spare": 0}, 127 "path": option.test_dir + "/python/mirror", 128 "working_directory": option.test_dir + "/python/mirror", 129 "module": "wsgi", 130 } 131 }, 132 } 133 ), 'proxy chain configuration' 134 135 assert self.get_http10()['status'] == 200, 'status' 136 137 def test_proxy_body(self): 138 payload = '0123456789' 139 for _ in range(10): 140 resp = self.post_http10(body=payload) 141 142 assert resp['status'] == 200, 'status' 143 assert resp['body'] == payload, 'body' 144 145 payload = 'X' * 4096 146 for _ in range(10): 147 resp = self.post_http10(body=payload) 148 149 assert resp['status'] == 200, 'status' 150 assert resp['body'] == payload, 'body' 151 152 payload = 'X' * 4097 153 for _ in range(10): 154 resp = self.post_http10(body=payload) 155 156 assert resp['status'] == 200, 'status' 157 assert resp['body'] == payload, 'body' 158 159 payload = 'X' * 4096 * 256 160 for _ in range(10): 161 resp = self.post_http10(body=payload, read_buffer_size=4096 * 128) 162 163 assert resp['status'] == 200, 'status' 164 assert resp['body'] == payload, 'body' 165 166 payload = 'X' * 4096 * 257 167 for _ in range(10): 168 resp = self.post_http10(body=payload, read_buffer_size=4096 * 128) 169 170 assert resp['status'] == 200, 'status' 171 assert resp['body'] == payload, 'body' 172 173 assert 'success' in self.conf( 174 {'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings' 175 ) 176 177 payload = '0123456789abcdef' * 32 * 64 * 1024 178 resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024) 179 assert resp['status'] == 200, 'status' 180 assert resp['body'] == payload, 'body' 181 182 def test_proxy_parallel(self): 183 payload = 'X' * 4096 * 257 184 buff_size = 4096 * 258 185 186 socks = [] 187 for i in range(10): 188 _, sock = self.post_http10( 189 body=payload + str(i), 190 start=True, 191 no_recv=True, 192 read_buffer_size=buff_size, 193 ) 194 socks.append(sock) 195 196 for i in range(10): 197 resp = self.recvall(socks[i], buff_size=buff_size).decode() 198 socks[i].close() 199 200 resp = self._resp_to_dict(resp) 201 202 assert resp['status'] == 200, 'status' 203 assert resp['body'] == payload + str(i), 'body' 204 205 def test_proxy_header(self): 206 assert 'success' in self.conf( 207 {"pass": "applications/custom_header"}, 'listeners/*:7081' 208 ), 'custom_header configure' 209 210 header_value = 'blah' 211 assert ( 212 self.get_http10( 213 headers={'Host': 'localhost', 'Custom-Header': header_value} 214 )['headers']['Custom-Header'] 215 == header_value 216 ), 'custom header' 217 218 header_value = r'(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~' 219 assert ( 220 self.get_http10( 221 headers={'Host': 'localhost', 'Custom-Header': header_value} 222 )['headers']['Custom-Header'] 223 == header_value 224 ), 'custom header 2' 225 226 header_value = 'X' * 4096 227 assert ( 228 self.get_http10( 229 headers={'Host': 'localhost', 'Custom-Header': header_value} 230 )['headers']['Custom-Header'] 231 == header_value 232 ), 'custom header 3' 233 234 header_value = 'X' * 8191 235 assert ( 236 self.get_http10( 237 headers={'Host': 'localhost', 'Custom-Header': header_value} 238 )['headers']['Custom-Header'] 239 == header_value 240 ), 'custom header 4' 241 242 header_value = 'X' * 8192 243 assert ( 244 self.get_http10( 245 headers={'Host': 'localhost', 'Custom-Header': header_value} 246 )['status'] 247 == 431 248 ), 'custom header 5' 249 250 def test_proxy_fragmented(self): 251 _, sock = self.http( 252 b"""GET / HTT""", raw=True, start=True, no_recv=True 253 ) 254 255 time.sleep(1) 256 257 sock.sendall("P/1.0\r\nHost: localhos".encode()) 258 259 time.sleep(1) 260 261 sock.sendall("t\r\n\r\n".encode()) 262 263 assert re.search( 264 '200 OK', self.recvall(sock).decode() 265 ), 'fragmented send' 266 sock.close() 267 268 def test_proxy_fragmented_close(self): 269 _, sock = self.http( 270 b"""GET / HTT""", raw=True, start=True, no_recv=True 271 ) 272 273 time.sleep(1) 274 275 sock.sendall("P/1.0\r\nHo".encode()) 276 277 sock.close() 278 279 def test_proxy_fragmented_body(self): 280 _, sock = self.http( 281 b"""GET / HTT""", raw=True, start=True, no_recv=True 282 ) 283 284 time.sleep(1) 285 286 sock.sendall("P/1.0\r\nHost: localhost\r\n".encode()) 287 sock.sendall("Content-Length: 30000\r\n".encode()) 288 289 time.sleep(1) 290 291 sock.sendall("\r\n".encode()) 292 sock.sendall(("X" * 10000).encode()) 293 294 time.sleep(1) 295 296 sock.sendall(("X" * 10000).encode()) 297 298 time.sleep(1) 299 300 sock.sendall(("X" * 10000).encode()) 301 302 resp = self._resp_to_dict(self.recvall(sock).decode()) 303 sock.close() 304 305 assert resp['status'] == 200, 'status' 306 assert resp['body'] == "X" * 30000, 'body' 307 308 def test_proxy_fragmented_body_close(self): 309 _, sock = self.http( 310 b"""GET / HTT""", raw=True, start=True, no_recv=True 311 ) 312 313 time.sleep(1) 314 315 sock.sendall("P/1.0\r\nHost: localhost\r\n".encode()) 316 sock.sendall("Content-Length: 30000\r\n".encode()) 317 318 time.sleep(1) 319 320 sock.sendall("\r\n".encode()) 321 sock.sendall(("X" * 10000).encode()) 322 323 sock.close() 324 325 def test_proxy_nowhere(self): 326 assert 'success' in self.conf( 327 [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes' 328 ), 'proxy path changed' 329 330 assert self.get_http10()['status'] == 502, 'status' 331 332 def test_proxy_ipv6(self): 333 assert 'success' in self.conf( 334 { 335 "*:7080": {"pass": "routes"}, 336 "[::1]:7081": {'application': 'mirror'}, 337 }, 338 'listeners', 339 ), 'add ipv6 listener configure' 340 341 assert 'success' in self.conf( 342 [{"action": {"proxy": "http://[::1]:7081"}}], 'routes' 343 ), 'proxy ipv6 configure' 344 345 assert self.get_http10()['status'] == 200, 'status' 346 347 def test_proxy_unix(self, temp_dir): 348 addr = temp_dir + '/sock' 349 350 assert 'success' in self.conf( 351 { 352 "*:7080": {"pass": "routes"}, 353 "unix:" + addr: {'application': 'mirror'}, 354 }, 355 'listeners', 356 ), 'add unix listener configure' 357 358 assert 'success' in self.conf( 359 [{"action": {"proxy": 'http://unix:' + addr}}], 'routes' 360 ), 'proxy unix configure' 361 362 assert self.get_http10()['status'] == 200, 'status' 363 364 def test_proxy_delayed(self): 365 assert 'success' in self.conf( 366 {"pass": "applications/delayed"}, 'listeners/*:7081' 367 ), 'delayed configure' 368 369 body = '0123456789' * 1000 370 resp = self.post_http10( 371 headers={ 372 'Host': 'localhost', 373 'Content-Length': str(len(body)), 374 'X-Parts': '2', 375 'X-Delay': '1', 376 }, 377 body=body, 378 ) 379 380 assert resp['status'] == 200, 'status' 381 assert resp['body'] == body, 'body' 382 383 resp = self.post_http10( 384 headers={ 385 'Host': 'localhost', 386 'Content-Length': str(len(body)), 387 'X-Parts': '2', 388 'X-Delay': '1', 389 }, 390 body=body, 391 ) 392 393 assert resp['status'] == 200, 'status' 394 assert resp['body'] == body, 'body' 395 396 def test_proxy_delayed_close(self): 397 assert 'success' in self.conf( 398 {"pass": "applications/delayed"}, 'listeners/*:7081' 399 ), 'delayed configure' 400 401 _, sock = self.post_http10( 402 headers={ 403 'Host': 'localhost', 404 'Content-Length': '10000', 405 'X-Parts': '3', 406 'X-Delay': '1', 407 }, 408 body='0123456789' * 1000, 409 start=True, 410 no_recv=True, 411 ) 412 413 assert re.search('200 OK', sock.recv(100).decode()), 'first' 414 sock.close() 415 416 _, sock = self.post_http10( 417 headers={ 418 'Host': 'localhost', 419 'Content-Length': '10000', 420 'X-Parts': '3', 421 'X-Delay': '1', 422 }, 423 body='0123456789' * 1000, 424 start=True, 425 no_recv=True, 426 ) 427 428 assert re.search('200 OK', sock.recv(100).decode()), 'second' 429 sock.close() 430 431 @pytest.mark.skip('not yet') 432 def test_proxy_content_length(self): 433 assert 'success' in self.conf( 434 [ 435 { 436 "action": { 437 "proxy": "http://127.0.0.1:" + str(self.SERVER_PORT) 438 } 439 } 440 ], 441 'routes', 442 ), 'proxy backend configure' 443 444 resp = self.get_http10() 445 assert len(resp['body']) == 0, 'body lt Content-Length 0' 446 447 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'}) 448 assert len(resp['body']) == 5, 'body lt Content-Length 5' 449 450 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'}) 451 assert len(resp['body']) == 9, 'body lt Content-Length 9' 452 453 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'}) 454 assert len(resp['body']) == 10, 'body gt Content-Length 11' 455 456 resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'}) 457 assert len(resp['body']) == 10, 'body gt Content-Length 15' 458 459 def test_proxy_invalid(self): 460 def check_proxy(proxy): 461 assert 'error' in self.conf( 462 [{"action": {"proxy": proxy}}], 'routes' 463 ), 'proxy invalid' 464 465 check_proxy('blah') 466 check_proxy('/blah') 467 check_proxy('unix:/blah') 468 check_proxy('http://blah') 469 check_proxy('http://127.0.0.1') 470 check_proxy('http://127.0.0.1:') 471 check_proxy('http://127.0.0.1:blah') 472 check_proxy('http://127.0.0.1:-1') 473 check_proxy('http://127.0.0.1:7080b') 474 check_proxy('http://[]') 475 check_proxy('http://[]:7080') 476 check_proxy('http://[:]:7080') 477 check_proxy('http://[::7080') 478 479 @pytest.mark.skip('not yet') 480 def test_proxy_loop(self, skip_alert): 481 skip_alert( 482 r'socket.*failed', 483 r'accept.*failed', 484 r'new connections are not accepted', 485 ) 486 assert 'success' in self.conf( 487 { 488 "listeners": { 489 "*:7080": {"pass": "routes"}, 490 "*:7081": {"pass": "applications/mirror"}, 491 "*:7082": {"pass": "routes"}, 492 }, 493 "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}], 494 "applications": { 495 "mirror": { 496 "type": self.get_application_type(), 497 "processes": {"spare": 0}, 498 "path": option.test_dir + "/python/mirror", 499 "working_directory": option.test_dir + "/python/mirror", 500 "module": "wsgi", 501 }, 502 }, 503 } 504 ) 505 506 self.get_http10(no_recv=True) 507 self.get_http10(read_timeout=1) 508