import io
import ssl
import subprocess
import time

import pytest
from unit.applications.tls import TestApplicationTLS
from unit.option import option


class TestTLS(TestApplicationTLS):
    """TLS listener and certificate-management tests.

    Helper methods shell out to the ``openssl`` command-line tool and write
    their artifacts into ``option.temp_dir``.  Configuration access
    (``conf``, ``conf_get``, ``conf_delete``), certificate helpers
    (``certificate``, ``certificate_load``, ``openssl_conf``) and the HTTP(S)
    client helpers (``get``, ``get_ssl``, ``post``, ``post_ssl``) are
    inherited from the base class.
    """

    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date string in '%b %d %X %Y %Z' format to epoch seconds.

        Delegates to the inherited ``date_to_sec_epoch`` helper.
        """
        return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')

    def add_tls(self, application='empty', cert='default', port=7080):
        """Configure listener ``*:<port>`` to pass to *application* with TLS.

        :param application: application name used in the "pass" target.
        :param cert: name of the certificate to attach to the listener.
        :param port: listener port.
        """
        assert 'success' in self.conf(
            {
                "pass": "applications/" + application,
                "tls": {"certificate": cert},
            },
            'listeners/*:' + str(port),
        )

    def remove_tls(self, application='empty', port=7080):
        """Reconfigure listener ``*:<port>`` without the "tls" option.

        :param application: application name used in the "pass" target.
        :param port: listener port.
        """
        assert 'success' in self.conf(
            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
        )

    def req(self, name='localhost', subject=None, x509=False):
        """Run ``openssl req -new`` to create ``<name>.csr`` and ``<name>.key``.

        :param name: base name of the output files in ``option.temp_dir``;
            also used as the CN when *subject* is not given.
        :param subject: explicit ``-subj`` value; defaults to ``/CN=<name>/``.
        :param x509: currently unused; kept for interface compatibility.
        :raises subprocess.CalledProcessError: if ``openssl`` exits non-zero.
        """
        subj = subject if subject is not None else '/CN=' + name + '/'

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-new',
                '-subj',
                subj,
                '-config',
                option.temp_dir + '/openssl.conf',
                '-out',
                option.temp_dir + '/' + name + '.csr',
                '-keyout',
                option.temp_dir + '/' + name + '.key',
            ],
            stderr=subprocess.STDOUT,
        )

    def generate_ca_conf(self):
        """Write ``ca.conf`` and the CA bookkeeping files for ``openssl ca``.

        Creates ``ca.conf``, ``certserial`` (initialised to ``1000``) and
        empty ``certindex`` / ``certindex.attr`` files in
        ``option.temp_dir``.
        """
        # NOTE: the template body must stay at column 0; its text is written
        # to the config file verbatim.
        with open(option.temp_dir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': option.temp_dir,
                    'database': option.temp_dir + '/certindex',
                    'certserial': option.temp_dir + '/certserial',
                }
            )

        with open(option.temp_dir + '/certserial', 'w') as f:
            f.write('1000')

        with open(option.temp_dir + '/certindex', 'w') as f:
            f.write('')

        with open(option.temp_dir + '/certindex.attr', 'w') as f:
            f.write('')

    def ca(self, cert='root', out='localhost'):
        """Sign ``<out>.csr`` with the ``<cert>`` key pair via ``openssl ca``.

        :param cert: base name of the signing ``.key`` / ``.crt`` files.
        :param out: base name of the input ``.csr`` and output ``.crt``.
        :raises subprocess.CalledProcessError: if ``openssl`` exits non-zero.
        """
        subprocess.check_output(
            [
                'openssl',
                'ca',
                '-batch',
                '-config',
                option.temp_dir + '/ca.conf',
                '-keyfile',
                option.temp_dir + '/' + cert + '.key',
                '-cert',
                option.temp_dir + '/' + cert + '.crt',
                '-in',
                option.temp_dir + '/' + out + '.csr',
                '-out',
                option.temp_dir + '/' + out + '.crt',
            ],
            stderr=subprocess.STDOUT,
        )

    def set_certificate_req_context(self, cert='root'):
        """Set ``self.context`` to a verifying SSL context trusting *cert*.

        Hostname checking is disabled, peer certificate verification is
        required, and ``<cert>.crt`` from ``option.temp_dir`` is loaded as a
        verify location.
        """
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(
            option.temp_dir + '/' + cert + '.crt'
        )

    def test_tls_listener_option_add(self):
        """A listener with the "tls" option answers HTTPS requests."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert self.get_ssl()['status'] == 200, 'add listener option'

    def test_tls_listener_option_remove(self):
        """After removing "tls" the listener answers plain HTTP requests."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.get_ssl()

        self.remove_tls()

        assert self.get()['status'] == 200, 'remove listener option'

    def test_tls_certificate_remove(self):
        """A certificate not referenced by any listener can be deleted."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_used(self):
        """Deleting a certificate referenced by a listener is rejected."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_nonexisting(self):
        """Deleting a certificate name that does not exist is rejected."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/blah'
        ), 'remove nonexistings certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_update(self):
        """Re-creating the certificate changes what the server presents."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

        self.certificate()

        assert cert_old != ssl.get_server_certificate(
            ('127.0.0.1', 7080)
        ), 'update certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_key_incorrect(self):
        """Loading a certificate with another certificate's key is rejected."""
        self.load('empty')

        self.certificate('first', False)
        self.certificate('second', False)

        assert 'error' in self.certificate_load(
            'first', 'second'
        ), 'key incorrect'

    def test_tls_certificate_change(self):
        """Switching the listener's certificate changes what is presented."""
        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()

        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

        self.add_tls(cert='new')

        assert cert_old != ssl.get_server_certificate(
            ('127.0.0.1', 7080)
        ), 'change certificate'

    def test_tls_certificate_key_rsa(self):
        """The default certificate's key is reported as 'RSA (2048 bits)'."""
        self.load('empty')

        self.certificate()

        assert (
            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
        ), 'certificate key rsa'

    def test_tls_certificate_key_ec(self, temp_dir):
        """A prime256v1 EC key is reported as 'ECDH'."""
        self.load('empty')

        self.openssl_conf()

        # Generate the EC private key.
        subprocess.check_output(
            [
                'openssl',
                'ecparam',
                '-noout',
                '-genkey',
                '-out',
                temp_dir + '/ec.key',
                '-name',
                'prime256v1',
            ],
            stderr=subprocess.STDOUT,
        )

        # Issue a self-signed certificate for that key.
        subprocess.check_output(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=ec/',
                '-config',
                temp_dir + '/openssl.conf',
                '-key',
                temp_dir + '/ec.key',
                '-out',
                temp_dir + '/ec.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        self.certificate_load('ec')

        assert (
            self.conf_get('/certificates/ec/key') == 'ECDH'
        ), 'certificate key ec'

    def test_tls_certificate_chain_options(self):
        """The reported chain exposes subject, issuer and validity fields."""
        self.load('empty')

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        assert len(chain) == 1, 'certificate chain length'

        cert = chain[0]

        assert (
            cert['subject']['common_name'] == 'default'
        ), 'certificate subject common name'
        assert (
            cert['issuer']['common_name'] == 'default'
        ), 'certificate issuer common name'

        # The certificate was created moments ago, so "since" must be within
        # a minute of the current time.
        assert (
            abs(
                self.sec_epoch()
                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            )
            < 60
        ), 'certificate validity since'
        # 2592000 seconds == 30 days.
        assert (
            self.openssl_date_to_sec_epoch(cert['validity']['until'])
            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            == 2592000
        ), 'certificate validity until'

    def test_tls_certificate_chain(self, temp_dir):
        """Exercise a root -> int -> end chain in three bundle variants."""
        self.load('empty')

        self.certificate('root', False)

        self.req('int')
        self.req('end')

        self.generate_ca_conf()

        self.ca(cert='root', out='int')
        self.ca(cert='int', out='end')

        crt_path = temp_dir + '/end-int.crt'
        end_path = temp_dir + '/end.crt'
        int_path = temp_dir + '/int.crt'

        # Build the bundle: end certificate first, then the intermediate.
        with open(crt_path, 'wb') as crt_file, open(
            end_path, 'rb'
        ) as end_file, open(int_path, 'rb') as int_file:
            crt_file.write(end_file.read() + int_file.read())

        self.set_certificate_req_context()

        # incomplete chain

        assert 'success' in self.certificate_load(
            'end', 'end'
        ), 'certificate chain end upload'

        chain = self.conf_get('/certificates/end/chain')
        assert len(chain) == 1, 'certificate chain end length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end issuer common name'

        self.add_tls(cert='end')

        # The client only trusts the root, so the request is expected to
        # fail with an SSL error.
        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        assert resp is None, 'certificate chain incomplete chain'

        # intermediate

        assert 'success' in self.certificate_load(
            'int', 'int'
        ), 'certificate chain int upload'

        chain = self.conf_get('/certificates/int/chain')
        assert len(chain) == 1, 'certificate chain int length'
        assert (
            chain[0]['subject']['common_name'] == 'int'
        ), 'certificate chain int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'root'
        ), 'certificate chain int issuer common name'

        self.add_tls(cert='int')

        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'

        # intermediate server

        assert 'success' in self.certificate_load(
            'end-int', 'end'
        ), 'certificate chain end-int upload'

        chain = self.conf_get('/certificates/end-int/chain')
        assert len(chain) == 2, 'certificate chain end-int length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end-int int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end-int int issuer common name'
        assert (
            chain[1]['subject']['common_name'] == 'int'
        ), 'certificate chain end-int end subject common name'
        assert (
            chain[1]['issuer']['common_name'] == 'root'
        ), 'certificate chain end-int end issuer common name'

        self.add_tls(cert='end-int')

        assert (
            self.get_ssl()['status'] == 200
        ), 'certificate chain intermediate server'

    def test_tls_certificate_chain_long(self, temp_dir):
        """A bundle holding a long chain of intermediates is accepted."""
        self.load('empty')

        self.generate_ca_conf()

        # Total number of certificates: root + intermediates + end.
        # Keep it >= 3 so that at least one intermediate exists.
        chain_length = 10

        # Ordered from the trust anchor down to the end certificate:
        # root, int1, ..., int<chain_length - 2>, end.
        names = (
            ['root']
            + ['int{}'.format(i) for i in range(1, chain_length - 1)]
            + ['end']
        )

        self.certificate(names[0], False)
        for name in names[1:]:
            self.req(name)

        # Each certificate is signed by its predecessor in the list.
        for issuer, subject in zip(names, names[1:]):
            self.ca(cert=issuer, out=subject)

        # Bundle order is end certificate first, then the intermediates
        # walking back towards (but excluding) the root.
        with open(temp_dir + '/all.crt', 'a') as bundle:
            for name in reversed(names[1:]):
                with open(temp_dir + '/' + name + '.crt') as cert:
                    bundle.write(cert.read())

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load(
            'all', 'end'
        ), 'certificate chain upload'

        chain = self.conf_get('/certificates/all/chain')
        assert len(chain) == chain_length - 1, 'certificate chain length'

        self.add_tls(cert='all')

        assert self.get_ssl()['status'] == 200, 'certificate chain long'

    def test_tls_certificate_empty_cn(self, temp_dir):
        """A certificate requested with subject '/' reports an empty subject."""
        self.certificate('root', False)

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {}, 'empty subject'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san(self, temp_dir):
        """With subject '/' and alt names, only 'alt_names' is reported."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True, alt_names=["example.com", "www.example.net"]
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {
            'alt_names': ['example.com', 'www.example.net']
        }, 'subject alt_names'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san_ip(self):
        """The 'IP|10.0.0.1' alt name is absent from the reported alt_names."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True,
            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {
            'alt_names': ['example.com', 'www.example.net']
        }, 'subject alt_names'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_keepalive(self):
        """Two requests can be served over one kept-alive TLS connection."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls(application='mirror')

        (resp, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        assert resp['body'] == '0123456789', 'keepalive 1'

        # Second request reuses the socket returned by the first one.
        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['body'] == '0123456789', 'keepalive 2'

    def test_tls_no_close_notify(self):
        """Hold a TLS connection open for 5 seconds, then close the socket."""
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default"},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

        (_, sock) = self.get_ssl(start=True)

        time.sleep(5)

        sock.close()

    @pytest.mark.skip('not yet')
    def test_tls_keepalive_certificate_remove(self):
        """A kept-alive connection fails once its certificate is removed."""
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls()

        (resp, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert 'success' in self.conf(
            {"pass": "applications/empty"}, 'listeners/*:7080'
        )
        assert 'success' in self.conf_delete('/certificates/default')

        # Any ordinary failure counts as "no response"; KeyboardInterrupt
        # and SystemExit are not Exception subclasses and still propagate.
        try:
            resp = self.get_ssl(
                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
            )
        except Exception:
            resp = None

        assert resp is None, 'keepalive remove certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificates_remove_all(self):
        """Deleting the whole '/certificates' collection succeeds."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates'
        ), 'remove all certificates'

    def test_tls_application_respawn(self, skip_alert):
        """A kept-alive TLS connection survives an application respawn."""
        self.load('mirror')

        self.certificate()

        assert 'success' in self.conf('1', 'applications/mirror/processes')

        self.add_tls(application='mirror')

        (_, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]

        subprocess.check_output(['kill', '-9', app_id])

        skip_alert(r'process %s exited on signal 9' % app_id)

        # Wait for a "started" record whose process id differs from the
        # killed one.
        self.wait_for_record(
            r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
        )

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['status'] == 200, 'application respawn status'
        assert resp['body'] == '0123456789', 'application respawn body'

    def test_tls_url_scheme(self):
        """'Wsgi-Url-Scheme' is 'http' without TLS and 'https' with it."""
        self.load('variables')

        assert (
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'http'
        ), 'url scheme http'

        self.certificate()

        self.add_tls(application='variables')

        assert (
            self.post_ssl(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'https'
        ), 'url scheme https'

    def test_tls_big_upload(self):
        """A 90000-character file upload over TLS is echoed back intact."""
        self.load('upload')

        self.certificate()

        self.add_tls(application='upload')

        filename = 'test.txt'
        data = '0123456789' * 9000

        res = self.post_ssl(
            body={
                'file': {
                    'filename': filename,
                    'type': 'text/plain',
                    'data': io.StringIO(data),
                }
            }
        )
        assert res['status'] == 200, 'status ok'
        assert res['body'] == filename + data

    def test_tls_multi_listener(self):
        """Two listeners can share the same certificate."""
        self.load('empty')

        self.certificate()

        self.add_tls()
        self.add_tls(port=7081)

        assert self.get_ssl()['status'] == 200, 'listener #1'

        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'