import ssl
import subprocess

import pytest

from unit.applications.tls import TestApplicationTLS
from unit.option import option


class TestTLSSNI(TestApplicationTLS):
    """TLS tests checking which certificate bundle a listener presents.

    Each test generates one or more certificate bundles signed by a local
    root CA, attaches them to the ``*:7080`` listener, and then verifies
    the subject of the certificate returned for a given ``Host`` value.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    def setup_method(self):
        """Load a minimal configuration: one listener returning 200."""
        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        )

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date in the ``%b %d %H:%M:%S %Y %Z`` layout to epoch
        seconds by delegating to ``date_to_sec_epoch``."""
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, cert='default'):
        """Enable TLS on the ``*:7080`` listener.

        ``cert`` is passed through as the ``tls.certificate`` value; the
        tests in this class pass either a bundle name or a list of names.
        """
        assert 'success' in self.conf(
            {"pass": "routes", "tls": {"certificate": cert}},
            'listeners/*:7080',
        )

    def remove_tls(self):
        """Reset the ``*:7080`` listener to a configuration without TLS."""
        assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080')

    def generate_ca_conf(self):
        """Write the files ``openssl ca`` needs into ``option.temp_dir``.

        Creates ``ca.conf`` (the CA configuration), ``certserial`` (seeded
        with serial ``1000``) and an empty ``certindex`` database.
        """
        with open(option.temp_dir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': option.temp_dir,
                    'database': option.temp_dir + '/certindex',
                    'certserial': option.temp_dir + '/certserial',
                }
            )

        with open(option.temp_dir + '/certserial', 'w') as f:
            f.write('1000')

        with open(option.temp_dir + '/certindex', 'w') as f:
            f.write('')

    @staticmethod
    def _bundle_subject(bundle):
        """Return the ``-subj`` argument for a bundle description.

        A bundle without a ``subj`` key gets the empty subject ``/``.
        """
        if 'subj' in bundle:
            return '/CN={}/'.format(bundle['subj'])

        return '/'

    def config_bundles(self, bundles):
        """Generate, sign and upload a certificate for every bundle.

        ``bundles`` maps a bundle name to a dict with an ``alt_names`` entry
        and an optional ``subj`` entry (used as the certificate CN).

        Raises ``subprocess.CalledProcessError`` if an ``openssl`` command
        exits with a non-zero status, so a broken CSR or signing step is
        reported where it happens rather than as a later upload failure.
        """
        self.certificate('root', False)

        # Step 1: create a key and a signing request for each bundle.  The
        # OpenSSL config is rewritten per bundle because it carries that
        # bundle's alternative names.
        for name, bundle in bundles.items():
            self.openssl_conf(rewrite=True, alt_names=bundle['alt_names'])

            subprocess.check_call(
                [
                    'openssl',
                    'req',
                    '-new',
                    '-subj',
                    self._bundle_subject(bundle),
                    '-config',
                    option.temp_dir + '/openssl.conf',
                    '-out',
                    option.temp_dir + '/{}.csr'.format(name),
                    '-keyout',
                    option.temp_dir + '/{}.key'.format(name),
                ],
                stderr=subprocess.STDOUT,
            )

        self.generate_ca_conf()

        # Step 2: sign every request with the root key and certificate.
        for name, bundle in bundles.items():
            subprocess.check_call(
                [
                    'openssl',
                    'ca',
                    '-batch',
                    '-subj',
                    self._bundle_subject(bundle),
                    '-config',
                    option.temp_dir + '/ca.conf',
                    '-keyfile',
                    option.temp_dir + '/root.key',
                    '-cert',
                    option.temp_dir + '/root.crt',
                    '-in',
                    option.temp_dir + '/{}.csr'.format(name),
                    '-out',
                    option.temp_dir + '/{}.crt'.format(name),
                ],
                stderr=subprocess.STDOUT,
            )

        # Client-side context: the chain must verify against the generated
        # root, but host name matching is off because the tests request
        # names the returned certificate does not necessarily cover.
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(option.temp_dir + '/root.crt')

        self.load_certs(bundles)

    def load_certs(self, bundles):
        """Upload every generated bundle under its own name."""
        for bname, bvalue in bundles.items():
            # 'subj' is optional in a bundle description; fall back to the
            # bundle name so a failed upload reports the assertion message
            # instead of raising KeyError while building it.
            assert 'success' in self.certificate_load(
                bname, bname
            ), 'certificate {} upload'.format(bvalue.get('subj', bname))

    def check_cert(self, host, expect):
        """Request ``host`` over TLS and check the certificate served.

        Asserts a 200 response and that the first attribute value of the
        peer certificate's subject equals ``expect``.
        """
        resp, sock = self.get_ssl(
            headers={
                'Host': host,
                'Content-Length': '0',
                'Connection': 'close',
            },
            start=True,
        )

        assert resp['status'] == 200
        assert sock.getpeercert()['subject'][0][0][1] == expect

    def test_tls_sni(self):
        """Alternative names select their bundle; an unknown name gets the
        first configured bundle."""
        bundles = {
            "default": {"subj": "default", "alt_names": ["default"]},
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com"],
            },
            "example.com": {
                "subj": "example.com",
                "alt_names": ["alt1.example.com", "alt2.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["default", "localhost.com", "example.com"])

        self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.example.com', bundles['example.com']['subj'])
        self.check_cert('blah', bundles['default']['subj'])

    def test_tls_sni_no_hostname(self):
        """A request without a Host header gets the first bundle."""
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        resp, sock = self.get_ssl(
            headers={'Content-Length': '0', 'Connection': 'close'},
            start=True,
        )
        assert resp['status'] == 200
        assert (
            sock.getpeercert()['subject'][0][0][1]
            == bundles['localhost.com']['subj']
        )

    def test_tls_sni_upper_case(self):
        """Upper-case subjects, alternative names and requested hosts."""
        bundles = {
            "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'])
        self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'])
        self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'])
        self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'])

    def test_tls_sni_only_bundle(self):
        """With a single bundle configured, every host receives it."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com"])

        self.check_cert('domain.com', bundles['localhost.com']['subj'])
        self.check_cert('alt1.domain.com', bundles['localhost.com']['subj'])

    def test_tls_sni_wildcard(self):
        """Wildcard alternative names (``*.example.com``)."""
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["*.example.com", "*.alt.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        self.check_cert('example.com', bundles['localhost.com']['subj'])
        self.check_cert('www.example.com', bundles['example.com']['subj'])
        self.check_cert('alt.example.com', bundles['example.com']['subj'])
        self.check_cert('www.alt.example.com', bundles['example.com']['subj'])
        self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj'])

    def test_tls_sni_duplicated_bundle(self):
        """Listing the same bundle twice is accepted and still served."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "localhost.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj'])

    def test_tls_sni_same_alt(self):
        """Two bundles built from the same ``alt_names`` value."""
        # NOTE(review): alt_names is a plain string here, unlike the lists
        # used by the other tests -- confirm how openssl_conf treats it.
        bundles = {
            "localhost": {"subj": "subj1", "alt_names": "same.altname.com"},
            "example": {"subj": "subj2", "alt_names": "same.altname.com"},
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost", "example"])

        self.check_cert('localhost', bundles['localhost']['subj'])
        self.check_cert('example', bundles['localhost']['subj'])

    def test_tls_sni_empty_cn(self):
        """A bundle without a ``subj`` (empty subject) is still served."""
        bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}}
        self.config_bundles(bundles)
        self.add_tls(["localhost"])

        resp, sock = self.get_ssl(
            headers={
                'Host': 'domain.com',
                'Content-Length': '0',
                'Connection': 'close',
            },
            start=True,
        )

        assert resp['status'] == 200
        # There is no CN to compare, so identify the certificate by its
        # first subjectAltName entry instead.
        assert (
            sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com'
        )

    def test_tls_sni_invalid(self):
        """Invalid ``tls.certificate`` values are rejected with an error."""
        self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}})
        self.add_tls(["localhost"])

        def check_certificate(cert):
            assert 'error' in self.conf(
                {"pass": "routes", "tls": {"certificate": cert}},
                'listeners/*:7080',
            )

        check_certificate('')
        check_certificate('blah')
        check_certificate([])
        check_certificate(['blah'])
        check_certificate(['localhost', 'blah'])
        check_certificate(['localhost', []])