1import io 2import re 3import ssl 4import subprocess 5import time 6 7import pytest 8from unit.applications.tls import TestApplicationTLS 9from unit.option import option 10 11 12class TestTLS(TestApplicationTLS): 13 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 14 15 def openssl_date_to_sec_epoch(self, date): 16 return self.date_to_sec_epoch(date, '%b %d %X %Y %Z') 17 18 def add_tls(self, application='empty', cert='default', port=7080): 19 assert 'success' in self.conf( 20 { 21 "pass": "applications/" + application, 22 "tls": {"certificate": cert}, 23 }, 24 'listeners/*:' + str(port), 25 ) 26 27 def remove_tls(self, application='empty', port=7080): 28 assert 'success' in self.conf( 29 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 30 ) 31 32 def req(self, name='localhost', subject=None, x509=False): 33 subj = subject if subject is not None else '/CN=' + name + '/' 34 35 subprocess.check_output( 36 [ 37 'openssl', 38 'req', 39 '-new', 40 '-subj', 41 subj, 42 '-config', 43 option.temp_dir + '/openssl.conf', 44 '-out', 45 option.temp_dir + '/' + name + '.csr', 46 '-keyout', 47 option.temp_dir + '/' + name + '.key', 48 ], 49 stderr=subprocess.STDOUT, 50 ) 51 52 def generate_ca_conf(self): 53 with open(option.temp_dir + '/ca.conf', 'w') as f: 54 f.write( 55 """[ ca ] 56default_ca = myca 57 58[ myca ] 59new_certs_dir = %(dir)s 60database = %(database)s 61default_md = sha256 62policy = myca_policy 63serial = %(certserial)s 64default_days = 1 65x509_extensions = myca_extensions 66copy_extensions = copy 67 68[ myca_policy ] 69commonName = optional 70 71[ myca_extensions ] 72basicConstraints = critical,CA:TRUE""" 73 % { 74 'dir': option.temp_dir, 75 'database': option.temp_dir + '/certindex', 76 'certserial': option.temp_dir + '/certserial', 77 } 78 ) 79 80 with open(option.temp_dir + '/certserial', 'w') as f: 81 f.write('1000') 82 83 with open(option.temp_dir + '/certindex', 'w') as f: 84 f.write('') 85 86 with open(option.temp_dir + '/certindex.attr', 'w') as f: 87 f.write('') 88 89 def ca(self, cert='root', out='localhost'): 90 subprocess.check_output( 91 [ 92 'openssl', 93 'ca', 94 '-batch', 95 '-config', 96 option.temp_dir + '/ca.conf', 97 '-keyfile', 98 option.temp_dir + '/' + cert + '.key', 99 '-cert', 100 option.temp_dir + '/' + cert + '.crt', 101 '-in', 102 option.temp_dir + '/' + out + '.csr', 103 '-out', 104 option.temp_dir + '/' + out + '.crt', 105 ], 106 stderr=subprocess.STDOUT, 107 ) 108 109 def set_certificate_req_context(self, cert='root'): 110 self.context = ssl.create_default_context() 111 self.context.check_hostname = False 112 self.context.verify_mode = ssl.CERT_REQUIRED 113 self.context.load_verify_locations( 114 option.temp_dir + '/' + cert + '.crt' 115 ) 116 117 def test_tls_listener_option_add(self): 118 self.load('empty') 119 120 self.certificate() 121 122 self.add_tls() 123 124 assert self.get_ssl()['status'] == 200, 'add listener option' 125 126 def test_tls_listener_option_remove(self): 127 self.load('empty') 128 129 self.certificate() 130 131 self.add_tls() 132 133 self.get_ssl() 134 135 self.remove_tls() 136 137 assert self.get()['status'] == 200, 'remove listener option' 138 139 def test_tls_certificate_remove(self): 140 self.load('empty') 141 142 self.certificate() 143 144 assert 'success' in self.conf_delete( 145 '/certificates/default' 146 ), 'remove certificate' 147 148 def test_tls_certificate_remove_used(self): 149 self.load('empty') 150 151 self.certificate() 152 153 self.add_tls() 154 155 assert 'error' in self.conf_delete( 156 '/certificates/default' 157 ), 'remove certificate' 158 159 def test_tls_certificate_remove_nonexisting(self): 160 self.load('empty') 161 162 self.certificate() 163 164 self.add_tls() 165 166 assert 'error' in self.conf_delete( 167 '/certificates/blah' 168 ), 'remove nonexistings certificate' 169 170 @pytest.mark.skip('not yet') 171 def test_tls_certificate_update(self): 172 self.load('empty') 173 174 self.certificate() 175 176 self.add_tls() 177 178 cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) 179 180 self.certificate() 181 182 assert cert_old != ssl.get_server_certificate( 183 ('127.0.0.1', 7080) 184 ), 'update certificate' 185 186 @pytest.mark.skip('not yet') 187 def test_tls_certificate_key_incorrect(self): 188 self.load('empty') 189 190 self.certificate('first', False) 191 self.certificate('second', False) 192 193 assert 'error' in self.certificate_load( 194 'first', 'second' 195 ), 'key incorrect' 196 197 def test_tls_certificate_change(self): 198 self.load('empty') 199 200 self.certificate() 201 self.certificate('new') 202 203 self.add_tls() 204 205 cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) 206 207 self.add_tls(cert='new') 208 209 assert cert_old != ssl.get_server_certificate( 210 ('127.0.0.1', 7080) 211 ), 'change certificate' 212 213 def test_tls_certificate_key_rsa(self): 214 self.load('empty') 215 216 self.certificate() 217 218 assert ( 219 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' 220 ), 'certificate key rsa' 221 222 def test_tls_certificate_key_ec(self, temp_dir): 223 self.load('empty') 224 225 self.openssl_conf() 226 227 subprocess.check_output( 228 [ 229 'openssl', 230 'ecparam', 231 '-noout', 232 '-genkey', 233 '-out', 234 temp_dir + '/ec.key', 235 '-name', 236 'prime256v1', 237 ], 238 stderr=subprocess.STDOUT, 239 ) 240 241 subprocess.check_output( 242 [ 243 'openssl', 244 'req', 245 '-x509', 246 '-new', 247 '-subj', 248 '/CN=ec/', 249 '-config', 250 temp_dir + '/openssl.conf', 251 '-key', 252 temp_dir + '/ec.key', 253 '-out', 254 temp_dir + '/ec.crt', 255 ], 256 stderr=subprocess.STDOUT, 257 ) 258 259 self.certificate_load('ec') 260 261 assert ( 262 self.conf_get('/certificates/ec/key') == 'ECDH' 263 ), 'certificate key ec' 264 265 def test_tls_certificate_chain_options(self): 266 self.load('empty') 267 268 self.certificate() 269 270 chain = self.conf_get('/certificates/default/chain') 271 272 assert len(chain) == 1, 'certificate chain length' 273 274 cert = chain[0] 275 276 assert ( 277 cert['subject']['common_name'] == 'default' 278 ), 'certificate subject common name' 279 assert ( 280 cert['issuer']['common_name'] == 'default' 281 ), 'certificate issuer common name' 282 283 assert ( 284 abs( 285 self.sec_epoch() 286 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 287 ) 288 < 60 289 ), 'certificate validity since' 290 assert ( 291 self.openssl_date_to_sec_epoch(cert['validity']['until']) 292 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 293 == 2592000 294 ), 'certificate validity until' 295 296 def test_tls_certificate_chain(self, temp_dir): 297 self.load('empty') 298 299 self.certificate('root', False) 300 301 self.req('int') 302 self.req('end') 303 304 self.generate_ca_conf() 305 306 self.ca(cert='root', out='int') 307 self.ca(cert='int', out='end') 308 309 crt_path = temp_dir + '/end-int.crt' 310 end_path = temp_dir + '/end.crt' 311 int_path = temp_dir + '/int.crt' 312 313 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 314 int_path, 'rb' 315 ) as int: 316 crt.write(end.read() + int.read()) 317 318 self.set_certificate_req_context() 319 320 # incomplete chain 321 322 assert 'success' in self.certificate_load( 323 'end', 'end' 324 ), 'certificate chain end upload' 325 326 chain = self.conf_get('/certificates/end/chain') 327 assert len(chain) == 1, 'certificate chain end length' 328 assert ( 329 chain[0]['subject']['common_name'] == 'end' 330 ), 'certificate chain end subject common name' 331 assert ( 332 chain[0]['issuer']['common_name'] == 'int' 333 ), 'certificate chain end issuer common name' 334 335 self.add_tls(cert='end') 336 337 try: 338 resp = self.get_ssl() 339 except ssl.SSLError: 340 resp = None 341 342 assert resp == None, 'certificate chain incomplete chain' 343 344 # intermediate 345 346 assert 'success' in self.certificate_load( 347 'int', 'int' 348 ), 'certificate chain int upload' 349 350 chain = self.conf_get('/certificates/int/chain') 351 assert len(chain) == 1, 'certificate chain int length' 352 assert ( 353 chain[0]['subject']['common_name'] == 'int' 354 ), 'certificate chain int subject common name' 355 assert ( 356 chain[0]['issuer']['common_name'] == 'root' 357 ), 'certificate chain int issuer common name' 358 359 self.add_tls(cert='int') 360 361 assert self.get_ssl()['status'] == 200, 'certificate chain intermediate' 362 363 # intermediate server 364 365 assert 'success' in self.certificate_load( 366 'end-int', 'end' 367 ), 'certificate chain end-int upload' 368 369 chain = self.conf_get('/certificates/end-int/chain') 370 assert len(chain) == 2, 'certificate chain end-int length' 371 assert ( 372 chain[0]['subject']['common_name'] == 'end' 373 ), 'certificate chain end-int int subject common name' 374 assert ( 375 chain[0]['issuer']['common_name'] == 'int' 376 ), 'certificate chain end-int int issuer common name' 377 assert ( 378 chain[1]['subject']['common_name'] == 'int' 379 ), 'certificate chain end-int end subject common name' 380 assert ( 381 chain[1]['issuer']['common_name'] == 'root' 382 ), 'certificate chain end-int end issuer common name' 383 384 self.add_tls(cert='end-int') 385 386 assert ( 387 self.get_ssl()['status'] == 200 388 ), 'certificate chain intermediate server' 389 390 def test_tls_certificate_chain_long(self, temp_dir): 391 self.load('empty') 392 393 self.generate_ca_conf() 394 395 # Minimum chain length is 3. 396 chain_length = 10 397 398 for i in range(chain_length): 399 if i == 0: 400 self.certificate('root', False) 401 elif i == chain_length - 1: 402 self.req('end') 403 else: 404 self.req('int{}'.format(i)) 405 406 for i in range(chain_length - 1): 407 if i == 0: 408 self.ca(cert='root', out='int1') 409 elif i == chain_length - 2: 410 self.ca(cert='int{}'.format(chain_length - 2), out='end') 411 else: 412 self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1)) 413 414 for i in range(chain_length - 1, 0, -1): 415 path = temp_dir + ( 416 '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i) 417 ) 418 419 with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert: 420 chain.write(cert.read()) 421 422 self.set_certificate_req_context() 423 424 assert 'success' in self.certificate_load( 425 'all', 'end' 426 ), 'certificate chain upload' 427 428 chain = self.conf_get('/certificates/all/chain') 429 assert len(chain) == chain_length - 1, 'certificate chain length' 430 431 self.add_tls(cert='all') 432 433 assert self.get_ssl()['status'] == 200, 'certificate chain long' 434 435 def test_tls_certificate_empty_cn(self, temp_dir): 436 self.certificate('root', False) 437 438 self.req(subject='/') 439 440 self.generate_ca_conf() 441 self.ca() 442 443 self.set_certificate_req_context() 444 445 assert 'success' in self.certificate_load('localhost', 'localhost') 446 447 cert = self.conf_get('/certificates/localhost') 448 assert cert['chain'][0]['subject'] == {}, 'empty subject' 449 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 450 451 def test_tls_certificate_empty_cn_san(self, temp_dir): 452 self.certificate('root', False) 453 454 self.openssl_conf( 455 rewrite=True, alt_names=["example.com", "www.example.net"] 456 ) 457 458 self.req(subject='/') 459 460 self.generate_ca_conf() 461 self.ca() 462 463 self.set_certificate_req_context() 464 465 assert 'success' in self.certificate_load('localhost', 'localhost') 466 467 cert = self.conf_get('/certificates/localhost') 468 assert cert['chain'][0]['subject'] == { 469 'alt_names': ['example.com', 'www.example.net'] 470 }, 'subject alt_names' 471 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 472 473 def test_tls_certificate_empty_cn_san_ip(self): 474 self.certificate('root', False) 475 476 self.openssl_conf( 477 rewrite=True, 478 alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], 479 ) 480 481 self.req(subject='/') 482 483 self.generate_ca_conf() 484 self.ca() 485 486 self.set_certificate_req_context() 487 488 assert 'success' in self.certificate_load('localhost', 'localhost') 489 490 cert = self.conf_get('/certificates/localhost') 491 assert cert['chain'][0]['subject'] == { 492 'alt_names': ['example.com', 'www.example.net'] 493 }, 'subject alt_names' 494 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 495 496 def test_tls_keepalive(self): 497 self.load('mirror') 498 499 assert self.get()['status'] == 200, 'init' 500 501 self.certificate() 502 503 self.add_tls(application='mirror') 504 505 (resp, sock) = self.post_ssl( 506 headers={ 507 'Host': 'localhost', 508 'Connection': 'keep-alive', 509 'Content-Type': 'text/html', 510 }, 511 start=True, 512 body='0123456789', 513 read_timeout=1, 514 ) 515 516 assert resp['body'] == '0123456789', 'keepalive 1' 517 518 resp = self.post_ssl( 519 headers={ 520 'Host': 'localhost', 521 'Connection': 'close', 522 'Content-Type': 'text/html', 523 }, 524 sock=sock, 525 body='0123456789', 526 ) 527 528 assert resp['body'] == '0123456789', 'keepalive 2' 529 530 def test_tls_no_close_notify(self): 531 self.certificate() 532 533 assert 'success' in self.conf( 534 { 535 "listeners": { 536 "*:7080": { 537 "pass": "routes", 538 "tls": {"certificate": "default"}, 539 } 540 }, 541 "routes": [{"action": {"return": 200}}], 542 "applications": {}, 543 } 544 ), 'load application configuration' 545 546 (resp, sock) = self.get_ssl(start=True) 547 548 time.sleep(5) 549 550 sock.close() 551 552 @pytest.mark.skip('not yet') 553 def test_tls_keepalive_certificate_remove(self): 554 self.load('empty') 555 556 assert self.get()['status'] == 200, 'init' 557 558 self.certificate() 559 560 self.add_tls() 561 562 (resp, sock) = self.get_ssl( 563 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 564 start=True, 565 read_timeout=1, 566 ) 567 568 assert 'success' in self.conf( 569 {"pass": "applications/empty"}, 'listeners/*:7080' 570 ) 571 assert 'success' in self.conf_delete('/certificates/default') 572 573 try: 574 resp = self.get_ssl( 575 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock 576 ) 577 578 except KeyboardInterrupt: 579 raise 580 581 except: 582 resp = None 583 584 assert resp == None, 'keepalive remove certificate' 585 586 @pytest.mark.skip('not yet') 587 def test_tls_certificates_remove_all(self): 588 self.load('empty') 589 590 self.certificate() 591 592 assert 'success' in self.conf_delete( 593 '/certificates' 594 ), 'remove all certificates' 595 596 def test_tls_application_respawn(self, skip_alert): 597 self.load('mirror') 598 599 self.certificate() 600 601 assert 'success' in self.conf('1', 'applications/mirror/processes') 602 603 self.add_tls(application='mirror') 604 605 (_, sock) = self.post_ssl( 606 headers={ 607 'Host': 'localhost', 608 'Connection': 'keep-alive', 609 'Content-Type': 'text/html', 610 }, 611 start=True, 612 body='0123456789', 613 read_timeout=1, 614 ) 615 616 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 617 618 subprocess.check_output(['kill', '-9', app_id]) 619 620 skip_alert(r'process .* %s.* exited on signal 9' % app_id) 621 622 self.wait_for_record( 623 re.compile( 624 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' 625 ) 626 ) 627 628 resp = self.post_ssl( 629 headers={ 630 'Host': 'localhost', 631 'Connection': 'close', 632 'Content-Type': 'text/html', 633 }, 634 sock=sock, 635 body='0123456789', 636 ) 637 638 assert resp['status'] == 200, 'application respawn status' 639 assert resp['body'] == '0123456789', 'application respawn body' 640 641 def test_tls_url_scheme(self): 642 self.load('variables') 643 644 assert ( 645 self.post( 646 headers={ 647 'Host': 'localhost', 648 'Content-Type': 'text/html', 649 'Custom-Header': '', 650 'Connection': 'close', 651 } 652 )['headers']['Wsgi-Url-Scheme'] 653 == 'http' 654 ), 'url scheme http' 655 656 self.certificate() 657 658 self.add_tls(application='variables') 659 660 assert ( 661 self.post_ssl( 662 headers={ 663 'Host': 'localhost', 664 'Content-Type': 'text/html', 665 'Custom-Header': '', 666 'Connection': 'close', 667 } 668 )['headers']['Wsgi-Url-Scheme'] 669 == 'https' 670 ), 'url scheme https' 671 672 def test_tls_big_upload(self): 673 self.load('upload') 674 675 self.certificate() 676 677 self.add_tls(application='upload') 678 679 filename = 'test.txt' 680 data = '0123456789' * 9000 681 682 res = self.post_ssl( 683 body={ 684 'file': { 685 'filename': filename, 686 'type': 'text/plain', 687 'data': io.StringIO(data), 688 } 689 } 690 ) 691 assert res['status'] == 200, 'status ok' 692 assert res['body'] == filename + data 693 694 def test_tls_multi_listener(self): 695 self.load('empty') 696 697 self.certificate() 698 699 self.add_tls() 700 self.add_tls(port=7081) 701 702 assert self.get_ssl()['status'] == 200, 'listener #1' 703 704 assert self.get_ssl(port=7081)['status'] == 200, 'listener #2' 705