import io
import ssl
import subprocess
import time

import pytest
from unit.applications.tls import TestApplicationTLS
from unit.option import option


class TestTLS(TestApplicationTLS):
    """TLS listener and certificate-management tests.

    The helpers `conf`, `conf_get`, `conf_delete`, `load`, `certificate`,
    `certificate_load`, `openssl_conf`, `get`, `post`, `get_ssl` and
    `post_ssl` are not defined in this class; they are presumably inherited
    from `TestApplicationTLS` (verify against the base class).
    """

    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

    def add_tls(self, application='empty', cert='default', port=7080):
        """Point listener `*:{port}` at `application` with TLS enabled.

        Asserts that the configuration request reports success.
        """
        assert 'success' in self.conf(
            {
                "pass": f"applications/{application}",
                "tls": {"certificate": cert},
            },
            f'listeners/*:{port}',
        )

    def remove_tls(self, application='empty', port=7080):
        """Reconfigure listener `*:{port}` for `application` without "tls".

        Asserts that the configuration request reports success.
        """
        assert 'success' in self.conf(
            {"pass": f"applications/{application}"}, f'listeners/*:{port}'
        )

    def req(self, name='localhost', subject=None):
        """Run `openssl req -new` to create `{name}.csr` and `{name}.key`.

        Files are written to `option.temp_dir`. When `subject` is None the
        subject defaults to `/CN={name}/`. Raises
        `subprocess.CalledProcessError` if openssl exits non-zero.
        """
        subj = subject if subject is not None else f'/CN={name}/'

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-new',
                '-subj',
                subj,
                '-config',
                f'{option.temp_dir}/openssl.conf',
                '-out',
                f'{option.temp_dir}/{name}.csr',
                '-keyout',
                f'{option.temp_dir}/{name}.key',
            ],
            stderr=subprocess.STDOUT,
        )

    def generate_ca_conf(self):
        """Write the files `openssl ca` needs into `option.temp_dir`.

        Creates `ca.conf`, `certserial` (containing "1000") and empty
        `certindex` / `certindex.attr` files.
        """
        with open(f'{option.temp_dir}/ca.conf', 'w') as f:
            f.write(
                f"""[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = {option.temp_dir}
database = {option.temp_dir}/certindex
default_md = sha256
policy = myca_policy
serial = {option.temp_dir}/certserial
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
            )

        with open(f'{option.temp_dir}/certserial', 'w') as f:
            f.write('1000')

        with open(f'{option.temp_dir}/certindex', 'w') as f:
            f.write('')

        with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
            f.write('')

    def ca(self, cert='root', out='localhost'):
        """Sign `{out}.csr` with the `{cert}` key/certificate pair.

        Runs `openssl ca -batch` using `ca.conf` from `option.temp_dir` and
        writes the result to `{out}.crt`. Raises
        `subprocess.CalledProcessError` if openssl exits non-zero.
        """
        subprocess.check_output(
            [
                'openssl',
                'ca',
                '-batch',
                '-config',
                f'{option.temp_dir}/ca.conf',
                '-keyfile',
                f'{option.temp_dir}/{cert}.key',
                '-cert',
                f'{option.temp_dir}/{cert}.crt',
                '-in',
                f'{option.temp_dir}/{out}.csr',
                '-out',
                f'{option.temp_dir}/{out}.crt',
            ],
            stderr=subprocess.STDOUT,
        )

    def set_certificate_req_context(self, cert='root'):
        """Store on `self.context` an SSL context that trusts `{cert}.crt`.

        Hostname checking is disabled, but the peer certificate must still
        verify against the loaded CA file.
        """
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')

    def test_tls_listener_option_add(self):
        """A listener with the "tls" option answers HTTPS with status 200."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert self.get_ssl()['status'] == 200, 'add listener option'

    def test_tls_listener_option_remove(self):
        """After removing "tls" the listener answers plain HTTP with 200."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.get_ssl()

        self.remove_tls()

        assert self.get()['status'] == 200, 'remove listener option'

    def test_tls_certificate_remove(self):
        """A certificate not referenced by any listener can be deleted."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_used(self):
        """Deleting a certificate that a listener uses is rejected."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_nonexisting(self):
        """Deleting an unknown certificate name is rejected."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/blah'
        ), 'remove nonexistings certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_update(self):
        """Re-creating the certificate changes what the server presents."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

        self.certificate()

        assert cert_old != ssl.get_server_certificate(
            ('127.0.0.1', 7080)
        ), 'update certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_key_incorrect(self):
        """Loading certificate 'first' with the key of 'second' is an error."""
        self.load('empty')

        # NOTE(review): the second positional argument presumably disables
        # uploading the generated certificate -- confirm in the base class.
        self.certificate('first', False)
        self.certificate('second', False)

        assert 'error' in self.certificate_load(
            'first', 'second'
        ), 'key incorrect'

    def test_tls_certificate_change(self):
        """Switching the listener to another certificate takes effect."""
        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()

        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

        self.add_tls(cert='new')

        assert cert_old != ssl.get_server_certificate(
            ('127.0.0.1', 7080)
        ), 'change certificate'

    def test_tls_certificate_key_rsa(self):
        """The default certificate reports its key as 'RSA (2048 bits)'."""
        self.load('empty')

        self.certificate()

        assert (
            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
        ), 'certificate key rsa'

    def test_tls_certificate_key_ec(self, temp_dir):
        """A prime256v1 EC key is reported as 'ECDH'."""
        self.load('empty')

        self.openssl_conf()

        subprocess.check_output(
            [
                'openssl',
                'ecparam',
                '-noout',
                '-genkey',
                '-out',
                f'{temp_dir}/ec.key',
                '-name',
                'prime256v1',
            ],
            stderr=subprocess.STDOUT,
        )

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=ec/',
                '-config',
                f'{temp_dir}/openssl.conf',
                '-key',
                f'{temp_dir}/ec.key',
                '-out',
                f'{temp_dir}/ec.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        self.certificate_load('ec')

        assert (
            self.conf_get('/certificates/ec/key') == 'ECDH'
        ), 'certificate key ec'

    def test_tls_certificate_chain_options(self, date_to_sec_epoch, sec_epoch):
        """Check subject, issuer and validity reported for the default cert."""
        self.load('empty')
        date_format = '%b %d %X %Y %Z'

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        assert len(chain) == 1, 'certificate chain length'

        cert = chain[0]

        assert (
            cert['subject']['common_name'] == 'default'
        ), 'certificate subject common name'
        assert (
            cert['issuer']['common_name'] == 'default'
        ), 'certificate issuer common name'

        # The certificate was just created, so "since" must be within a
        # minute of the current time.
        assert (
            abs(
                sec_epoch
                - date_to_sec_epoch(cert['validity']['since'], date_format)
            )
            < 60
        ), 'certificate validity since'
        # 2592000 seconds == 30 days.
        assert (
            date_to_sec_epoch(cert['validity']['until'], date_format)
            - date_to_sec_epoch(cert['validity']['since'], date_format)
            == 2592000
        ), 'certificate validity until'

    def test_tls_certificate_chain(self, temp_dir):
        """Exercise a root -> int -> end chain in three configurations."""
        self.load('empty')

        self.certificate('root', False)

        self.req('int')
        self.req('end')

        self.generate_ca_conf()

        self.ca(cert='root', out='int')
        self.ca(cert='int', out='end')

        crt_path = f'{temp_dir}/end-int.crt'
        end_path = f'{temp_dir}/end.crt'
        int_path = f'{temp_dir}/int.crt'

        # Bundle is leaf first, then intermediate. Handle names avoid
        # shadowing the builtin `int`.
        with open(crt_path, 'wb') as bundle, open(
            end_path, 'rb'
        ) as end_crt, open(int_path, 'rb') as int_crt:
            bundle.write(end_crt.read() + int_crt.read())

        self.set_certificate_req_context()

        # incomplete chain

        assert 'success' in self.certificate_load(
            'end', 'end'
        ), 'certificate chain end upload'

        chain = self.conf_get('/certificates/end/chain')
        assert len(chain) == 1, 'certificate chain end length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end issuer common name'

        self.add_tls(cert='end')

        # The client trusts only the root, so the handshake is expected to
        # fail when the intermediate is missing.
        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        assert resp is None, 'certificate chain incomplete chain'

        # intermediate

        assert 'success' in self.certificate_load(
            'int', 'int'
        ), 'certificate chain int upload'

        chain = self.conf_get('/certificates/int/chain')
        assert len(chain) == 1, 'certificate chain int length'
        assert (
            chain[0]['subject']['common_name'] == 'int'
        ), 'certificate chain int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'root'
        ), 'certificate chain int issuer common name'

        self.add_tls(cert='int')

        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'

        # intermediate server

        assert 'success' in self.certificate_load(
            'end-int', 'end'
        ), 'certificate chain end-int upload'

        chain = self.conf_get('/certificates/end-int/chain')
        assert len(chain) == 2, 'certificate chain end-int length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end-int int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end-int int issuer common name'
        assert (
            chain[1]['subject']['common_name'] == 'int'
        ), 'certificate chain end-int end subject common name'
        assert (
            chain[1]['issuer']['common_name'] == 'root'
        ), 'certificate chain end-int end issuer common name'

        self.add_tls(cert='end-int')

        assert (
            self.get_ssl()['status'] == 200
        ), 'certificate chain intermediate server'

    def test_tls_certificate_chain_long(self, temp_dir):
        """Serve a bundle of `chain_length - 1` certificates under one root."""
        self.load('empty')

        self.generate_ca_conf()

        # Minimum chain length is 3.
        chain_length = 10

        # Create the root certificate and CSRs for int1..intN and 'end'.
        for i in range(chain_length):
            if i == 0:
                self.certificate('root', False)
            elif i == chain_length - 1:
                self.req('end')
            else:
                self.req(f'int{i}')

        # Sign each CSR with its predecessor: root -> int1 -> ... -> end.
        for i in range(chain_length - 1):
            if i == 0:
                self.ca(cert='root', out='int1')
            elif i == chain_length - 2:
                self.ca(cert=f'int{(chain_length - 2)}', out='end')
            else:
                self.ca(cert=f'int{i}', out=f'int{(i + 1)}')

        # Build the bundle leaf-first (end, intN, ..., int1). The file is
        # opened once in write mode so a stale all.crt can never be
        # appended to.
        with open(f'{temp_dir}/all.crt', 'w') as bundle:
            for i in range(chain_length - 1, 0, -1):
                path = (
                    f'{temp_dir}/end.crt'
                    if i == chain_length - 1
                    else f'{temp_dir}/int{i}.crt'
                )

                with open(path) as cert:
                    bundle.write(cert.read())

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load(
            'all', 'end'
        ), 'certificate chain upload'

        chain = self.conf_get('/certificates/all/chain')
        assert len(chain) == chain_length - 1, 'certificate chain length'

        self.add_tls(cert='all')

        assert self.get_ssl()['status'] == 200, 'certificate chain long'

    def test_tls_certificate_empty_cn(self):
        """A certificate with subject '/' is reported with an empty subject."""
        self.certificate('root', False)

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {}, 'empty subject'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san(self):
        """With an empty CN, the subject reports only the DNS alt names."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True, alt_names=["example.com", "www.example.net"]
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        assert cert['chain'][0]['subject'] == {
            'alt_names': ['example.com', 'www.example.net']
        }, 'subject alt_names'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san_ip(self):
        """An IP alt name is not listed among the reported alt names."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True,
            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        assert 'success' in self.certificate_load('localhost', 'localhost')

        cert = self.conf_get('/certificates/localhost')
        # Only the two DNS entries are expected back; the IP entry is not.
        assert cert['chain'][0]['subject'] == {
            'alt_names': ['example.com', 'www.example.net']
        }, 'subject alt_names'
        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_keepalive(self):
        """Two requests over one TLS connection both echo their body."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls(application='mirror')

        (resp, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        assert resp['body'] == '0123456789', 'keepalive 1'

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['body'] == '0123456789', 'keepalive 2'

    def test_tls_no_close_notify(self):
        """Hold a TLS connection open for 5 seconds, then close the socket."""
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default"},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

        (_, sock) = self.get_ssl(start=True)

        time.sleep(5)

        sock.close()

    @pytest.mark.skip('not yet')
    def test_tls_keepalive_certificate_remove(self):
        """A kept-alive TLS socket stops working once its cert is removed."""
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls()

        (_, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert 'success' in self.conf(
            {"pass": "applications/empty"}, 'listeners/*:7080'
        )
        assert 'success' in self.conf_delete('/certificates/default')

        # Any ordinary failure counts as "connection unusable". Catching
        # Exception (not a bare except) lets KeyboardInterrupt, SystemExit
        # and other BaseException subclasses propagate.
        try:
            resp = self.get_ssl(sock=sock)
        except Exception:
            resp = None

        assert resp is None, 'keepalive remove certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificates_remove_all(self):
        """Deleting '/certificates' as a whole succeeds."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates'
        ), 'remove all certificates'

    def test_tls_application_respawn(
        self, findall, skip_alert, wait_for_record
    ):
        """A kept-alive TLS connection survives the app process being killed."""
        self.load('mirror')

        self.certificate()

        assert 'success' in self.conf('1', 'applications/mirror/processes')

        self.add_tls(application='mirror')

        (_, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]

        subprocess.check_output(['kill', '-9', app_id])

        skip_alert(fr'process {app_id} exited on signal 9')

        # Wait for a "started" record whose pid differs from the killed one.
        wait_for_record(
            fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
        )

        resp = self.post_ssl(sock=sock, body='0123456789')

        assert resp['status'] == 200, 'application respawn status'
        assert resp['body'] == '0123456789', 'application respawn body'

    def test_tls_url_scheme(self):
        """Wsgi-Url-Scheme is 'http' without TLS and 'https' with it."""
        self.load('variables')

        assert (
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'http'
        ), 'url scheme http'

        self.certificate()

        self.add_tls(application='variables')

        assert (
            self.post_ssl(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'https'
        ), 'url scheme https'

    def test_tls_big_upload(self):
        """Upload a 90000-character file over TLS and check the response."""
        self.load('upload')

        self.certificate()

        self.add_tls(application='upload')

        filename = 'test.txt'
        data = '0123456789' * 9000

        res = self.post_ssl(
            body={
                'file': {
                    'filename': filename,
                    'type': 'text/plain',
                    'data': io.StringIO(data),
                }
            }
        )
        assert res['status'] == 200, 'status ok'
        assert res['body'] == f'{filename}{data}'

    def test_tls_multi_listener(self):
        """Two TLS listeners (ports 7080 and 7081) both answer with 200."""
        self.load('empty')

        self.certificate()

        self.add_tls()
        self.add_tls(port=7081)

        assert self.get_ssl()['status'] == 200, 'listener #1'

        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'