import os
import ssl
import subprocess

from unit.applications.proto import TestApplicationProto
from unit.option import option


class TestApplicationTLS(TestApplicationProto):
    """Test helper that adds TLS certificate and HTTPS request utilities.

    Certificates, keys and the OpenSSL configuration file are all created
    under ``option.temp_dir``.
    """

    def setup_method(self):
        """Create the client-side SSL context used by the ``*_ssl`` helpers.

        Hostname checking and certificate verification are both disabled on
        the context, so any server certificate is accepted.
        """
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_NONE

    def certificate(self, name='default', load=True):
        """Generate an X.509 certificate and key pair with ``openssl req``.

        The files are written to ``<temp_dir>/<name>.crt`` and
        ``<temp_dir>/<name>.key``; *name* is also used as the subject CN.

        :param name: base file name and common name of the certificate.
        :param load: when true, upload the generated pair via
            ``certificate_load(name)`` afterwards.
        """
        # Make sure the config file referenced by '-config' exists.
        self.openssl_conf()

        # NOTE: the exit status of openssl is not checked here; a failed
        # generation only surfaces later when the files are opened.
        subprocess.call(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=' + name + '/',
                '-config',
                option.temp_dir + '/openssl.conf',
                '-out',
                option.temp_dir + '/' + name + '.crt',
                '-keyout',
                option.temp_dir + '/' + name + '.key',
            ],
            stderr=subprocess.STDOUT,
        )

        if load:
            self.certificate_load(name)

    def certificate_load(self, crt, key=None):
        """Upload a key/certificate bundle under ``/certificates/<crt>``.

        The key file contents followed by the certificate file contents are
        passed as a single bytes payload to ``self.conf``.

        :param crt: base name of the ``.crt`` file; also the name the bundle
            is stored under.
        :param key: base name of the ``.key`` file; defaults to *crt*.
        :return: whatever ``self.conf`` returns.
        """
        if key is None:
            key = crt

        key_path = option.temp_dir + '/' + key + '.key'
        crt_path = option.temp_dir + '/' + crt + '.crt'

        with open(key_path, 'rb') as k, open(crt_path, 'rb') as c:
            return self.conf(k.read() + c.read(), '/certificates/' + crt)

    def get_ssl(self, **kwargs):
        """Call ``self.get`` with the socket wrapped by the SSL context."""
        return self.get(wrapper=self.context.wrap_socket, **kwargs)

    def post_ssl(self, **kwargs):
        """Call ``self.post`` with the socket wrapped by the SSL context."""
        return self.post(wrapper=self.context.wrap_socket, **kwargs)

    def get_server_certificate(self, addr=('127.0.0.1', 7080)):
        """Fetch the certificate presented by the server at *addr*.

        The protocol constant is chosen from what the running ``ssl`` module
        provides, preferring ``PROTOCOL_TLS``, then ``PROTOCOL_TLSv1_2``,
        and falling back to ``PROTOCOL_TLSv1_1``.

        :param addr: ``(host, port)`` pair to connect to.
        :return: the result of ``ssl.get_server_certificate``.
        """
        if hasattr(ssl, 'PROTOCOL_TLS'):
            ssl_version = ssl.PROTOCOL_TLS

        elif hasattr(ssl, 'PROTOCOL_TLSv1_2'):
            ssl_version = ssl.PROTOCOL_TLSv1_2

        else:
            ssl_version = ssl.PROTOCOL_TLSv1_1

        return ssl.get_server_certificate(addr, ssl_version=ssl_version)

    def openssl_conf(self, rewrite=False, alt_names=None):
        """Write ``<temp_dir>/openssl.conf`` used by ``certificate``.

        An existing file is left untouched unless *rewrite* is true.

        :param rewrite: overwrite the file even if it already exists.
        :param alt_names: optional iterable of subject alternative names.
            An entry of the form ``'IP|<address>'`` produces an ``IP.<n>``
            line; any other entry produces a ``DNS.<n>`` line from the text
            before the first ``'|'``. ``<n>`` is the 1-based position of the
            entry. When empty or omitted, no extension section is written.
        """
        # None instead of a mutable [] default; both mean "no alt names".
        if alt_names is None:
            alt_names = []

        conf_path = option.temp_dir + '/openssl.conf'

        if not rewrite and os.path.exists(conf_path):
            return

        a_sec = ""
        if alt_names:
            # Generates alt_names section with dns names
            entries = []
            for i, entry in enumerate(alt_names, 1):
                parts = entry.split('|')

                if parts[0] == 'IP':
                    entries.append("IP.%d = %s\n" % (i, parts[1]))
                else:
                    entries.append("DNS.%d = %s\n" % (i, parts[0]))

            a_names = "[alt_names]\n" + "".join(entries)

            # Generates section for sign request extension
            a_sec = "\n".join(
                [
                    "req_extensions = myca_req_extensions",
                    "",
                    "[ myca_req_extensions ]",
                    "subjectAltName = @alt_names",
                    "",
                    a_names,
                ]
            )

        conf_text = "\n".join(
            [
                "[ req ]",
                "default_bits = 2048",
                "encrypt_key = no",
                "distinguished_name = req_distinguished_name",
                "",
                a_sec,
                "[ req_distinguished_name ]",
            ]
        )

        with open(conf_path, 'w') as f:
            f.write(conf_text)

    def load(self, script, name=None):
        """Load a configuration with one Python application on ``*:7080``.

        The application lives in ``<test_dir>/python/<script>``, which is
        used as both its ``path`` and ``working_directory``, and it uses the
        ``wsgi`` module.

        :param script: directory name of the application under
            ``<test_dir>/python/``.
        :param name: application name in the configuration; defaults to
            *script*.
        """
        if name is None:
            name = script

        script_path = option.test_dir + '/python/' + script

        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "applications/" + name}},
                "applications": {
                    name: {
                        "type": "python",
                        "processes": {"spare": 0},
                        "path": script_path,
                        "working_directory": script_path,
                        "module": "wsgi",
                    }
                },
            }
        )