1import re 2import ssl 3import time 4import subprocess 5import unittest 6import unit 7 8class TestUnitTLS(unit.TestUnitApplicationTLS): 9 10 def setUpClass(): 11 unit.TestUnit().check_modules('python', 'openssl') 12 13 def findall(self, pattern): 14 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: 15 return re.findall(pattern, f.read()) 16 17 def wait_for_record(self, pattern): 18 for i in range(50): 19 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: 20 if re.search(pattern, f.read()) is not None: 21 break 22 23 time.sleep(0.1) 24 25 def openssl_date_to_sec_epoch(self, date): 26 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 27 28 def add_tls(self, application='empty', cert='default', port=7080): 29 self.conf({ 30 "application": application, 31 "tls": { 32 "certificate": cert 33 } 34 }, 'listeners/*:' + str(port)) 35 36 def remove_tls(self, application='empty', port=7080): 37 self.conf({ 38 "application": application 39 }, 'listeners/*:' + str(port)) 40 41 def test_tls_listener_option_add(self): 42 self.load('empty') 43 44 self.certificate() 45 46 self.add_tls() 47 48 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option') 49 50 def test_tls_listener_option_remove(self): 51 self.load('empty') 52 53 self.certificate() 54 55 self.add_tls() 56 57 self.get_ssl() 58 59 self.remove_tls() 60 61 self.assertEqual(self.get()['status'], 200, 'remove listener option') 62 63 def test_tls_certificate_remove(self): 64 self.load('empty') 65 66 self.certificate() 67 68 self.assertIn('success', self.conf_delete('/certificates/default'), 69 'remove certificate') 70 71 def test_tls_certificate_remove_used(self): 72 self.load('empty') 73 74 self.certificate() 75 76 self.add_tls() 77 78 self.assertIn('error', self.conf_delete('/certificates/default'), 79 'remove certificate') 80 81 def test_tls_certificate_remove_nonexisting(self): 82 self.load('empty') 83 84 self.certificate() 85 86 self.add_tls() 87 88 self.assertIn('error', self.conf_delete('/certificates/blah'), 89 'remove nonexistings certificate') 90 91 @unittest.expectedFailure 92 def test_tls_certificate_update(self): 93 self.load('empty') 94 95 self.certificate() 96 97 self.add_tls() 98 99 cert_old = self.get_server_certificate() 100 101 self.certificate() 102 103 self.assertNotEqual(cert_old, self.get_server_certificate(), 104 'update certificate') 105 106 @unittest.expectedFailure 107 def test_tls_certificate_key_incorrect(self): 108 self.load('empty') 109 110 self.certificate('first', False) 111 self.certificate('second', False) 112 113 self.assertIn('error', self.certificate_load('first', 'second'), 114 'key incorrect') 115 116 def test_tls_certificate_change(self): 117 self.load('empty') 118 119 self.certificate() 120 self.certificate('new') 121 122 self.add_tls() 123 124 cert_old = self.get_server_certificate() 125 126 self.add_tls(cert='new') 127 128 self.assertNotEqual(cert_old, self.get_server_certificate(), 129 'change certificate') 130 131 def test_tls_certificate_key_rsa(self): 132 self.load('empty') 133 134 self.certificate() 135 136 self.assertEqual(self.conf_get('/certificates/default/key'), 137 'RSA (1024 bits)', 'certificate key rsa') 138 139 def test_tls_certificate_key_ec(self): 140 self.load('empty') 141 142 subprocess.call(['openssl', 'ecparam', '-noout', '-genkey', 143 '-out', self.testdir + '/ec.key', 144 '-name', 'prime256v1']) 145 146 subprocess.call(['openssl', 'req', '-x509', '-new', 147 '-config', self.testdir + '/openssl.conf', 148 '-key', self.testdir + '/ec.key', '-subj', '/CN=ec/', 149 '-out', self.testdir + '/ec.crt']) 150 151 self.certificate_load('ec') 152 153 self.assertEqual(self.conf_get('/certificates/ec/key'), 'ECDH', 154 'certificate key ec') 155 156 def test_tls_certificate_chain_options(self): 157 self.load('empty') 158 159 self.certificate() 160 161 chain = self.conf_get('/certificates/default/chain') 162 163 self.assertEqual(len(chain), 1, 'certificate chain length') 164 165 cert = chain[0] 166 167 self.assertEqual(cert['subject']['common_name'], 'default', 168 'certificate subject common name') 169 self.assertEqual(cert['issuer']['common_name'], 'default', 170 'certificate issuer common name') 171 172 self.assertLess(abs(self.sec_epoch() - 173 self.openssl_date_to_sec_epoch(cert['validity']['since'])), 5, 174 'certificate validity since') 175 self.assertEqual( 176 self.openssl_date_to_sec_epoch(cert['validity']['until']) - 177 self.openssl_date_to_sec_epoch(cert['validity']['since']), 2592000, 178 'certificate validity until') 179 180 def test_tls_certificate_chain(self): 181 self.load('empty') 182 183 self.certificate('root', False) 184 185 subprocess.call(['openssl', 'req', '-new', '-config', 186 self.testdir + '/openssl.conf', '-subj', '/CN=int/', 187 '-out', self.testdir + '/int.csr', 188 '-keyout', self.testdir + '/int.key']) 189 190 subprocess.call(['openssl', 'req', '-new', '-config', 191 self.testdir + '/openssl.conf', '-subj', '/CN=end/', 192 '-out', self.testdir + '/end.csr', 193 '-keyout', self.testdir + '/end.key']) 194 195 with open(self.testdir + '/ca.conf', 'w') as f: 196 f.write("""[ ca ] 197default_ca = myca 198 199[ myca ] 200new_certs_dir = %(dir)s 201database = %(database)s 202default_md = sha1 203policy = myca_policy 204serial = %(certserial)s 205default_days = 1 206x509_extensions = myca_extensions 207 208[ myca_policy ] 209commonName = supplied 210 211[ myca_extensions ] 212basicConstraints = critical,CA:TRUE""" % { 213 'dir': self.testdir, 214 'database': self.testdir + '/certindex', 215 'certserial': self.testdir + '/certserial' 216 }) 217 218 with open(self.testdir + '/certserial', 'w') as f: 219 f.write('1000') 220 221 with open(self.testdir + '/certindex', 'w') as f: 222 f.write('') 223 224 subprocess.call(['openssl', 'ca', '-batch', 225 '-config', self.testdir + '/ca.conf', 226 '-keyfile', self.testdir + '/root.key', 227 '-cert', self.testdir + '/root.crt', 228 '-subj', '/CN=int/', 229 '-in', self.testdir + '/int.csr', 230 '-out', self.testdir + '/int.crt']) 231 232 subprocess.call(['openssl', 'ca', '-batch', 233 '-config', self.testdir + '/ca.conf', 234 '-keyfile', self.testdir + '/int.key', 235 '-cert', self.testdir + '/int.crt', 236 '-subj', '/CN=end/', 237 '-in', self.testdir + '/end.csr', 238 '-out', self.testdir + '/end.crt']) 239 240 with open(self.testdir + '/end-int.crt', 'wb') as crt, \ 241 open(self.testdir + '/end.crt', 'rb') as end, \ 242 open(self.testdir + '/int.crt', 'rb') as int: 243 crt.write(end.read() + int.read()) 244 245 self.context = ssl.create_default_context() 246 self.context.check_hostname = False 247 self.context.verify_mode = ssl.CERT_REQUIRED 248 self.context.load_verify_locations(self.testdir + '/root.crt') 249 250 # incomplete chain 251 252 self.assertIn('success', self.certificate_load('end', 'end'), 253 'certificate chain end upload') 254 255 chain = self.conf_get('/certificates/end/chain') 256 self.assertEqual(len(chain), 1, 'certificate chain end length') 257 self.assertEqual(chain[0]['subject']['common_name'], 'end', 258 'certificate chain end subject common name') 259 self.assertEqual(chain[0]['issuer']['common_name'], 'int', 260 'certificate chain end issuer common name') 261 262 self.add_tls(cert='end') 263 264 try: 265 resp = self.get_ssl() 266 except ssl.SSLError: 267 resp = None 268 269 self.assertEqual(resp, None, 'certificate chain incomplete chain') 270 271 # intermediate 272 273 self.assertIn('success', self.certificate_load('int', 'int'), 274 'certificate chain int upload') 275 276 chain = self.conf_get('/certificates/int/chain') 277 self.assertEqual(len(chain), 1, 'certificate chain int length') 278 self.assertEqual(chain[0]['subject']['common_name'], 'int', 279 'certificate chain int subject common name') 280 self.assertEqual(chain[0]['issuer']['common_name'], 'root', 281 'certificate chain int issuer common name') 282 283 self.add_tls(cert='int') 284 285 self.assertEqual(self.get_ssl()['status'], 200, 286 'certificate chain intermediate') 287 288 # intermediate server 289 290 self.assertIn('success', self.certificate_load('end-int', 'end'), 291 'certificate chain end-int upload') 292 293 chain = self.conf_get('/certificates/end-int/chain') 294 self.assertEqual(len(chain), 2, 'certificate chain end-int length') 295 self.assertEqual(chain[0]['subject']['common_name'], 'end', 296 'certificate chain end-int int subject common name') 297 self.assertEqual(chain[0]['issuer']['common_name'], 'int', 298 'certificate chain end-int int issuer common name') 299 self.assertEqual(chain[1]['subject']['common_name'], 'int', 300 'certificate chain end-int end subject common name') 301 self.assertEqual(chain[1]['issuer']['common_name'], 'root', 302 'certificate chain end-int end issuer common name') 303 304 self.add_tls(cert='end-int') 305 306 self.assertEqual(self.get_ssl()['status'], 200, 307 'certificate chain intermediate server') 308 309 @unittest.expectedFailure 310 def test_tls_reconfigure(self): 311 self.load('empty') 312 313 self.certificate() 314 315 (resp, sock) = self.get(headers={
| 1import re 2import ssl 3import time 4import subprocess 5import unittest 6import unit 7 8class TestUnitTLS(unit.TestUnitApplicationTLS): 9 10 def setUpClass(): 11 unit.TestUnit().check_modules('python', 'openssl') 12 13 def findall(self, pattern): 14 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: 15 return re.findall(pattern, f.read()) 16 17 def wait_for_record(self, pattern): 18 for i in range(50): 19 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: 20 if re.search(pattern, f.read()) is not None: 21 break 22 23 time.sleep(0.1) 24 25 def openssl_date_to_sec_epoch(self, date): 26 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 27 28 def add_tls(self, application='empty', cert='default', port=7080): 29 self.conf({ 30 "application": application, 31 "tls": { 32 "certificate": cert 33 } 34 }, 'listeners/*:' + str(port)) 35 36 def remove_tls(self, application='empty', port=7080): 37 self.conf({ 38 "application": application 39 }, 'listeners/*:' + str(port)) 40 41 def test_tls_listener_option_add(self): 42 self.load('empty') 43 44 self.certificate() 45 46 self.add_tls() 47 48 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option') 49 50 def test_tls_listener_option_remove(self): 51 self.load('empty') 52 53 self.certificate() 54 55 self.add_tls() 56 57 self.get_ssl() 58 59 self.remove_tls() 60 61 self.assertEqual(self.get()['status'], 200, 'remove listener option') 62 63 def test_tls_certificate_remove(self): 64 self.load('empty') 65 66 self.certificate() 67 68 self.assertIn('success', self.conf_delete('/certificates/default'), 69 'remove certificate') 70 71 def test_tls_certificate_remove_used(self): 72 self.load('empty') 73 74 self.certificate() 75 76 self.add_tls() 77 78 self.assertIn('error', self.conf_delete('/certificates/default'), 79 'remove certificate') 80 81 def test_tls_certificate_remove_nonexisting(self): 82 self.load('empty') 83 84 self.certificate() 85 86 self.add_tls() 87 88 self.assertIn('error', self.conf_delete('/certificates/blah'), 89 'remove nonexistings certificate') 90 91 @unittest.expectedFailure 92 def test_tls_certificate_update(self): 93 self.load('empty') 94 95 self.certificate() 96 97 self.add_tls() 98 99 cert_old = self.get_server_certificate() 100 101 self.certificate() 102 103 self.assertNotEqual(cert_old, self.get_server_certificate(), 104 'update certificate') 105 106 @unittest.expectedFailure 107 def test_tls_certificate_key_incorrect(self): 108 self.load('empty') 109 110 self.certificate('first', False) 111 self.certificate('second', False) 112 113 self.assertIn('error', self.certificate_load('first', 'second'), 114 'key incorrect') 115 116 def test_tls_certificate_change(self): 117 self.load('empty') 118 119 self.certificate() 120 self.certificate('new') 121 122 self.add_tls() 123 124 cert_old = self.get_server_certificate() 125 126 self.add_tls(cert='new') 127 128 self.assertNotEqual(cert_old, self.get_server_certificate(), 129 'change certificate') 130 131 def test_tls_certificate_key_rsa(self): 132 self.load('empty') 133 134 self.certificate() 135 136 self.assertEqual(self.conf_get('/certificates/default/key'), 137 'RSA (1024 bits)', 'certificate key rsa') 138 139 def test_tls_certificate_key_ec(self): 140 self.load('empty') 141 142 subprocess.call(['openssl', 'ecparam', '-noout', '-genkey', 143 '-out', self.testdir + '/ec.key', 144 '-name', 'prime256v1']) 145 146 subprocess.call(['openssl', 'req', '-x509', '-new', 147 '-config', self.testdir + '/openssl.conf', 148 '-key', self.testdir + '/ec.key', '-subj', '/CN=ec/', 149 '-out', self.testdir + '/ec.crt']) 150 151 self.certificate_load('ec') 152 153 self.assertEqual(self.conf_get('/certificates/ec/key'), 'ECDH', 154 'certificate key ec') 155 156 def test_tls_certificate_chain_options(self): 157 self.load('empty') 158 159 self.certificate() 160 161 chain = self.conf_get('/certificates/default/chain') 162 163 self.assertEqual(len(chain), 1, 'certificate chain length') 164 165 cert = chain[0] 166 167 self.assertEqual(cert['subject']['common_name'], 'default', 168 'certificate subject common name') 169 self.assertEqual(cert['issuer']['common_name'], 'default', 170 'certificate issuer common name') 171 172 self.assertLess(abs(self.sec_epoch() - 173 self.openssl_date_to_sec_epoch(cert['validity']['since'])), 5, 174 'certificate validity since') 175 self.assertEqual( 176 self.openssl_date_to_sec_epoch(cert['validity']['until']) - 177 self.openssl_date_to_sec_epoch(cert['validity']['since']), 2592000, 178 'certificate validity until') 179 180 def test_tls_certificate_chain(self): 181 self.load('empty') 182 183 self.certificate('root', False) 184 185 subprocess.call(['openssl', 'req', '-new', '-config', 186 self.testdir + '/openssl.conf', '-subj', '/CN=int/', 187 '-out', self.testdir + '/int.csr', 188 '-keyout', self.testdir + '/int.key']) 189 190 subprocess.call(['openssl', 'req', '-new', '-config', 191 self.testdir + '/openssl.conf', '-subj', '/CN=end/', 192 '-out', self.testdir + '/end.csr', 193 '-keyout', self.testdir + '/end.key']) 194 195 with open(self.testdir + '/ca.conf', 'w') as f: 196 f.write("""[ ca ] 197default_ca = myca 198 199[ myca ] 200new_certs_dir = %(dir)s 201database = %(database)s 202default_md = sha1 203policy = myca_policy 204serial = %(certserial)s 205default_days = 1 206x509_extensions = myca_extensions 207 208[ myca_policy ] 209commonName = supplied 210 211[ myca_extensions ] 212basicConstraints = critical,CA:TRUE""" % { 213 'dir': self.testdir, 214 'database': self.testdir + '/certindex', 215 'certserial': self.testdir + '/certserial' 216 }) 217 218 with open(self.testdir + '/certserial', 'w') as f: 219 f.write('1000') 220 221 with open(self.testdir + '/certindex', 'w') as f: 222 f.write('') 223 224 subprocess.call(['openssl', 'ca', '-batch', 225 '-config', self.testdir + '/ca.conf', 226 '-keyfile', self.testdir + '/root.key', 227 '-cert', self.testdir + '/root.crt', 228 '-subj', '/CN=int/', 229 '-in', self.testdir + '/int.csr', 230 '-out', self.testdir + '/int.crt']) 231 232 subprocess.call(['openssl', 'ca', '-batch', 233 '-config', self.testdir + '/ca.conf', 234 '-keyfile', self.testdir + '/int.key', 235 '-cert', self.testdir + '/int.crt', 236 '-subj', '/CN=end/', 237 '-in', self.testdir + '/end.csr', 238 '-out', self.testdir + '/end.crt']) 239 240 with open(self.testdir + '/end-int.crt', 'wb') as crt, \ 241 open(self.testdir + '/end.crt', 'rb') as end, \ 242 open(self.testdir + '/int.crt', 'rb') as int: 243 crt.write(end.read() + int.read()) 244 245 self.context = ssl.create_default_context() 246 self.context.check_hostname = False 247 self.context.verify_mode = ssl.CERT_REQUIRED 248 self.context.load_verify_locations(self.testdir + '/root.crt') 249 250 # incomplete chain 251 252 self.assertIn('success', self.certificate_load('end', 'end'), 253 'certificate chain end upload') 254 255 chain = self.conf_get('/certificates/end/chain') 256 self.assertEqual(len(chain), 1, 'certificate chain end length') 257 self.assertEqual(chain[0]['subject']['common_name'], 'end', 258 'certificate chain end subject common name') 259 self.assertEqual(chain[0]['issuer']['common_name'], 'int', 260 'certificate chain end issuer common name') 261 262 self.add_tls(cert='end') 263 264 try: 265 resp = self.get_ssl() 266 except ssl.SSLError: 267 resp = None 268 269 self.assertEqual(resp, None, 'certificate chain incomplete chain') 270 271 # intermediate 272 273 self.assertIn('success', self.certificate_load('int', 'int'), 274 'certificate chain int upload') 275 276 chain = self.conf_get('/certificates/int/chain') 277 self.assertEqual(len(chain), 1, 'certificate chain int length') 278 self.assertEqual(chain[0]['subject']['common_name'], 'int', 279 'certificate chain int subject common name') 280 self.assertEqual(chain[0]['issuer']['common_name'], 'root', 281 'certificate chain int issuer common name') 282 283 self.add_tls(cert='int') 284 285 self.assertEqual(self.get_ssl()['status'], 200, 286 'certificate chain intermediate') 287 288 # intermediate server 289 290 self.assertIn('success', self.certificate_load('end-int', 'end'), 291 'certificate chain end-int upload') 292 293 chain = self.conf_get('/certificates/end-int/chain') 294 self.assertEqual(len(chain), 2, 'certificate chain end-int length') 295 self.assertEqual(chain[0]['subject']['common_name'], 'end', 296 'certificate chain end-int int subject common name') 297 self.assertEqual(chain[0]['issuer']['common_name'], 'int', 298 'certificate chain end-int int issuer common name') 299 self.assertEqual(chain[1]['subject']['common_name'], 'int', 300 'certificate chain end-int end subject common name') 301 self.assertEqual(chain[1]['issuer']['common_name'], 'root', 302 'certificate chain end-int end issuer common name') 303 304 self.add_tls(cert='end-int') 305 306 self.assertEqual(self.get_ssl()['status'], 200, 307 'certificate chain intermediate server') 308 309 @unittest.expectedFailure 310 def test_tls_reconfigure(self): 311 self.load('empty') 312 313 self.certificate() 314 315 (resp, sock) = self.get(headers={
|