import io
import re
import ssl
import subprocess
import time

import pytest
from unit.applications.tls import TestApplicationTLS
from unit.option import option


class TestTLS(TestApplicationTLS):
    """TLS listener and certificate-management tests.

    The helpers at the top drive the ``openssl`` command line tool to
    create requests and CA-signed certificates inside ``option.temp_dir``;
    the ``test_*`` methods exercise the configuration API and TLS
    connections through helpers inherited from ``TestApplicationTLS``.
    """

    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

    def openssl_date_to_sec_epoch(self, date):
        """Convert *date* using the ``%b %d %H:%M:%S %Y %Z`` layout.

        Delegates to the inherited ``date_to_sec_epoch`` helper.
        """
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, application='empty', cert='default', port=7080):
        """Configure listener ``*:<port>`` to pass to *application* with TLS.

        The listener is given a ``tls`` section referencing certificate
        *cert*; the configuration call is asserted to succeed.
        """
        assert 'success' in self.conf(
            {
                "pass": "applications/" + application,
                "tls": {"certificate": cert},
            },
            'listeners/*:' + str(port),
        )

    def remove_tls(self, application='empty', port=7080):
        """Reconfigure listener ``*:<port>`` without a ``tls`` section."""
        assert 'success' in self.conf(
            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
        )

    def req(self, name='localhost', subject=None, x509=False):
        """Run ``openssl req -new`` to create ``<name>.csr`` and ``<name>.key``.

        *subject* overrides the default ``/CN=<name>/`` subject.  The
        *x509* parameter is accepted but not used by this method; it is
        kept so existing callers keep working.
        """
        subj = subject if subject is not None else '/CN=' + name + '/'

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-new',
                '-subj',
                subj,
                '-config',
                option.temp_dir + '/openssl.conf',
                '-out',
                option.temp_dir + '/' + name + '.csr',
                '-keyout',
                option.temp_dir + '/' + name + '.key',
            ],
            stderr=subprocess.STDOUT,
        )

    def generate_ca_conf(self):
        """Write ``ca.conf`` and the files it refers to into the temp dir.

        Besides the configuration itself this creates ``certserial``
        (containing ``1000``) and empty ``certindex`` and
        ``certindex.attr`` files.
        """
        with open(option.temp_dir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': option.temp_dir,
                    'database': option.temp_dir + '/certindex',
                    'certserial': option.temp_dir + '/certserial',
                }
            )

        with open(option.temp_dir + '/certserial', 'w') as f:
            f.write('1000')

        with open(option.temp_dir + '/certindex', 'w') as f:
            f.write('')

        with open(option.temp_dir + '/certindex.attr', 'w') as f:
            f.write('')

    def ca(self, cert='root', out='localhost'):
        """Sign ``<out>.csr`` with the key and certificate named *cert*.

        Runs ``openssl ca -batch`` with ``ca.conf`` from the temp dir and
        writes the result to ``<out>.crt``.
        """
        subprocess.check_output(
            [
                'openssl',
                'ca',
                '-batch',
                '-config',
                option.temp_dir + '/ca.conf',
                '-keyfile',
                option.temp_dir + '/' + cert + '.key',
                '-cert',
                option.temp_dir + '/' + cert + '.crt',
                '-in',
                option.temp_dir + '/' + out + '.csr',
                '-out',
                option.temp_dir + '/' + out + '.crt',
            ],
            stderr=subprocess.STDOUT,
        )

    def set_certificate_req_context(self, cert='root'):
        """Set ``self.context`` to a context that verifies against *cert*.

        Host name checking is disabled, certificate verification is
        required, and ``<cert>.crt`` from the temp dir is loaded as the
        trusted location.
        """
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(
            option.temp_dir + '/' + cert + '.crt'
        )

    def test_tls_listener_option_add(self):
        """A TLS request succeeds after the ``tls`` option is added."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert self.get_ssl()['status'] == 200, 'add listener option'

    def test_tls_listener_option_remove(self):
        """A plain request succeeds after the ``tls`` option is removed."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.get_ssl()

        self.remove_tls()

        assert self.get()['status'] == 200, 'remove listener option'

    def test_tls_certificate_remove(self):
        """A certificate not referenced by a listener can be deleted."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_used(self):
        """Deleting a certificate referenced by a listener is an error."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_nonexisting(self):
        """Deleting a certificate that was never uploaded is an error."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/blah'
        ), 'remove nonexistings certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_update(self):
        """The served certificate differs after ``certificate()`` is re-run."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

        self.certificate()

        assert cert_old != ssl.get_server_certificate(
            ('127.0.0.1', 7080)
        ), 'update certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_key_incorrect(self):
        """Loading certificate 'first' with key 'second' is an error."""
        self.load('empty')

        self.certificate('first', False)
        self.certificate('second', False)

        assert 'error' in self.certificate_load(
            'first', 'second'
        ), 'key incorrect'

    def test_tls_certificate_change(self):
        """Pointing the listener at another certificate changes what is served."""
        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()

        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

        self.add_tls(cert='new')

        assert cert_old != ssl.get_server_certificate(
            ('127.0.0.1', 7080)
        ), 'change certificate'

    def test_tls_certificate_key_rsa(self):
        """The default certificate's key is reported as ``RSA (2048 bits)``."""
        self.load('empty')

        self.certificate()

        assert (
            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
        ), 'certificate key rsa'

    def test_tls_certificate_key_ec(self, temp_dir):
        """A prime256v1 key is reported as ``ECDH``."""
        self.load('empty')

        self.openssl_conf()

        subprocess.check_output(
            [
                'openssl',
                'ecparam',
                '-noout',
                '-genkey',
                '-out',
                temp_dir + '/ec.key',
                '-name',
                'prime256v1',
            ],
            stderr=subprocess.STDOUT,
        )

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=ec/',
                '-config',
                temp_dir + '/openssl.conf',
                '-key',
                temp_dir + '/ec.key',
                '-out',
                temp_dir + '/ec.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        self.certificate_load('ec')

        assert (
            self.conf_get('/certificates/ec/key') == 'ECDH'
        ), 'certificate key ec'

    def test_tls_certificate_chain_options(self):
        """Check subject, issuer and validity reported for the default cert."""
        self.load('empty')

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        assert len(chain) == 1, 'certificate chain length'

        cert = chain[0]

        assert (
            cert['subject']['common_name'] == 'default'
        ), 'certificate subject common name'
        assert (
            cert['issuer']['common_name'] == 'default'
        ), 'certificate issuer common name'

        assert (
            abs(
                self.sec_epoch()
                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            )
            < 60
        ), 'certificate validity since'
        assert (
            self.openssl_date_to_sec_epoch(cert['validity']['until'])
            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            == 2592000
        ), 'certificate validity until'

    def test_tls_certificate_chain(self, temp_dir):
        """Exercise a root -> int -> end chain in three configurations.

        The 'end' certificate alone must fail verification against the
        root, 'int' alone must succeed, and the concatenated 'end-int'
        bundle must succeed and be reported as a two-element chain.
        """
        self.load('empty')

        self.certificate('root', False)

        self.req('int')
        self.req('end')

        self.generate_ca_conf()

        self.ca(cert='root', out='int')
        self.ca(cert='int', out='end')

        crt_path = temp_dir + '/end-int.crt'
        end_path = temp_dir + '/end.crt'
        int_path = temp_dir + '/int.crt'

        # Bundle order is end certificate first, then the intermediate.
        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end_crt, open(
            int_path, 'rb'
        ) as int_crt:
            crt.write(end_crt.read() + int_crt.read())

        self.set_certificate_req_context()

        # incomplete chain

        assert 'success' in self.certificate_load(
            'end', 'end'
        ), 'certificate chain end upload'

        chain = self.conf_get('/certificates/end/chain')
        assert len(chain) == 1, 'certificate chain end length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end issuer common name'

        self.add_tls(cert='end')

        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        assert resp is None, 'certificate chain incomplete chain'

        # intermediate

        assert 'success' in self.certificate_load(
            'int', 'int'
        ), 'certificate chain int upload'

        chain = self.conf_get('/certificates/int/chain')
        assert len(chain) == 1, 'certificate chain int length'
        assert (
            chain[0]['subject']['common_name'] == 'int'
        ), 'certificate chain int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'root'
        ), 'certificate chain int issuer common name'

        self.add_tls(cert='int')

        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'

        # intermediate server

        assert 'success' in self.certificate_load(
            'end-int', 'end'
        ), 'certificate chain end-int upload'

        chain = self.conf_get('/certificates/end-int/chain')
        assert len(chain) == 2, 'certificate chain end-int length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end-int int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end-int int issuer common name'
        assert (
            chain[1]['subject']['common_name'] == 'int'
        ), 'certificate chain end-int end subject common name'
        assert (
            chain[1]['issuer']['common_name'] == 'root'
        ), 'certificate chain end-int end issuer common name'

        self.add_tls(cert='end-int')

        assert (
            self.get_ssl()['status'] == 200
        ), 'certificate chain intermediate server'

    def test_tls_certificate_chain_long(self, temp_dir):
        """Build and serve a chain of ``chain_length`` certificates.

        Every certificate except the root is concatenated into
        ``all.crt`` (end certificate first), so the reported chain is
        expected to hold ``chain_length - 1`` entries.
        """
        self.load('empty')

        self.generate_ca_conf()

        # Minimum chain length is 3.
        chain_length = 10

        for i in range(chain_length):
            if i == 0:
                self.certificate('root', False)
            elif i == chain_length - 1:
                self.req('end')
            else:
                self.req('int{}'.format(i))

        for i in range(chain_length - 1):
            if i == 0:
                self.ca(cert='root', out='int1')
            elif i == chain_length - 2:
                self.ca(cert='int{}'.format(chain_length - 2), out='end')
            else:
                self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))

        # Open the bundle once (still in append mode) instead of
        # reopening it for every certificate.
        with open(temp_dir + '/all.crt', 'a') as bundle:
            for i in range(chain_length - 1, 0, -1):
                path = temp_dir + (
                    '/end.crt'
                    if i == chain_length - 1
                    else '/int{}.crt'.format(i)
                )

                with open(path) as cert:
                    bundle.write(cert.read())

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load(
            'all', 'end'
        ), 'certificate chain upload'

        chain = self.conf_get('/certificates/all/chain')
        assert len(chain) == chain_length - 1, 'certificate chain length'

        self.add_tls(cert='all')

        assert self.get_ssl()['status'] == 200, 'certificate chain long'

    def test_tls_certificate_empty_cn(self, temp_dir):
        """A certificate requested with subject ``/`` reports an empty subject."""
        self.certificate('root', False)

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {}, 'empty subject'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san(self, temp_dir):
        """With subject ``/`` and alt names, only ``alt_names`` is reported."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True, alt_names=["example.com", "www.example.net"]
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {
            'alt_names': ['example.com', 'www.example.net']
        }, 'subject alt_names'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san_ip(self):
        """The ``IP|10.0.0.1`` entry is not expected in reported ``alt_names``."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True,
            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {
            'alt_names': ['example.com', 'www.example.net']
        }, 'subject alt_names'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    @pytest.mark.skip('not yet')
    def test_tls_reconfigure(self):
        """An open plain connection keeps working after TLS is enabled."""
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        (resp, sock) = self.get(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert resp['status'] == 200, 'initial status'

        self.add_tls()

        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'

    def test_tls_keepalive(self):
        """Two requests over one TLS connection both echo the body."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls(application='mirror')

        (resp, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        assert resp['body'] == '0123456789', 'keepalive 1'

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['body'] == '0123456789', 'keepalive 2'

    def test_tls_no_close_notify(self):
        """Open a TLS connection, wait five seconds, then close the socket.

        Nothing is asserted after the request; presumably the framework
        checks the server log for alerts afterwards (TODO confirm).
        """
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default"},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

        (_, sock) = self.get_ssl(start=True)

        time.sleep(5)

        sock.close()

    @pytest.mark.skip('not yet')
    def test_tls_keepalive_certificate_remove(self):
        """A kept-alive TLS socket must fail once its certificate is removed."""
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls()

        (_, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert 'success' in self.conf(
            {"pass": "applications/empty"}, 'listeners/*:7080'
        )
        assert 'success' in self.conf_delete('/certificates/default')

        try:
            resp = self.get_ssl(
                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
            )

        # Any ordinary failure counts as "no response"; KeyboardInterrupt
        # is not an Exception subclass and therefore still propagates.
        except Exception:
            resp = None

        assert resp is None, 'keepalive remove certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificates_remove_all(self):
        """Deleting ``/certificates`` as a whole succeeds."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates'
        ), 'remove all certificates'

    def test_tls_application_respawn(self, skip_alert):
        """A kept-alive TLS socket is served after the app process is killed."""
        self.load('mirror')

        self.certificate()

        assert 'success' in self.conf('1', 'applications/mirror/processes')

        self.add_tls(application='mirror')

        (_, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]

        subprocess.check_output(['kill', '-9', app_id])

        skip_alert(r'process .* %s.* exited on signal 9' % app_id)

        # Wait for a "started" record whose process id differs from the
        # one that was just killed.
        self.wait_for_record(
            re.compile(
                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
            )
        )

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['status'] == 200, 'application respawn status'
        assert resp['body'] == '0123456789', 'application respawn body'

    def test_tls_url_scheme(self):
        """``Wsgi-Url-Scheme`` is ``http`` without TLS and ``https`` with it."""
        self.load('variables')

        assert (
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'http'
        ), 'url scheme http'

        self.certificate()

        self.add_tls(application='variables')

        assert (
            self.post_ssl(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'https'
        ), 'url scheme https'

    def test_tls_big_upload(self):
        """Upload a 90000-character file over TLS and check the reply body."""
        self.load('upload')

        self.certificate()

        self.add_tls(application='upload')

        filename = 'test.txt'
        data = '0123456789' * 9000

        res = self.post_ssl(
            body={
                'file': {
                    'filename': filename,
                    'type': 'text/plain',
                    'data': io.StringIO(data),
                }
            }
        )
        assert res['status'] == 200, 'status ok'
        assert res['body'] == filename + data

    def test_tls_multi_listener(self):
        """Two TLS listeners on ports 7080 and 7081 both answer with 200."""
        self.load('empty')

        self.certificate()

        self.add_tls()
        self.add_tls(port=7081)

        assert self.get_ssl()['status'] == 200, 'listener #1'

        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'