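import socket
import ssl
import time

import pytest

from unit.applications.tls import ApplicationTLS
from unit.option import option

prerequisites = {'modules': {'openssl': 'any'}}

client = ApplicationTLS()


@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Skip on an OpenSSL without TLSv1.2, then load a TLS listener on *:8080.

    Runs before every test in this module (autouse).  The listener passes
    to a single route that returns 200 and uses the certificate named
    "default".
    """
    # getattr covers both "attribute missing" and "attribute is False".
    if not getattr(ssl, 'HAS_TLSv1_2', False):
        pytest.skip('OpenSSL too old')

    # NOTE(review): presumably creates and uploads the "default" certificate
    # referenced below -- confirm against ApplicationTLS.certificate().
    client.certificate()

    assert 'success' in client.conf(
        {
            "listeners": {
                "*:8080": {
                    "pass": "routes",
                    "tls": {"certificate": "default"},
                }
            },
            "routes": [{"action": {"return": 200}}],
            "applications": {},
        }
    ), 'load application configuration'


def create_socket():
    """Return a TLS client socket connected to 127.0.0.1:8080.

    The handshake is NOT performed here (do_handshake_on_connect=False), so
    callers decide when, or whether, to complete it.  The caller owns the
    returned socket and must close it.

    Raises:
        OSError: if the TCP connect fails; the socket is closed first.
    """
    # Certificate and hostname verification are switched off on purpose: the
    # peer is the test listener on loopback.  This context authenticates
    # nothing and must not be reused for connections to real endpoints.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Order matters: check_hostname has to be cleared before verify_mode can
    # be set to CERT_NONE, otherwise the ssl module raises ValueError.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    ssl_sock = ctx.wrap_socket(
        s, server_hostname='localhost', do_handshake_on_connect=False
    )

    try:
        ssl_sock.connect(('127.0.0.1', 8080))
    except OSError:
        # Do not leak the descriptor when the listener is unreachable.
        ssl_sock.close()
        raise

    return ssl_sock


def clear_conf():
    """Replace the configuration with empty listeners and applications."""
    assert 'success' in client.conf({"listeners": {}, "applications": {}})


@pytest.mark.skip('not yet')
def test_reconfigure_tls_switch():
    """Turn TLS back on for a listener while a plain keep-alive socket is open.

    Currently skipped.  Removes the "tls" object from the listener, opens a
    keep-alive connection, restores the TLS listener configuration, and then
    expects 200 both on the already open socket and on a new TLS request.
    """
    assert 'success' in client.conf_delete('listeners/*:8080/tls')

    (_, sock) = client.get(
        headers={'Host': 'localhost', 'Connection': 'keep-alive'},
        start=True,
        read_timeout=1,
    )

    assert 'success' in client.conf(
        {"pass": "routes", "tls": {"certificate": "default"}},
        'listeners/*:8080',
    )

    assert client.get(sock=sock)['status'] == 200, 'reconfigure'
    assert client.get_ssl()['status'] == 200, 'reconfigure tls'


def test_reconfigure_tls():
    """Remove the listener in the middle of a request sent over TLS.

    Only the request line is sent before the configuration is cleared; the
    headers follow afterwards and the response must still be 200 OK.
    """
    if option.configure_flag['asan']:
        pytest.skip('not yet, router crash')

    ssl_sock = create_socket()

    try:
        ssl_sock.sendall("""GET / HTTP/1.1\r\n""".encode())

        clear_conf()

        ssl_sock.sendall(
            """Host: localhost\r\nConnection: close\r\n\r\n""".encode()
        )

        assert (
            client.recvall(ssl_sock).decode().startswith('HTTP/1.1 200 OK')
        ), 'finish request'
    finally:
        # Closing an already closed socket is a no-op, so this is safe even
        # if the helper closed it.
        ssl_sock.close()


def test_reconfigure_tls_2():
    """Remove the listener before the TLS handshake has been started.

    The TCP connection is established but no handshake is attempted until
    the configuration has been cleared; the handshake must then fail with
    ssl.SSLError.
    """
    ssl_sock = create_socket()

    try:
        # Waiting for connection completion.
        # Delay should be more than TCP_DEFER_ACCEPT.
        time.sleep(1.5)

        clear_conf()

        try:
            ssl_sock.do_handshake()
        except ssl.SSLError:
            # Expected outcome: the handshake is refused.
            pass
        else:
            pytest.fail('Connection is not closed.')
    finally:
        # Release the socket on every path, including the failure path.
        ssl_sock.close()


def test_reconfigure_tls_3():
    """Remove the listener after the TLS handshake has completed.

    A request issued on the established connection afterwards is expected to
    report status 408.
    """
    if option.configure_flag['asan']:
        pytest.skip('not yet, router crash')

    ssl_sock = create_socket()

    try:
        ssl_sock.do_handshake()

        clear_conf()

        assert client.get(sock=ssl_sock)['status'] == 408, 'request timeout'
    finally:
        # Closing an already closed socket is a no-op, so this is safe even
        # if client.get() closed it.
        ssl_sock.close()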