"""Functional tests for the ``proxy`` route action."""
import re
import time
import socket
import unittest
from unit.applications.lang.python import TestApplicationPython


class TestProxy(TestApplicationPython):
    """Exercise routes whose action is ``proxy`` against local backends.

    ``setUp`` configures listener ``*:7080`` to pass to a route that proxies
    to ``http://127.0.0.1:7081``, where the ``mirror`` Python application
    listens.  Individual tests re-point either side as needed.
    """

    # Test methods carry ``#`` comments rather than docstrings so that the
    # test descriptions reported by unittest remain the method names.

    prerequisites = {'modules': ['python']}

    # Port used by the raw-socket backend implemented in run_server().
    SERVER_PORT = 7999

    @staticmethod
    def run_server(server_port):
        """Serve canned HTTP responses on ``server_port`` forever.

        Every accepted connection receives a ``200 OK`` head declaring
        ``Content-Length: 10`` followed by as many ``X`` bytes as the
        request's ``X-Len`` header asks for (none when the header is absent),
        so the body may be shorter or longer than declared.  The accept loop
        never returns; ``setUp`` hands this function to ``self.run_process``.

        :param server_port: TCP port to bind on all interfaces.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sock.bind(('', server_port))
        sock.listen(5)

        def recvall(conn):
            # Read until a chunk shorter than the buffer (or empty) arrives;
            # that is taken as the end of the request.
            buff_size = 4096
            data = b''
            while True:
                part = conn.recv(buff_size)
                data += part
                if len(part) < buff_size:
                    break
            return data

        # Same bytes as the original fixture, including its bare-LF line
        # endings.
        response_head = b'HTTP/1.1 200 OK\nContent-Length: 10\n\n'

        # Raw string: '\d' in a normal literal is an invalid escape sequence.
        x_len_re = re.compile(r'X-Len: (\d+)')

        while True:
            connection, _ = sock.accept()

            # The context manager closes the connection even when reading,
            # decoding or sending raises.
            with connection:
                data = recvall(connection).decode()

                to_send = response_head

                m = x_len_re.search(data)
                if m:
                    to_send += b'X' * int(m.group(1))

                connection.sendall(to_send)

    def get_http10(self, *args, **kwargs):
        """Call ``self.get`` with ``http_10=True`` added to the arguments."""
        return self.get(*args, http_10=True, **kwargs)

    def post_http10(self, *args, **kwargs):
        """Call ``self.post`` with ``http_10=True`` added to the arguments."""
        return self.post(*args, http_10=True, **kwargs)

    def _python_app(self, name):
        """Return the config object for the app in ``python/<name>``.

        :param name: directory name under ``<current_dir>/python``; it is
            used for both ``path`` and ``working_directory``.
        """
        app_dir = self.current_dir + "/python/" + name
        return {
            "type": "python",
            "processes": {"spare": 0},
            "path": app_dir,
            "working_directory": app_dir,
            "module": "wsgi",
        }

    def setUp(self):
        """Start the raw backend and load the baseline proxy configuration."""
        super().setUp()

        self.run_process(self.run_server, self.SERVER_PORT)
        self.waitforsocket(self.SERVER_PORT)

        self.assertIn(
            'success',
            self.conf(
                {
                    "listeners": {
                        "*:7080": {"pass": "routes"},
                        "*:7081": {"pass": "applications/mirror"},
                    },
                    "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
                    "applications": {
                        "mirror": self._python_app("mirror"),
                        "custom_header": self._python_app("custom_header"),
                        "delayed": self._python_app("delayed"),
                    },
                }
            ),
            'proxy initial configuration',
        )

    def test_proxy_http10(self):
        # Ten consecutive HTTP/1.0 GETs through the proxy must all succeed.
        for _ in range(10):
            self.assertEqual(self.get_http10()['status'], 200, 'status')

    def test_proxy_chain(self):
        # Five proxy hops (7080 -> ... -> 7085) before reaching the app.
        self.assertIn(
            'success',
            self.conf(
                {
                    "listeners": {
                        "*:7080": {"pass": "routes/first"},
                        "*:7081": {"pass": "routes/second"},
                        "*:7082": {"pass": "routes/third"},
                        "*:7083": {"pass": "routes/fourth"},
                        "*:7084": {"pass": "routes/fifth"},
                        "*:7085": {"pass": "applications/mirror"},
                    },
                    "routes": {
                        "first": [
                            {"action": {"proxy": "http://127.0.0.1:7081"}}
                        ],
                        "second": [
                            {"action": {"proxy": "http://127.0.0.1:7082"}}
                        ],
                        "third": [
                            {"action": {"proxy": "http://127.0.0.1:7083"}}
                        ],
                        "fourth": [
                            {"action": {"proxy": "http://127.0.0.1:7084"}}
                        ],
                        "fifth": [
                            {"action": {"proxy": "http://127.0.0.1:7085"}}
                        ],
                    },
                    "applications": {"mirror": self._python_app("mirror")},
                }
            ),
            'proxy chain configuration',
        )

        self.assertEqual(self.get_http10()['status'], 200, 'status')

    def test_proxy_body(self):
        # Bodies around the 4096-byte and 4096 * 256-byte boundaries must be
        # returned unchanged; each case is repeated ten times.
        cases = [
            ('0123456789', {}),
            ('X' * 4096, {}),
            ('X' * 4097, {}),
            ('X' * 4096 * 256, {'read_buffer_size': 4096 * 128}),
            ('X' * 4096 * 257, {'read_buffer_size': 4096 * 128}),
        ]
        for payload, extra in cases:
            for _ in range(10):
                resp = self.post_http10(body=payload, **extra)

                self.assertEqual(resp['status'], 200, 'status')
                self.assertEqual(resp['body'], payload, 'body')

        # Raise the body size limit before sending the 32 MiB payload.
        self.conf({'http': {'max_body_size': 32 * 1024 * 1024}}, 'settings')

        payload = '0123456789abcdef' * 32 * 64 * 1024
        resp = self.post_http10(body=payload, read_buffer_size=1024 * 1024)
        self.assertEqual(resp['status'], 200, 'status')
        self.assertEqual(resp['body'], payload, 'body')

    def test_proxy_parallel(self):
        # Start ten large requests before reading any response, then check
        # that each socket gets back its own (uniquely suffixed) body.
        payload = 'X' * 4096 * 257
        buff_size = 4096 * 258

        socks = []
        for i in range(10):
            _, sock = self.post_http10(
                body=payload + str(i),
                start=True,
                no_recv=True,
                read_buffer_size=buff_size,
            )
            socks.append(sock)

        for i, sock in enumerate(socks):
            resp = self.recvall(sock, buff_size=buff_size).decode()
            sock.close()

            resp = self._resp_to_dict(resp)

            self.assertEqual(resp['status'], 200, 'status')
            self.assertEqual(resp['body'], payload + str(i), 'body')

    def test_proxy_header(self):
        self.assertIn(
            'success',
            self.conf(
                {"pass": "applications/custom_header"}, 'listeners/*:7081'
            ),
            'custom_header configure',
        )

        # Values expected to come back unchanged in the response's
        # Custom-Header.  The backslash is written as '\\' because '\]' is
        # an invalid escape sequence; the string value is the same.
        echoed = [
            ('blah', 'custom header'),
            ('(),/:;<=>?@[\\]{}\t !#$%&\'*+-.^_`|~', 'custom header 2'),
            ('X' * 4096, 'custom header 3'),
            ('X' * 8191, 'custom header 4'),
        ]
        for header_value, msg in echoed:
            self.assertEqual(
                self.get_http10(
                    headers={'Host': 'localhost', 'Custom-Header': header_value}
                )['headers']['Custom-Header'],
                header_value,
                msg,
            )

        # One byte more than the last accepted value is rejected with 431.
        header_value = 'X' * 8192
        self.assertEqual(
            self.get_http10(
                headers={'Host': 'localhost', 'Custom-Header': header_value}
            )['status'],
            431,
            'custom header 5',
        )

    def test_proxy_fragmented(self):
        # Deliver the request head in three pieces, one second apart.
        _, sock = self.http(
            b"""GET / HTT""", raw=True, start=True, no_recv=True
        )

        time.sleep(1)

        sock.sendall(b"P/1.0\r\nHost: localhos")

        time.sleep(1)

        sock.sendall(b"t\r\n\r\n")

        self.assertRegex(
            self.recvall(sock).decode(), '200 OK', 'fragmented send'
        )
        sock.close()

    def test_proxy_fragmented_close(self):
        # Close the client socket while the request head is still incomplete.
        _, sock = self.http(
            b"""GET / HTT""", raw=True, start=True, no_recv=True
        )

        time.sleep(1)

        sock.sendall(b"P/1.0\r\nHo")

        sock.close()

    def test_proxy_fragmented_body(self):
        # Send a 30000-byte body in three 10000-byte pieces with pauses.
        _, sock = self.http(
            b"""GET / HTT""", raw=True, start=True, no_recv=True
        )

        time.sleep(1)

        sock.sendall(b"P/1.0\r\nHost: localhost\r\n")
        sock.sendall(b"Content-Length: 30000\r\n")

        time.sleep(1)

        sock.sendall(b"\r\n")
        sock.sendall(b"X" * 10000)

        time.sleep(1)

        sock.sendall(b"X" * 10000)

        time.sleep(1)

        sock.sendall(b"X" * 10000)

        resp = self._resp_to_dict(self.recvall(sock).decode())
        sock.close()

        self.assertEqual(resp['status'], 200, 'status')
        self.assertEqual(resp['body'], "X" * 30000, 'body')

    def test_proxy_fragmented_body_close(self):
        # Close the client socket after only a third of the declared body.
        _, sock = self.http(
            b"""GET / HTT""", raw=True, start=True, no_recv=True
        )

        time.sleep(1)

        sock.sendall(b"P/1.0\r\nHost: localhost\r\n")
        sock.sendall(b"Content-Length: 30000\r\n")

        time.sleep(1)

        sock.sendall(b"\r\n")
        sock.sendall(b"X" * 10000)

        sock.close()

    def test_proxy_nowhere(self):
        # Proxy to port 7082, for which setUp configures no listener.
        self.assertIn(
            'success',
            self.conf(
                [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes'
            ),
            'proxy path changed',
        )

        self.assertEqual(self.get_http10()['status'], 502, 'status')

    def test_proxy_ipv6(self):
        # Move the backend listener to [::1]:7081 and proxy to it.
        self.assertIn(
            'success',
            self.conf(
                {
                    "*:7080": {"pass": "routes"},
                    "[::1]:7081": {'application': 'mirror'},
                },
                'listeners',
            ),
            'add ipv6 listener configure',
        )

        self.assertIn(
            'success',
            self.conf([{"action": {"proxy": "http://[::1]:7081"}}], 'routes'),
            'proxy ipv6 configure',
        )

        self.assertEqual(self.get_http10()['status'], 200, 'status')

    def test_proxy_unix(self):
        # Move the backend listener to a unix socket under testdir.
        addr = self.testdir + '/sock'

        self.assertIn(
            'success',
            self.conf(
                {
                    "*:7080": {"pass": "routes"},
                    "unix:" + addr: {'application': 'mirror'},
                },
                'listeners',
            ),
            'add unix listener configure',
        )

        self.assertIn(
            'success',
            self.conf(
                [{"action": {"proxy": 'http://unix:' + addr}}], 'routes'
            ),
            'proxy unix configure',
        )

        self.assertEqual(self.get_http10()['status'], 200, 'status')

    def test_proxy_delayed(self):
        self.assertIn(
            'success',
            self.conf(
                {"pass": "applications/delayed"}, 'listeners/*:7081'
            ),
            'delayed configure',
        )

        body = '0123456789' * 1000

        # The same request is issued twice; the headers dict is rebuilt for
        # each call in case the HTTP helper modifies it.
        for _ in range(2):
            resp = self.post_http10(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Content-Length': str(len(body)),
                    'X-Parts': '2',
                    'X-Delay': '1',
                },
                body=body,
            )

            self.assertEqual(resp['status'], 200, 'status')
            self.assertEqual(resp['body'], body, 'body')

    def test_proxy_delayed_close(self):
        self.assertIn(
            'success',
            self.conf(
                {"pass": "applications/delayed"}, 'listeners/*:7081'
            ),
            'delayed configure',
        )

        # Twice in a row: read only the first 100 bytes of the response,
        # then close the client socket.
        for msg in ('first', 'second'):
            _, sock = self.post_http10(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Content-Length': '10000',
                    'X-Parts': '3',
                    'X-Delay': '1',
                },
                body='0123456789' * 1000,
                start=True,
                no_recv=True,
            )

            self.assertRegex(sock.recv(100).decode(), '200 OK', msg)
            sock.close()

    @unittest.skip('not yet')
    def test_proxy_content_length(self):
        # Proxy to the run_server() backend, which always declares
        # Content-Length: 10 but sends X-Len body bytes.
        self.assertIn(
            'success',
            self.conf(
                [
                    {
                        "action": {
                            "proxy": "http://127.0.0.1:"
                            + str(self.SERVER_PORT)
                        }
                    }
                ],
                'routes',
            ),
            'proxy backend configure',
        )

        resp = self.get_http10()
        self.assertEqual(len(resp['body']), 0, 'body lt Content-Length 0')

        # (X-Len sent, expected body length, assertion message)
        cases = [
            ('5', 5, 'body lt Content-Length 5'),
            ('9', 9, 'body lt Content-Length 9'),
            ('11', 10, 'body gt Content-Length 11'),
            ('15', 10, 'body gt Content-Length 15'),
        ]
        for x_len, expected, msg in cases:
            resp = self.get_http10(
                headers={'Host': 'localhost', 'X-Len': x_len}
            )
            self.assertEqual(len(resp['body']), expected, msg)

    def test_proxy_invalid(self):
        # Each of these proxy values must be rejected by the configuration.
        # (proxy value, assertion message)
        cases = [
            ('blah', 'proxy invalid'),
            ('/blah', 'proxy invalid 2'),
            ('unix:/blah', 'proxy unix invalid 2'),
            ('http://blah', 'proxy unix invalid 3'),
            ('http://127.0.0.1', 'proxy ipv4 invalid'),
            ('http://127.0.0.1:', 'proxy ipv4 invalid 2'),
            ('http://127.0.0.1:blah', 'proxy ipv4 invalid 3'),
            ('http://127.0.0.1:-1', 'proxy ipv4 invalid 4'),
            ('http://127.0.0.1:7080b', 'proxy ipv4 invalid 5'),
            ('http://[]', 'proxy ipv6 invalid'),
            ('http://[]:7080', 'proxy ipv6 invalid 2'),
            ('http://[:]:7080', 'proxy ipv6 invalid 3'),
            ('http://[::7080', 'proxy ipv6 invalid 4'),
        ]
        for proxy, msg in cases:
            self.assertIn(
                'error',
                self.conf([{"action": {"proxy": proxy}}], 'routes'),
                msg,
            )

    def test_proxy_loop(self):
        # The route proxies to *:7082, which passes back to the same route.
        # The patterns below are added to skip_alerts -- presumably alerts
        # the harness should tolerate for this test; confirm in the base
        # class.
        self.skip_alerts.extend(
            [
                r'socket.*failed',
                r'accept.*failed',
                r'new connections are not accepted',
            ]
        )
        self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "routes"},
                    "*:7081": {"pass": "applications/mirror"},
                    "*:7082": {"pass": "routes"},
                },
                "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
                "applications": {"mirror": self._python_app("mirror")},
            }
        )

        self.get_http10(no_recv=True)
        self.get_http10(read_timeout=1)


if __name__ == '__main__':
    TestProxy.main()