import re
import ssl
import time
import subprocess
import unittest
from unit.applications.tls import TestApplicationTLS


class TestTLS(TestApplicationTLS):
    """Tests for TLS listeners and certificate management.

    Each test loads an application, manipulates certificates and/or the
    listener on port 7080 through the configuration helpers inherited from
    ``TestApplicationTLS``, and checks the resulting behaviour.
    """

    prerequisites = ['python', 'openssl']

    def findall(self, pattern):
        """Return all matches of ``pattern`` in ``<testdir>/unit.log``.

        The log is read in text mode with undecodable bytes ignored.
        """
        with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
            return re.findall(pattern, f.read())

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date in the '%b %d %H:%M:%S %Y %Z' format to seconds.

        Delegates to ``self.date_to_sec_epoch`` with that format string.
        """
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, application='empty', cert='default', port=7080):
        """Configure the ``*:<port>`` listener with a TLS certificate.

        The listener is set to pass requests to ``application`` and to use
        the certificate named ``cert``.
        """
        self.conf(
            {"application": application, "tls": {"certificate": cert}},
            'listeners/*:' + str(port),
        )

    def remove_tls(self, application='empty', port=7080):
        """Configure the ``*:<port>`` listener without a ``tls`` section."""
        self.conf({"application": application}, 'listeners/*:' + str(port))

    def test_tls_listener_option_add(self):
        """A TLS request succeeds after the listener gets a certificate."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')

    def test_tls_listener_option_remove(self):
        """A plain request succeeds after TLS is removed from the listener."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.get_ssl()

        self.remove_tls()

        self.assertEqual(self.get()['status'], 200, 'remove listener option')

    def test_tls_certificate_remove(self):
        """Deleting a certificate no listener references reports success."""
        self.load('empty')

        self.certificate()

        self.assertIn(
            'success',
            self.conf_delete('/certificates/default'),
            'remove certificate',
        )

    def test_tls_certificate_remove_used(self):
        """Deleting a certificate referenced by a listener reports an error."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.assertIn(
            'error',
            self.conf_delete('/certificates/default'),
            'remove certificate',
        )

    def test_tls_certificate_remove_nonexisting(self):
        """Deleting a certificate name that was never added reports an error."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.assertIn(
            'error',
            self.conf_delete('/certificates/blah'),
            'remove nonexistings certificate',
        )

    @unittest.expectedFailure
    def test_tls_certificate_update(self):
        """Calling ``certificate()`` again should change the served cert.

        Marked as an expected failure.
        """
        self.load('empty')

        self.certificate()

        self.add_tls()

        cert_old = self.get_server_certificate()

        self.certificate()

        self.assertNotEqual(
            cert_old, self.get_server_certificate(), 'update certificate'
        )

    @unittest.expectedFailure
    def test_tls_certificate_key_incorrect(self):
        """Loading 'first' together with the 'second' key should be an error.

        Marked as an expected failure.
        """
        self.load('empty')

        # NOTE(review): the second positional argument is presumably a
        # "load into the server" flag set to False here, since the explicit
        # certificate_load() call follows -- confirm against the helper.
        self.certificate('first', False)
        self.certificate('second', False)

        self.assertIn(
            'error', self.certificate_load('first', 'second'), 'key incorrect'
        )

    def test_tls_certificate_change(self):
        """Switching the listener to another certificate changes what is served."""
        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()

        cert_old = self.get_server_certificate()

        self.add_tls(cert='new')

        self.assertNotEqual(
            cert_old, self.get_server_certificate(), 'change certificate'
        )

    def test_tls_certificate_key_rsa(self):
        """The default certificate's key is reported as 'RSA (1024 bits)'."""
        self.load('empty')

        self.certificate()

        self.assertEqual(
            self.conf_get('/certificates/default/key'),
            'RSA (1024 bits)',
            'certificate key rsa',
        )

    def test_tls_certificate_key_ec(self):
        """A certificate with a prime256v1 EC key is reported as 'ECDH'."""
        self.load('empty')

        # Generate an EC private key on the prime256v1 curve.
        subprocess.call(
            [
                'openssl',
                'ecparam',
                '-noout',
                '-genkey',
                '-out', self.testdir + '/ec.key',
                '-name', 'prime256v1',
            ]
        )

        # Create a self-signed certificate for that key.
        subprocess.call(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj', '/CN=ec/',
                '-config', self.testdir + '/openssl.conf',
                '-key', self.testdir + '/ec.key',
                '-out', self.testdir + '/ec.crt',
            ]
        )

        self.certificate_load('ec')

        self.assertEqual(
            self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
        )

    def test_tls_certificate_chain_options(self):
        """Check the chain details reported for the default certificate.

        Expects a single-entry chain whose subject and issuer common names
        are both 'default', a start of validity within 5 seconds of
        ``self.sec_epoch()`` and a validity period of exactly 2592000
        seconds (30 days).
        """
        self.load('empty')

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        self.assertEqual(len(chain), 1, 'certificate chain length')

        cert = chain[0]

        self.assertEqual(
            cert['subject']['common_name'],
            'default',
            'certificate subject common name',
        )
        self.assertEqual(
            cert['issuer']['common_name'],
            'default',
            'certificate issuer common name',
        )

        self.assertLess(
            abs(
                self.sec_epoch()
                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            ),
            5,
            'certificate validity since',
        )
        self.assertEqual(
            self.openssl_date_to_sec_epoch(cert['validity']['until'])
            - self.openssl_date_to_sec_epoch(cert['validity']['since']),
            2592000,
            'certificate validity until',
        )

    def test_tls_certificate_chain(self):
        """Check chains built from a root, an intermediate and an end cert.

        Builds root -> int -> end with ``openssl ca`` and verifies, with a
        client context that trusts only the root:

        * the end certificate alone is reported as a one-entry chain and the
          TLS request does not produce a response;
        * the intermediate alone is accepted;
        * the concatenated end + int bundle is reported as a two-entry chain
          and is accepted.
        """
        self.load('empty')

        # The root.key / root.crt files produced here are used below as the
        # CA for the intermediate and as the client's trust anchor.
        self.certificate('root', False)

        # Certificate signing requests (and keys) for the intermediate and
        # the end certificate.
        subprocess.call(
            [
                'openssl',
                'req',
                '-new',
                '-subj', '/CN=int/',
                '-config', self.testdir + '/openssl.conf',
                '-out', self.testdir + '/int.csr',
                '-keyout', self.testdir + '/int.key',
            ]
        )

        subprocess.call(
            [
                'openssl',
                'req',
                '-new',
                '-subj', '/CN=end/',
                '-config', self.testdir + '/openssl.conf',
                '-out', self.testdir + '/end.csr',
                '-keyout', self.testdir + '/end.key',
            ]
        )

        # Minimal CA configuration used by both "openssl ca" calls below.
        with open(self.testdir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha1
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions

[ myca_policy ]
commonName = supplied

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': self.testdir,
                    'database': self.testdir + '/certindex',
                    'certserial': self.testdir + '/certserial',
                }
            )

        # Serial and (empty) index files referenced by ca.conf.
        with open(self.testdir + '/certserial', 'w') as f:
            f.write('1000')

        with open(self.testdir + '/certindex', 'w') as f:
            f.write('')

        # Sign the intermediate with the root key.
        subprocess.call(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj', '/CN=int/',
                '-config', self.testdir + '/ca.conf',
                '-keyfile', self.testdir + '/root.key',
                '-cert', self.testdir + '/root.crt',
                '-in', self.testdir + '/int.csr',
                '-out', self.testdir + '/int.crt',
            ]
        )

        # Sign the end certificate with the intermediate key.
        subprocess.call(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj', '/CN=end/',
                '-config', self.testdir + '/ca.conf',
                '-keyfile', self.testdir + '/int.key',
                '-cert', self.testdir + '/int.crt',
                '-in', self.testdir + '/end.csr',
                '-out', self.testdir + '/end.crt',
            ]
        )

        crt_path = self.testdir + '/end-int.crt'
        end_path = self.testdir + '/end.crt'
        int_path = self.testdir + '/int.crt'

        # Bundle: end certificate first, then the intermediate.
        with open(crt_path, 'wb') as crt_file, \
                open(end_path, 'rb') as end_file, \
                open(int_path, 'rb') as int_file:
            crt_file.write(end_file.read() + int_file.read())

        # Client side verifies the peer against the root certificate only.
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(self.testdir + '/root.crt')

        # incomplete chain

        self.assertIn(
            'success',
            self.certificate_load('end', 'end'),
            'certificate chain end upload',
        )

        chain = self.conf_get('/certificates/end/chain')
        self.assertEqual(len(chain), 1, 'certificate chain end length')
        self.assertEqual(
            chain[0]['subject']['common_name'],
            'end',
            'certificate chain end subject common name',
        )
        self.assertEqual(
            chain[0]['issuer']['common_name'],
            'int',
            'certificate chain end issuer common name',
        )

        self.add_tls(cert='end')

        # The request is expected to raise ssl.SSLError, leaving no response.
        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        self.assertEqual(resp, None, 'certificate chain incomplete chain')

        # intermediate

        self.assertIn(
            'success',
            self.certificate_load('int', 'int'),
            'certificate chain int upload',
        )

        chain = self.conf_get('/certificates/int/chain')
        self.assertEqual(len(chain), 1, 'certificate chain int length')
        self.assertEqual(
            chain[0]['subject']['common_name'],
            'int',
            'certificate chain int subject common name',
        )
        self.assertEqual(
            chain[0]['issuer']['common_name'],
            'root',
            'certificate chain int issuer common name',
        )

        self.add_tls(cert='int')

        self.assertEqual(
            self.get_ssl()['status'], 200, 'certificate chain intermediate'
        )

        # intermediate server

        self.assertIn(
            'success',
            self.certificate_load('end-int', 'end'),
            'certificate chain end-int upload',
        )

        chain = self.conf_get('/certificates/end-int/chain')
        self.assertEqual(len(chain), 2, 'certificate chain end-int length')
        self.assertEqual(
            chain[0]['subject']['common_name'],
            'end',
            'certificate chain end-int int subject common name',
        )
        self.assertEqual(
            chain[0]['issuer']['common_name'],
            'int',
            'certificate chain end-int int issuer common name',
        )
        self.assertEqual(
            chain[1]['subject']['common_name'],
            'int',
            'certificate chain end-int end subject common name',
        )
        self.assertEqual(
            chain[1]['issuer']['common_name'],
            'root',
            'certificate chain end-int end issuer common name',
        )

        self.add_tls(cert='end-int')

        self.assertEqual(
            self.get_ssl()['status'],
            200,
            'certificate chain intermediate server',
        )

    @unittest.expectedFailure
    def test_tls_reconfigure(self):
        """A kept-alive plain connection should survive enabling TLS.

        After TLS is added to the listener, both a request on the existing
        plain socket and a new TLS request are expected to return 200.
        Marked as an expected failure.
        """
        self.load('empty')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        (resp, sock) = self.get(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        self.assertEqual(resp['status'], 200, 'initial status')

        self.add_tls()

        self.assertEqual(
            self.get(sock=sock)['status'], 200, 'reconfigure status'
        )
        self.assertEqual(
            self.get_ssl()['status'], 200, 'reconfigure tls status'
        )

    def test_tls_keepalive(self):
        """Two requests over one kept-alive TLS connection are both echoed."""
        self.load('mirror')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        self.add_tls(application='mirror')

        (resp, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        self.assertEqual(resp['body'], '0123456789', 'keepalive 1')

        # Second request reuses the socket returned by the first one.
        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        self.assertEqual(resp['body'], '0123456789', 'keepalive 2')

    @unittest.expectedFailure
    def test_tls_keepalive_certificate_remove(self):
        """A kept-alive TLS connection should fail once its cert is removed.

        Marked as an expected failure.
        """
        self.load('empty')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        self.add_tls()

        # Only the socket is needed; the first response is not inspected.
        (_, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        # Drop TLS from the listener so the certificate can be deleted.
        self.remove_tls()
        self.conf_delete('/certificates/default')

        # Any ordinary error on the old connection counts as "no response";
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        try:
            resp = self.get_ssl(
                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
            )
        except Exception:
            resp = None

        self.assertEqual(resp, None, 'keepalive remove certificate')

    @unittest.expectedFailure
    def test_tls_certificates_remove_all(self):
        """Deleting the whole '/certificates' object should report success.

        Marked as an expected failure.
        """
        self.load('empty')

        self.certificate()

        self.assertIn(
            'success',
            self.conf_delete('/certificates'),
            'remove all certificates',
        )

    def test_tls_application_respawn(self):
        """A kept-alive TLS connection works after the app process is killed.

        The single 'mirror' process is killed with SIGKILL; once the log
        shows a new process has started, a request on the existing TLS
        socket must return 200 and echo the body.
        """
        # The kill below is intentional, so its log alert is not a failure.
        self.skip_alerts.append(r'process \d+ exited on signal 9')
        self.load('mirror')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        self.conf('1', 'applications/mirror/processes')

        self.add_tls(application='mirror')

        # Only the socket is needed; the first response is not inspected.
        (_, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]

        subprocess.call(['kill', '-9', app_id])

        # Wait for a "started" record whose process id differs from the
        # killed one (negative lookahead on the old id).
        self.wait_for_record(
            re.compile(
                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
            )
        )

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        self.assertEqual(resp['status'], 200, 'application respawn status')
        self.assertEqual(
            resp['body'], '0123456789', 'application respawn body'
        )

    def test_tls_url_scheme(self):
        """'Wsgi-Url-Scheme' is 'http' without TLS and 'https' with it."""
        self.load('variables')

        self.assertEqual(
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme'],
            'http',
            'url scheme http',
        )

        self.certificate()

        self.add_tls(application='variables')

        self.assertEqual(
            self.post_ssl(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme'],
            'https',
            'url scheme https',
        )


if __name__ == '__main__':
    TestTLS.main()