import os
import ssl
import subprocess

from unit.applications.proto import TestApplicationProto
from unit.option import option


class TestApplicationTLS(TestApplicationProto):
    """Test helpers for generating certificates and issuing TLS requests."""

    def setup_method(self):
        """Build the client SSL context with certificate checks disabled."""
        ctx = ssl.create_default_context()
        # check_hostname has to be switched off before verify_mode may be
        # lowered to CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.context = ctx

    def certificate(self, name='default', load=True):
        """Create '<name>.crt' and '<name>.key' in option.temp_dir.

        Makes sure the openssl config file exists, then runs
        ``openssl req -x509 -new`` with '/CN=<name>/' as the subject.
        When ``load`` is true the result is passed to certificate_load().
        """
        self.openssl_conf()

        tmp = option.temp_dir
        command = [
            'openssl',
            'req',
            '-x509',
            '-new',
            '-subj',
            f'/CN={name}/',
            '-config',
            f'{tmp}/openssl.conf',
            '-out',
            f'{tmp}/{name}.crt',
            '-keyout',
            f'{tmp}/{name}.key',
        ]
        # stderr is folded into the captured output.
        subprocess.check_output(command, stderr=subprocess.STDOUT)

        if load:
            self.certificate_load(name)

    def certificate_load(self, crt, key=None):
        """Send the key followed by the certificate to '/certificates/<crt>'.

        ``key`` names the '.key' file to read; it defaults to ``crt``.
        Returns whatever self.conf() returns.
        """
        key_name = crt if key is None else key

        key_file_path = f'{option.temp_dir}/{key_name}.key'
        crt_file_path = f'{option.temp_dir}/{crt}.crt'

        with open(key_file_path, 'rb') as key_file, open(
            crt_file_path, 'rb'
        ) as crt_file:
            bundle = key_file.read() + crt_file.read()
            return self.conf(bundle, f'/certificates/{crt}')

    def get_ssl(self, **kwargs):
        """Call self.get() with the socket wrapped by the SSL context."""
        wrap = self.context.wrap_socket
        return self.get(wrapper=wrap, **kwargs)

    def post_ssl(self, **kwargs):
        """Call self.post() with the socket wrapped by the SSL context."""
        wrap = self.context.wrap_socket
        return self.post(wrapper=wrap, **kwargs)

    def openssl_conf(self, rewrite=False, alt_names=None):
        """Write 'openssl.conf' into option.temp_dir.

        An existing file is left untouched unless ``rewrite`` is true.
        Each ``alt_names`` entry is either 'IP|<address>' or a DNS name;
        when any are given, a subjectAltName request extension section
        is included in the file.
        """
        alt_names = alt_names or []
        conf_path = f'{option.temp_dir}/openssl.conf'

        if not rewrite and os.path.exists(conf_path):
            return

        # Entries of the [alt_names] section, numbered from 1.
        entries = []
        for index, item in enumerate(alt_names, 1):
            parts = item.split('|')

            if parts[0] == 'IP':
                entries.append(f'IP.{index} = {parts[1]}\n')
            else:
                entries.append(f'DNS.{index} = {parts[0]}\n')

        names_section = '[alt_names]\n' + ''.join(entries)

        # Section for the sign request extension.
        ext_section = (
            'req_extensions = myca_req_extensions\n'
            '\n'
            '[ myca_req_extensions ]\n'
            'subjectAltName = @alt_names\n'
            '\n'
            f'{names_section}'
        )

        content = (
            '[ req ]\n'
            'default_bits = 2048\n'
            'encrypt_key = no\n'
            'distinguished_name = req_distinguished_name\n'
            '\n'
            f'{ext_section if alt_names else ""}\n'
            '[ req_distinguished_name ]'
        )

        with open(conf_path, 'w') as conf_file:
            conf_file.write(content)

    def load(self, script, name=None):
        """Load a config with one python application listening on *:7080.

        The application directory is '<option.test_dir>/python/<script>';
        the application is registered under ``name`` (default: ``script``).
        """
        app_name = script if name is None else name
        script_path = f'{option.test_dir}/python/{script}'

        application = {
            "type": "python",
            "processes": {"spare": 0},
            "path": script_path,
            "working_directory": script_path,
            "module": "wsgi",
        }
        listeners = {"*:7080": {"pass": f"applications/{app_name}"}}

        self._load_conf(
            {
                "listeners": listeners,
                "applications": {app_name: application},
            }
        )