1import ssl 2import subprocess 3 4from unit.applications.tls import TestApplicationTLS 5from unit.option import option 6 7 8class TestTLSSNI(TestApplicationTLS): 9 prerequisites = {'modules': {'openssl': 'any'}} 10 11 def setup_method(self): 12 self._load_conf( 13 { 14 "listeners": {"*:7080": {"pass": "routes"}}, 15 "routes": [{"action": {"return": 200}}], 16 "applications": {}, 17 } 18 ) 19 20 def openssl_date_to_sec_epoch(self, date): 21 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 22 23 def add_tls(self, cert='default'): 24 assert 'success' in self.conf( 25 {"pass": "routes", "tls": {"certificate": cert}}, 26 'listeners/*:7080', 27 ) 28 29 def remove_tls(self): 30 assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080') 31 32 def generate_ca_conf(self): 33 with open(option.temp_dir + '/ca.conf', 'w') as f: 34 f.write( 35 """[ ca ] 36default_ca = myca 37 38[ myca ] 39new_certs_dir = %(dir)s 40database = %(database)s 41default_md = sha256 42policy = myca_policy 43serial = %(certserial)s 44default_days = 1 45x509_extensions = myca_extensions 46copy_extensions = copy 47 48[ myca_policy ] 49commonName = optional 50 51[ myca_extensions ] 52basicConstraints = critical,CA:TRUE""" 53 % { 54 'dir': option.temp_dir, 55 'database': option.temp_dir + '/certindex', 56 'certserial': option.temp_dir + '/certserial', 57 } 58 ) 59 60 with open(option.temp_dir + '/certserial', 'w') as f: 61 f.write('1000') 62 63 with open(option.temp_dir + '/certindex', 'w') as f: 64 f.write('') 65 66 def config_bundles(self, bundles): 67 self.certificate('root', False) 68 69 for b in bundles: 70 self.openssl_conf(rewrite=True, alt_names=bundles[b]['alt_names']) 71 subj = ( 72 '/CN={}/'.format(bundles[b]['subj']) 73 if 'subj' in bundles[b] 74 else '/' 75 ) 76 77 subprocess.call( 78 [ 79 'openssl', 80 'req', 81 '-new', 82 '-subj', 83 subj, 84 '-config', 85 option.temp_dir + '/openssl.conf', 86 '-out', 87 option.temp_dir + '/{}.csr'.format(b), 88 '-keyout', 89 option.temp_dir + '/{}.key'.format(b), 90 ], 91 stderr=subprocess.STDOUT, 92 ) 93 94 self.generate_ca_conf() 95 96 for b in bundles: 97 subj = ( 98 '/CN={}/'.format(bundles[b]['subj']) 99 if 'subj' in bundles[b] 100 else '/' 101 ) 102 103 subprocess.call( 104 [ 105 'openssl', 106 'ca', 107 '-batch', 108 '-subj', 109 subj, 110 '-config', 111 option.temp_dir + '/ca.conf', 112 '-keyfile', 113 option.temp_dir + '/root.key', 114 '-cert', 115 option.temp_dir + '/root.crt', 116 '-in', 117 option.temp_dir + '/{}.csr'.format(b), 118 '-out', 119 option.temp_dir + '/{}.crt'.format(b), 120 ], 121 stderr=subprocess.STDOUT, 122 ) 123 124 self.context = ssl.create_default_context() 125 self.context.check_hostname = False 126 self.context.verify_mode = ssl.CERT_REQUIRED 127 self.context.load_verify_locations(option.temp_dir + '/root.crt') 128 129 self.load_certs(bundles) 130 131 def load_certs(self, bundles): 132 for bname, bvalue in bundles.items(): 133 assert 'success' in self.certificate_load( 134 bname, bname 135 ), 'certificate {} upload'.format(bvalue['subj']) 136 137 def check_cert(self, host, expect): 138 resp, sock = self.get_ssl( 139 headers={ 140 'Host': host, 141 'Content-Length': '0', 142 'Connection': 'close', 143 }, 144 start=True, 145 ) 146 147 assert resp['status'] == 200 148 assert sock.getpeercert()['subject'][0][0][1] == expect 149 150 def test_tls_sni(self): 151 bundles = { 152 "default": {"subj": "default", "alt_names": ["default"]}, 153 "localhost.com": { 154 "subj": "localhost.com", 155 "alt_names": ["alt1.localhost.com"], 156 }, 157 "example.com": { 158 "subj": "example.com", 159 "alt_names": ["alt1.example.com", "alt2.example.com"], 160 }, 161 } 162 self.config_bundles(bundles) 163 self.add_tls(["default", "localhost.com", "example.com"]) 164 165 self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj']) 166 self.check_cert('alt2.example.com', bundles['example.com']['subj']) 167 self.check_cert('blah', bundles['default']['subj']) 168 169 def test_tls_sni_no_hostname(self): 170 bundles = { 171 "localhost.com": {"subj": "localhost.com", "alt_names": []}, 172 "example.com": { 173 "subj": "example.com", 174 "alt_names": ["example.com"], 175 }, 176 } 177 self.config_bundles(bundles) 178 self.add_tls(["localhost.com", "example.com"]) 179 180 resp, sock = self.get_ssl( 181 headers={'Content-Length': '0', 'Connection': 'close'}, start=True, 182 ) 183 assert resp['status'] == 200 184 assert ( 185 sock.getpeercert()['subject'][0][0][1] 186 == bundles['localhost.com']['subj'] 187 ) 188 189 def test_tls_sni_upper_case(self): 190 bundles = { 191 "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []}, 192 "example.com": { 193 "subj": "example.com", 194 "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"], 195 }, 196 } 197 self.config_bundles(bundles) 198 self.add_tls(["localhost.com", "example.com"]) 199 200 self.check_cert('localhost.com', bundles['localhost.com']['subj']) 201 self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj']) 202 self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj']) 203 self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj']) 204 self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj']) 205 206 def test_tls_sni_only_bundle(self): 207 bundles = { 208 "localhost.com": { 209 "subj": "localhost.com", 210 "alt_names": ["alt1.localhost.com", "alt2.localhost.com"], 211 } 212 } 213 self.config_bundles(bundles) 214 self.add_tls(["localhost.com"]) 215 216 self.check_cert('domain.com', bundles['localhost.com']['subj']) 217 self.check_cert('alt1.domain.com', bundles['localhost.com']['subj']) 218 219 def test_tls_sni_wildcard(self): 220 bundles = { 221 "localhost.com": {"subj": "localhost.com", "alt_names": []}, 222 "example.com": { 223 "subj": "example.com", 224 "alt_names": ["*.example.com", "*.alt.example.com"], 225 }, 226 } 227 self.config_bundles(bundles) 228 self.add_tls(["localhost.com", "example.com"]) 229 230 self.check_cert('example.com', bundles['localhost.com']['subj']) 231 self.check_cert('www.example.com', bundles['example.com']['subj']) 232 self.check_cert('alt.example.com', bundles['example.com']['subj']) 233 self.check_cert('www.alt.example.com', bundles['example.com']['subj']) 234 self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj']) 235 236 def test_tls_sni_duplicated_bundle(self): 237 bundles = { 238 "localhost.com": { 239 "subj": "localhost.com", 240 "alt_names": ["localhost.com", "alt2.localhost.com"], 241 } 242 } 243 self.config_bundles(bundles) 244 self.add_tls(["localhost.com", "localhost.com"]) 245 246 self.check_cert('localhost.com', bundles['localhost.com']['subj']) 247 self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj']) 248 249 def test_tls_sni_same_alt(self): 250 bundles = { 251 "localhost": {"subj": "subj1", "alt_names": "same.altname.com"}, 252 "example": {"subj": "subj2", "alt_names": "same.altname.com"}, 253 } 254 self.config_bundles(bundles) 255 self.add_tls(["localhost", "example"]) 256 257 self.check_cert('localhost', bundles['localhost']['subj']) 258 self.check_cert('example', bundles['localhost']['subj']) 259 260 def test_tls_sni_empty_cn(self): 261 bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}} 262 self.config_bundles(bundles) 263 self.add_tls(["localhost"]) 264 265 resp, sock = self.get_ssl( 266 headers={ 267 'Host': 'domain.com', 268 'Content-Length': '0', 269 'Connection': 'close', 270 }, 271 start=True, 272 ) 273 274 assert resp['status'] == 200 275 assert ( 276 sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com' 277 ) 278 279 def test_tls_sni_invalid(self): 280 self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}}) 281 self.add_tls(["localhost"]) 282 283 def check_certificate(cert): 284 assert 'error' in self.conf( 285 {"pass": "routes", "tls": {"certificate": cert}}, 286 'listeners/*:7080', 287 ) 288 289 check_certificate('') 290 check_certificate('blah') 291 check_certificate([]) 292 check_certificate(['blah']) 293 check_certificate(['localhost', 'blah']) 294 check_certificate(['localhost', []]) 295