import ssl
import subprocess

from unit.applications.tls import TestApplicationTLS
from unit.option import option


class TestTLSSNI(TestApplicationTLS):
    """Check which certificate bundle is presented for a requested host.

    Every test generates one or more certificate bundles signed by a local
    root CA, attaches them to the ``*:7080`` listener and then verifies the
    subject (or subjectAltName) of the certificate returned by the server.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    def setup_method(self):
        """Load the baseline config: a plain listener answering 200."""
        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        )

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date in the ``'%b %d %X %Y %Z'`` format to epoch seconds."""
        return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')

    def add_tls(self, cert='default'):
        """Enable TLS on the ``*:7080`` listener.

        ``cert`` is a certificate bundle name or a list of bundle names; the
        reconfiguration is asserted to succeed.
        """
        assert 'success' in self.conf(
            {"pass": "routes", "tls": {"certificate": cert}},
            'listeners/*:7080',
        )

    def remove_tls(self):
        """Reset the ``*:7080`` listener to a configuration without TLS."""
        assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080')

    def generate_ca_conf(self):
        """Write the files ``openssl ca`` needs into the temporary directory.

        Creates ``ca.conf`` (pointing at the temporary directory), a
        ``certserial`` file holding ``1000`` and an empty ``certindex``
        database file.
        """
        tmp = option.temp_dir

        with open(tmp + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': tmp,
                    'database': tmp + '/certindex',
                    'certserial': tmp + '/certserial',
                }
            )

        with open(tmp + '/certserial', 'w') as f:
            f.write('1000')

        with open(tmp + '/certindex', 'w') as f:
            f.write('')

    @staticmethod
    def _bundle_subject(bundle):
        """Return the openssl ``-subj`` value for a bundle description.

        A bundle without a ``'subj'`` key gets the empty subject ``'/'``.
        """
        if 'subj' in bundle:
            return '/CN={}/'.format(bundle['subj'])

        return '/'

    def _request_peercert(self, headers):
        """Send a TLS request and return the certificate the peer presented.

        The response status is asserted to be 200.  The socket returned by
        ``get_ssl`` is always closed before returning so that connections do
        not pile up across the many checks a single test performs.
        """
        resp, sock = self.get_ssl(headers=headers, start=True)

        try:
            assert resp['status'] == 200
            return sock.getpeercert()
        finally:
            sock.close()

    def config_bundles(self, bundles):
        """Generate, sign and upload the certificate bundles in ``bundles``.

        ``bundles`` maps a bundle name to a dict with an ``'alt_names'``
        entry (passed to ``openssl_conf``) and an optional ``'subj'`` entry
        used as the certificate common name.

        Also sets ``self.context`` to an SSL context that verifies peers
        against ``root.crt`` with host name checking turned off.
        """
        tmp = option.temp_dir

        # NOTE(review): presumably creates the root.key / root.crt pair that
        # is used for signing below - confirm against the base class.
        self.certificate('root', False)

        # Step 1: a key and a signing request per bundle.  openssl.conf is
        # rewritten on every iteration because alt_names differ per bundle.
        for name, bundle in bundles.items():
            self.openssl_conf(rewrite=True, alt_names=bundle['alt_names'])

            subprocess.check_output(
                [
                    'openssl',
                    'req',
                    '-new',
                    '-subj',
                    self._bundle_subject(bundle),
                    '-config',
                    tmp + '/openssl.conf',
                    '-out',
                    tmp + '/{}.csr'.format(name),
                    '-keyout',
                    tmp + '/{}.key'.format(name),
                ],
                stderr=subprocess.STDOUT,
            )

        self.generate_ca_conf()

        # Step 2: sign every request with the root key and certificate.
        for name, bundle in bundles.items():
            subprocess.check_output(
                [
                    'openssl',
                    'ca',
                    '-batch',
                    '-subj',
                    self._bundle_subject(bundle),
                    '-config',
                    tmp + '/ca.conf',
                    '-keyfile',
                    tmp + '/root.key',
                    '-cert',
                    tmp + '/root.crt',
                    '-in',
                    tmp + '/{}.csr'.format(name),
                    '-out',
                    tmp + '/{}.crt'.format(name),
                ],
                stderr=subprocess.STDOUT,
            )

        # The requested host rarely matches the certificate in these tests,
        # so only the chain is verified, not the host name.
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(tmp + '/root.crt')

        self.load_certs(bundles)

    def load_certs(self, bundles):
        """Upload every bundle in ``bundles`` under its own name."""
        for bname, bvalue in bundles.items():
            # 'subj' is optional, so fall back to the bundle name; otherwise
            # a failed upload would surface as a KeyError instead.
            assert 'success' in self.certificate_load(
                bname, bname
            ), 'certificate {} upload'.format(bvalue.get('subj', bname))

    def check_cert(self, host, expect):
        """Assert a request for ``host`` gets a certificate with CN ``expect``."""
        cert = self._request_peercert(
            {
                'Host': host,
                'Content-Length': '0',
                'Connection': 'close',
            }
        )

        assert cert['subject'][0][0][1] == expect

    def test_tls_sni(self):
        """Alternative names select their bundle; unknown hosts get the first."""
        bundles = {
            "default": {"subj": "default", "alt_names": ["default"]},
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com"],
            },
            "example.com": {
                "subj": "example.com",
                "alt_names": ["alt1.example.com", "alt2.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["default", "localhost.com", "example.com"])

        self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.example.com', bundles['example.com']['subj'])
        self.check_cert('blah', bundles['default']['subj'])

    def test_tls_sni_no_hostname(self):
        """A request without a Host header gets the first configured bundle."""
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        cert = self._request_peercert(
            {'Content-Length': '0', 'Connection': 'close'}
        )

        assert cert['subject'][0][0][1] == bundles['localhost.com']['subj']

    def test_tls_sni_upper_case(self):
        """Upper-case names in certificates and requests are still matched."""
        bundles = {
            "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'])
        self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'])
        self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'])
        self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'])

    def test_tls_sni_only_bundle(self):
        """With a single bundle configured, every host receives it."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com"])

        self.check_cert('domain.com', bundles['localhost.com']['subj'])
        self.check_cert('alt1.domain.com', bundles['localhost.com']['subj'])

    def test_tls_sni_wildcard(self):
        """Wildcard alternative names select the bundle for matching hosts."""
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["*.example.com", "*.alt.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        self.check_cert('example.com', bundles['localhost.com']['subj'])
        self.check_cert('www.example.com', bundles['example.com']['subj'])
        self.check_cert('alt.example.com', bundles['example.com']['subj'])
        self.check_cert('www.alt.example.com', bundles['example.com']['subj'])
        self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj'])

    def test_tls_sni_duplicated_bundle(self):
        """Listing the same bundle twice is accepted and works as usual."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "localhost.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj'])

    def test_tls_sni_same_alt(self):
        """Two bundles sharing an alternative name can be configured together."""
        # alt_names is a list here, as in every other test of this class.
        # NOTE(review): it used to be a bare string; openssl_conf presumably
        # iterates over alt_names, which would split a string into single
        # characters - confirm against the helper.
        bundles = {
            "localhost": {"subj": "subj1", "alt_names": ["same.altname.com"]},
            "example": {"subj": "subj2", "alt_names": ["same.altname.com"]},
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost", "example"])

        self.check_cert('localhost', bundles['localhost']['subj'])
        self.check_cert('example', bundles['localhost']['subj'])

    def test_tls_sni_empty_cn(self):
        """A bundle without a common name is still served; check its SAN."""
        bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}}
        self.config_bundles(bundles)
        self.add_tls(["localhost"])

        cert = self._request_peercert(
            {
                'Host': 'domain.com',
                'Content-Length': '0',
                'Connection': 'close',
            }
        )

        assert cert['subjectAltName'][0][1] == 'alt.localhost.com'

    def test_tls_sni_invalid(self):
        """Empty, unknown or malformed certificate values are rejected."""
        self.config_bundles({"localhost": {"subj": "subj1", "alt_names": []}})
        self.add_tls(["localhost"])

        invalid_values = (
            '',
            'blah',
            [],
            ['blah'],
            ['localhost', 'blah'],
            ['localhost', []],
        )

        for cert in invalid_values:
            assert 'error' in self.conf(
                {"pass": "routes", "tls": {"certificate": cert}},
                'listeners/*:7080',
            ), 'invalid certificate value {!r}'.format(cert)