import ssl
import subprocess

import pytest

from unit.applications.tls import ApplicationTLS
from unit.option import option

prerequisites = {'modules': {'openssl': 'any'}}

client = ApplicationTLS()


@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Reset the configuration before every test.

    Installs a plain (non-TLS) listener on ``*:8080`` that passes to a
    single route returning status 200, with no applications configured.
    """
    assert 'success' in client.conf(
        {
            "listeners": {"*:8080": {"pass": "routes"}},
            "routes": [{"action": {"return": 200}}],
            "applications": {},
        }
    )


def add_tls(cert='default'):
    """Enable TLS on the ``*:8080`` listener using *cert*.

    *cert* is passed through unchanged as the ``tls.certificate`` value;
    the tests in this module pass lists of bundle names.
    """
    assert 'success' in client.conf(
        {"pass": "routes", "tls": {"certificate": cert}},
        'listeners/*:8080',
    )


def check_cert(host, expect, ctx):
    """Request over TLS with ``Host: host`` and verify the served certificate.

    Asserts that the response status is 200 and that the first attribute
    value of the peer certificate's subject equals *expect*.  For the
    certificates generated by ``config_bundles`` the subject is built from
    a single ``/CN=.../`` component, so this is expected to be the CN.
    """
    resp, sock = client.get_ssl(
        headers={
            'Host': host,
            'Content-Length': '0',
            'Connection': 'close',
        },
        start=True,
        context=ctx,
    )

    assert resp['status'] == 200
    assert sock.getpeercert()['subject'][0][0][1] == expect


def _subject(bundle):
    """Return the openssl ``-subj`` value for *bundle*.

    Bundles without a ``subj`` key get the empty subject ``/``.
    """
    return f'/CN={bundle["subj"]}/' if 'subj' in bundle else '/'


def config_bundles(bundles):
    """Generate, sign and upload one certificate per entry of *bundles*.

    *bundles* maps a bundle name to a dict with an ``alt_names`` entry and
    an optional ``subj`` entry (used as the certificate CN).  For each
    bundle a CSR and key are created with ``openssl req``, then the CSR is
    signed with ``openssl ca`` using ``root.key``/``root.crt`` from
    ``option.temp_dir``, and finally the bundles are uploaded through
    ``load_certs``.

    Returns an ``ssl`` context that requires a certificate verifiable
    against ``root.crt`` but does not check the host name.
    """
    # presumably creates root.key / root.crt in option.temp_dir, which the
    # signing step below reads — confirm against ApplicationTLS.certificate
    client.certificate('root', False)

    for name, bundle in bundles.items():
        # The openssl config is rewritten per bundle so that each CSR
        # carries its own alt names.
        client.openssl_conf(rewrite=True, alt_names=bundle['alt_names'])

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-new',
                '-subj',
                _subject(bundle),
                '-config',
                f'{option.temp_dir}/openssl.conf',
                '-out',
                f'{option.temp_dir}/{name}.csr',
                '-keyout',
                f'{option.temp_dir}/{name}.key',
            ],
            stderr=subprocess.STDOUT,
        )

    generate_ca_conf()

    for name, bundle in bundles.items():
        subprocess.check_output(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj',
                _subject(bundle),
                '-config',
                f'{option.temp_dir}/ca.conf',
                '-keyfile',
                f'{option.temp_dir}/root.key',
                '-cert',
                f'{option.temp_dir}/root.crt',
                '-in',
                f'{option.temp_dir}/{name}.csr',
                '-out',
                f'{option.temp_dir}/{name}.crt',
            ],
            stderr=subprocess.STDOUT,
        )

    load_certs(bundles)

    context = ssl.create_default_context()
    # Host name checking is disabled: the tests deliberately request names
    # that do not match the certificate that ends up being served.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(f'{option.temp_dir}/root.crt')

    return context


def generate_ca_conf():
    """Write the files ``openssl ca`` needs into ``option.temp_dir``.

    Creates ``ca.conf`` (CA section ``myca``), the serial file
    ``certserial`` initialised to ``1000`` and an empty ``certindex``
    database.
    """
    with open(f'{option.temp_dir}/ca.conf', 'w', encoding='utf-8') as f:
        f.write(
            f"""[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = {option.temp_dir}
database = {option.temp_dir}/certindex
default_md = sha256
policy = myca_policy
serial = {option.temp_dir}/certserial
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
        )

    with open(f'{option.temp_dir}/certserial', 'w', encoding='utf-8') as f:
        f.write('1000')

    with open(f'{option.temp_dir}/certindex', 'w', encoding='utf-8') as f:
        f.write('')


def load_certs(bundles):
    """Upload every bundle in *bundles* via ``client.certificate_load``.

    The bundle name is passed as both arguments of ``certificate_load``.
    """
    for bname, bvalue in bundles.items():
        # Bundles may omit "subj" (see test_tls_sni_empty_cn); fall back to
        # the bundle name so that a failed upload reports an assertion
        # error instead of a KeyError raised while building the message.
        label = bvalue.get("subj", bname)
        assert 'success' in client.certificate_load(
            bname, bname
        ), f'certificate {label} upload'


def remove_tls():
    """Switch the ``*:8080`` listener back to plain (non-TLS) mode."""
    assert 'success' in client.conf({"pass": "routes"}, 'listeners/*:8080')


def test_tls_sni():
    """Alt names select their bundle; an unknown host gets the first one."""
    bundles = {
        "default": {"subj": "default", "alt_names": ["default"]},
        "localhost.com": {
            "subj": "localhost.com",
            "alt_names": ["alt1.localhost.com"],
        },
        "example.com": {
            "subj": "example.com",
            "alt_names": ["alt1.example.com", "alt2.example.com"],
        },
    }
    ctx = config_bundles(bundles)
    add_tls(["default", "localhost.com", "example.com"])

    check_cert('alt1.localhost.com', bundles['localhost.com']['subj'], ctx)
    check_cert('alt2.example.com', bundles['example.com']['subj'], ctx)
    check_cert('blah', bundles['default']['subj'], ctx)


def test_tls_sni_no_hostname():
    """A request without a Host header is served the first bundle."""
    bundles = {
        "localhost.com": {"subj": "localhost.com", "alt_names": []},
        "example.com": {
            "subj": "example.com",
            "alt_names": ["example.com"],
        },
    }
    ctx = config_bundles(bundles)
    add_tls(["localhost.com", "example.com"])

    resp, sock = client.get_ssl(
        headers={'Content-Length': '0', 'Connection': 'close'},
        start=True,
        context=ctx,
    )
    assert resp['status'] == 200
    assert (
        sock.getpeercert()['subject'][0][0][1]
        == bundles['localhost.com']['subj']
    )


def test_tls_sni_upper_case():
    """Upper-case subjects, alt names and host names are handled."""
    bundles = {
        "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
        "example.com": {
            "subj": "example.com",
            "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
        },
    }
    ctx = config_bundles(bundles)
    add_tls(["localhost.com", "example.com"])

    check_cert('localhost.com', bundles['localhost.com']['subj'], ctx)
    check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'], ctx)
    check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'], ctx)
    check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'], ctx)
    check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'], ctx)


def test_tls_sni_only_bundle():
    """With a single bundle configured, every host is served that bundle."""
    bundles = {
        "localhost.com": {
            "subj": "localhost.com",
            "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
        }
    }
    ctx = config_bundles(bundles)
    add_tls(["localhost.com"])

    check_cert('domain.com', bundles['localhost.com']['subj'], ctx)
    check_cert('alt1.domain.com', bundles['localhost.com']['subj'], ctx)


def test_tls_sni_wildcard():
    """Wildcard alt names select their bundle; other hosts get the first."""
    bundles = {
        "localhost.com": {"subj": "localhost.com", "alt_names": []},
        "example.com": {
            "subj": "example.com",
            "alt_names": ["*.example.com", "*.alt.example.com"],
        },
    }
    ctx = config_bundles(bundles)
    add_tls(["localhost.com", "example.com"])

    check_cert('example.com', bundles['localhost.com']['subj'], ctx)
    check_cert('www.example.com', bundles['example.com']['subj'], ctx)
    check_cert('alt.example.com', bundles['example.com']['subj'], ctx)
    check_cert('www.alt.example.com', bundles['example.com']['subj'], ctx)
    check_cert('www.alt.example.ru', bundles['localhost.com']['subj'], ctx)


def test_tls_sni_duplicated_bundle():
    """Listing the same bundle twice is accepted and still served."""
    bundles = {
        "localhost.com": {
            "subj": "localhost.com",
            "alt_names": ["localhost.com", "alt2.localhost.com"],
        }
    }
    ctx = config_bundles(bundles)
    add_tls(["localhost.com", "localhost.com"])

    check_cert('localhost.com', bundles['localhost.com']['subj'], ctx)
    check_cert('alt2.localhost.com', bundles['localhost.com']['subj'], ctx)


def test_tls_sni_same_alt():
    """Two bundles sharing an alt name: both hosts get the first bundle."""
    # NOTE(review): alt_names is a plain string here, unlike the lists used
    # by every other test in this module — confirm that openssl_conf treats
    # it as intended.
    bundles = {
        "localhost": {"subj": "subj1", "alt_names": "same.altname.com"},
        "example": {"subj": "subj2", "alt_names": "same.altname.com"},
    }
    ctx = config_bundles(bundles)
    add_tls(["localhost", "example"])

    check_cert('localhost', bundles['localhost']['subj'], ctx)
    check_cert('example', bundles['localhost']['subj'], ctx)


def test_tls_sni_empty_cn():
    """A bundle without a CN is still served; its alt name is reported."""
    bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}}
    ctx = config_bundles(bundles)
    add_tls(["localhost"])

    resp, sock = client.get_ssl(
        headers={
            'Host': 'domain.com',
            'Content-Length': '0',
            'Connection': 'close',
        },
        start=True,
        context=ctx,
    )

    assert resp['status'] == 200
    assert sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com'


def test_tls_sni_invalid():
    """Invalid ``tls.certificate`` values are rejected with an error."""
    _ = config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}})
    add_tls(["localhost"])

    def check_certificate(cert):
        assert 'error' in client.conf(
            {"pass": "routes", "tls": {"certificate": cert}},
            'listeners/*:8080',
        )

    check_certificate('')
    check_certificate('blah')
    check_certificate([])
    check_certificate(['blah'])
    check_certificate(['localhost', 'blah'])
    check_certificate(['localhost', []])