Deleted
Added
1import io 2import re 3import ssl 4import subprocess 5 6import pytest 7 8from unit.applications.tls import TestApplicationTLS |
9from unit.option import option |
10 11 12class TestTLS(TestApplicationTLS): 13 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 14 15 def openssl_date_to_sec_epoch(self, date): 16 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 17 --- 6 unchanged lines hidden (view full) --- 24 'listeners/*:' + str(port), 25 ) 26 27 def remove_tls(self, application='empty', port=7080): 28 assert 'success' in self.conf( 29 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 30 ) 31 |
32 def req(self, name='localhost', subject=None, x509=False): 33 subj = subject if subject is not None else '/CN=' + name + '/' 34 35 subprocess.call( 36 [ 37 'openssl', 38 'req', 39 '-new', 40 '-subj', 41 subj, 42 '-config', 43 option.temp_dir + '/openssl.conf', 44 '-out', 45 option.temp_dir + '/' + name + '.csr', 46 '-keyout', 47 option.temp_dir + '/' + name + '.key', 48 ], 49 stderr=subprocess.STDOUT, 50 ) 51 52 def generate_ca_conf(self): 53 with open(option.temp_dir + '/ca.conf', 'w') as f: 54 f.write( 55 """[ ca ] 56default_ca = myca 57 58[ myca ] 59new_certs_dir = %(dir)s 60database = %(database)s 61default_md = sha256 62policy = myca_policy 63serial = %(certserial)s 64default_days = 1 65x509_extensions = myca_extensions 66copy_extensions = copy 67 68[ myca_policy ] 69commonName = optional 70 71[ myca_extensions ] 72basicConstraints = critical,CA:TRUE""" 73 % { 74 'dir': option.temp_dir, 75 'database': option.temp_dir + '/certindex', 76 'certserial': option.temp_dir + '/certserial', 77 } 78 ) 79 80 with open(option.temp_dir + '/certserial', 'w') as f: 81 f.write('1000') 82 83 with open(option.temp_dir + '/certindex', 'w') as f: 84 f.write('') 85 86 with open(option.temp_dir + '/certindex.attr', 'w') as f: 87 f.write('') 88 89 def ca(self, cert='root', out='localhost'): 90 subprocess.call( 91 [ 92 'openssl', 93 'ca', 94 '-batch', 95 '-config', 96 option.temp_dir + '/ca.conf', 97 '-keyfile', 98 option.temp_dir + '/' + cert + '.key', 99 '-cert', 100 option.temp_dir + '/' + cert + '.crt', 101 '-in', 102 option.temp_dir + '/' + out + '.csr', 103 '-out', 104 option.temp_dir + '/' + out + '.crt', 105 ], 106 stderr=subprocess.STDOUT, 107 ) 108 109 def set_certificate_req_context(self, cert='root'): 110 self.context = ssl.create_default_context() 111 self.context.check_hostname = False 112 self.context.verify_mode = ssl.CERT_REQUIRED 113 self.context.load_verify_locations( 114 option.temp_dir + '/' + cert + '.crt' 115 ) 116 |
117 def test_tls_listener_option_add(self): 118 self.load('empty') 119 120 self.certificate() 121 122 self.add_tls() 123 124 assert self.get_ssl()['status'] == 200, 'add listener option' --- 164 unchanged lines hidden (view full) --- 289 == 2592000 290 ), 'certificate validity until' 291 292 def test_tls_certificate_chain(self, temp_dir): 293 self.load('empty') 294 295 self.certificate('root', False) 296 |
297 self.req('int') 298 self.req('end') |
299 |
300 self.generate_ca_conf() |
301 |
302 self.ca(cert='root', out='int') 303 self.ca(cert='int', out='end') |
304 |
305 crt_path = temp_dir + '/end-int.crt' 306 end_path = temp_dir + '/end.crt' 307 int_path = temp_dir + '/int.crt' 308 309 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 310 int_path, 'rb' 311 ) as int: 312 crt.write(end.read() + int.read()) 313 |
314 self.set_certificate_req_context() |
315 316 # incomplete chain 317 318 assert 'success' in self.certificate_load( 319 'end', 'end' 320 ), 'certificate chain end upload' 321 322 chain = self.conf_get('/certificates/end/chain') --- 57 unchanged lines hidden (view full) --- 380 ), 'certificate chain end-int end issuer common name' 381 382 self.add_tls(cert='end-int') 383 384 assert ( 385 self.get_ssl()['status'] == 200 386 ), 'certificate chain intermediate server' 387 |
388 def test_tls_certificate_empty_cn(self, temp_dir): 389 self.certificate('root', False) 390 391 self.req(subject='/') 392 393 self.generate_ca_conf() 394 self.ca() 395 396 self.set_certificate_req_context() 397 398 assert 'success' in self.certificate_load('localhost', 'localhost') 399 400 cert = self.conf_get('/certificates/localhost') 401 assert cert['chain'][0]['subject'] == {}, 'empty subject' 402 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 403 404 def test_tls_certificate_empty_cn_san(self, temp_dir): 405 self.certificate('root', False) 406 407 self.openssl_conf( 408 rewrite=True, alt_names=["example.com", "www.example.net"] 409 ) 410 411 self.req(subject='/') 412 413 self.generate_ca_conf() 414 self.ca() 415 416 self.set_certificate_req_context() 417 418 assert 'success' in self.certificate_load('localhost', 'localhost') 419 420 cert = self.conf_get('/certificates/localhost') 421 assert cert['chain'][0]['subject'] == { 422 'alt_names': ['example.com', 'www.example.net'] 423 }, 'subject alt_names' 424 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 425 |
426 @pytest.mark.skip('not yet') 427 def test_tls_reconfigure(self): 428 self.load('empty') 429 430 assert self.get()['status'] == 200, 'init' 431 432 self.certificate() 433 --- 188 unchanged lines hidden --- |