import ssl

import pytest
from unit.applications.tls import TestApplicationTLS


class TestTLSConfCommand(TestApplicationTLS):
    """Tests for the "conf_commands" option of a listener's "tls" object."""

    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Create the default certificate and load a TLS listener config.

        Runs automatically before every test in this class.  The listener
        on *:7080 uses the "default" certificate and passes to a single
        route that returns 200.
        """
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default"},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def test_tls_conf_command(self):
        """Disable the negotiated protocol, then the negotiated cipher.

        Step 1 disables the protocol negotiated by an initial connection
        and checks that a new connection uses another protocol (or cannot
        be established when no other protocol is shared).  Step 2
        additionally excludes the newly negotiated cipher and checks that
        another cipher of the same protocol is selected (or that the
        connection cannot be established when none is left).
        """

        def check_no_connection():
            # Only the connection attempt is guarded; pytest.fail() lives in
            # the else clause so the try body stays minimal.
            try:
                self.get_ssl()
            except (ssl.SSLError, ConnectionRefusedError):
                pass
            else:
                pytest.fail('Unexpected connection.')

        # Set one conf_commands (disable protocol).

        (_, sock) = self.get_ssl(start=True)

        # Entries of shared_ciphers() and the result of cipher() are indexed
        # as [0] cipher name and [1] protocol (see the ssl module docs).
        shared_ciphers = sock.shared_ciphers()
        protocols = {c[1] for c in shared_ciphers}
        protocol = sock.cipher()[1]

        if '/' in protocol:
            # Do not leak the open socket when skipping.
            sock.close()
            pytest.skip('Complex protocol format.')

        assert 'success' in self.conf(
            {
                "certificate": "default",
                "conf_commands": {"protocol": '-' + protocol},
            },
            'listeners/*:7080/tls',
        ), 'protocol disabled'

        sock.close()

        if len(protocols) > 1:
            (_, sock) = self.get_ssl(start=True)

            cipher = sock.cipher()
            assert cipher[1] != protocol, 'new protocol used'

            # Ciphers shared for the protocol that is negotiated now.
            shared_ciphers = sock.shared_ciphers()
            ciphers = {c for c in shared_ciphers if c[1] == cipher[1]}

            sock.close()
        else:
            check_no_connection()
            pytest.skip('One TLS protocol available only.')

        # Set two conf_commands (disable protocol and cipher).

        assert 'success' in self.conf(
            {
                "certificate": "default",
                "conf_commands": {
                    "protocol": '-' + protocol,
                    # "<protocol>:!<cipher name>" built from the current
                    # connection, intended to exclude the current cipher.
                    "cipherstring": cipher[1] + ":!" + cipher[0],
                },
            },
            'listeners/*:7080/tls',
        ), 'cipher disabled'

        if len(ciphers) > 1:
            (_, sock) = self.get_ssl(start=True)

            cipher_new = sock.cipher()
            assert cipher_new[1] == cipher[1], 'previous protocol used'
            assert cipher_new[0] != cipher[0], 'new cipher used'

            sock.close()

        else:
            check_no_connection()

    def test_tls_conf_command_invalid(self, skip_alert):
        """Check that malformed "conf_commands" values are rejected."""
        skip_alert(r'SSL_CONF_cmd', r'failed to apply new conf')

        def check_conf_commands(conf_commands):
            # Every value passed here must make the config update fail.
            assert 'error' in self.conf(
                {"certificate": "default", "conf_commands": conf_commands},
                'listeners/*:7080/tls',
            ), 'invalid conf_commands'

        check_conf_commands([])
        check_conf_commands("blah")
        check_conf_commands({"": ""})
        check_conf_commands({"blah": ""})
        check_conf_commands({"protocol": {}})
        check_conf_commands({"protocol": "blah"})
        check_conf_commands({"protocol": "TLSv1.2", "blah": ""})