import socket
import ssl
import time

import pytest
from unit.applications.tls import TestApplicationTLS


class TestReconfigureTLS(TestApplicationTLS):
    """Tests of how TLS connections behave across a reconfiguration.

    Each test opens a connection to the TLS listener on ``*:7080``, changes
    the configuration while that connection is in a particular state, and
    then checks what happens to the connection.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self):
        """Install the certificate and a TLS listener before every test.

        The listener on ``*:7080`` uses the "default" certificate and passes
        to a single route that returns status 200.
        """
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default"},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def create_socket(self):
        """Connect a TLS client socket to 127.0.0.1:7080 without handshaking.

        Certificate verification and hostname checking are disabled.  The
        TLS handshake is deferred (``do_handshake_on_connect=False``) so that
        callers decide when, relative to a reconfiguration, it takes place.

        Returns:
            The connected ``ssl.SSLSocket``.
        """
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ssl_sock = ctx.wrap_socket(
            s, server_hostname='localhost', do_handshake_on_connect=False
        )
        ssl_sock.connect(('127.0.0.1', 7080))

        return ssl_sock

    def clear_conf(self):
        """Apply a configuration with no listeners and no applications."""
        assert 'success' in self.conf({"listeners": {}, "applications": {}})

    @pytest.mark.skip('not yet')
    def test_reconfigure_tls_switch(self):
        """Enable TLS on a listener while a plain keep-alive socket is open.

        The already-open plain connection is expected to keep working, and
        new TLS connections are expected to succeed.
        """
        assert 'success' in self.conf_delete('listeners/*:7080/tls')

        (_, sock) = self.get(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert 'success' in self.conf(
            {"pass": "routes", "tls": {"certificate": "default"}},
            'listeners/*:7080',
        )

        assert self.get(sock=sock)['status'] == 200, 'reconfigure'
        assert self.get_ssl()['status'] == 200, 'reconfigure tls'

    def test_reconfigure_tls(self):
        """Remove the listener in the middle of a request.

        The request line is sent before the configuration is cleared and the
        headers after it; the request is still expected to finish with 200.
        """
        ssl_sock = self.create_socket()

        ssl_sock.sendall("""GET / HTTP/1.1\r\n""".encode())

        self.clear_conf()

        ssl_sock.sendall(
            """Host: localhost\r\nConnection: close\r\n\r\n""".encode()
        )

        assert (
            self.recvall(ssl_sock).decode().startswith('HTTP/1.1 200 OK')
        ), 'finish request'

    def test_reconfigure_tls_2(self):
        """Remove the listener before the TLS handshake has been started.

        The subsequent handshake is expected to fail with ``ssl.SSLError``.
        """
        ssl_sock = self.create_socket()

        # Waiting for connection completion.
        # Delay should be more than TCP_DEFER_ACCEPT.
        time.sleep(1.5)

        self.clear_conf()

        try:
            ssl_sock.do_handshake()
        except ssl.SSLError:
            # Expected outcome: the handshake was refused.
            pass
        else:
            # A successful handshake must be reported as a test failure, not
            # surface as an unrelated error from an unset flag variable.
            pytest.fail('Connection is not closed.')
        finally:
            # Release the socket whatever the outcome was.
            ssl_sock.close()

    def test_reconfigure_tls_3(self):
        """Remove the listener after the TLS handshake has completed.

        A request made on the established connection afterwards is expected
        to be answered with 408.
        """
        ssl_sock = self.create_socket()
        ssl_sock.do_handshake()

        self.clear_conf()

        assert self.get(sock=ssl_sock)['status'] == 408, 'request timeout'