"""TLS listener and certificate tests driven through ``ApplicationTLS``."""

import io
import ssl
import subprocess
import time
from pathlib import Path

import pytest

from unit.applications.tls import ApplicationTLS
from unit.option import option

prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

client = ApplicationTLS()


def add_tls(application='empty', cert='default', port=8080):
    """Configure listener ``*:{port}`` to pass to an application over TLS.

    Args:
        application: Application name used in ``applications/{application}``.
        cert: Name of the certificate referenced by the listener.
        port: Listener port.

    Raises:
        AssertionError: If the configuration response lacks ``'success'``.
    """
    assert 'success' in client.conf(
        {
            "pass": f"applications/{application}",
            "tls": {"certificate": cert},
        },
        f'listeners/*:{port}',
    )


def ca(cert='root', out='localhost'):
    """Sign ``{out}.csr`` with the ``{cert}`` key pair using ``openssl ca``.

    All files live in ``option.temp_dir``; ``ca.conf`` must already exist
    there (see ``generate_ca_conf``).  The result is written to ``{out}.crt``.

    Args:
        cert: Base name of the signing key (``.key``) and certificate
            (``.crt``).
        out: Base name of the request (``.csr``) and of the issued
            certificate (``.crt``).

    Raises:
        subprocess.CalledProcessError: If ``openssl`` exits with non-zero.
    """
    subprocess.check_output(
        [
            'openssl',
            'ca',
            '-batch',
            '-config',
            f'{option.temp_dir}/ca.conf',
            '-keyfile',
            f'{option.temp_dir}/{cert}.key',
            '-cert',
            f'{option.temp_dir}/{cert}.crt',
            '-in',
            f'{option.temp_dir}/{out}.csr',
            '-out',
            f'{option.temp_dir}/{out}.crt',
        ],
        stderr=subprocess.STDOUT,
    )


def context_cert_req(cert='root'):
    """Return a client SSL context that requires a verifiable peer cert.

    Hostname checking is disabled; trust is anchored on
    ``{option.temp_dir}/{cert}.crt``.

    Args:
        cert: Base name of the trusted certificate file.

    Returns:
        The configured ``ssl.SSLContext``.
    """
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')

    return context


def generate_ca_conf():
    """Write the ``openssl ca`` configuration and its database files.

    Creates ``ca.conf``, ``certserial`` (initial serial ``1000``) and empty
    ``certindex`` / ``certindex.attr`` files in ``option.temp_dir``.
    """
    Path(f'{option.temp_dir}/ca.conf').write_text(
        f"""[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = {option.temp_dir}
database = {option.temp_dir}/certindex
default_md = sha256
policy = myca_policy
serial = {option.temp_dir}/certserial
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE""",
        encoding='utf-8',
    )

    Path(f'{option.temp_dir}/certserial').write_text('1000', encoding='utf-8')
    Path(f'{option.temp_dir}/certindex').touch()
    Path(f'{option.temp_dir}/certindex.attr').touch()


def remove_tls(application='empty', port=8080):
    """Reconfigure listener ``*:{port}`` without the ``tls`` option.

    Args:
        application: Application name used in ``applications/{application}``.
        port: Listener port.

    Raises:
        AssertionError: If the configuration response lacks ``'success'``.
    """
    assert 'success' in client.conf(
        {"pass": f"applications/{application}"}, f'listeners/*:{port}'
    )


def req(name='localhost', subject=None):
    """Create a new key and certificate signing request with ``openssl req``.

    Writes ``{name}.csr`` and ``{name}.key`` into ``option.temp_dir`` and
    reads ``openssl.conf`` from the same directory (presumably created
    beforehand by ``client.certificate()`` / ``client.openssl_conf()`` --
    TODO confirm).

    Args:
        name: Base name of the output files; also the default common name.
        subject: Explicit ``-subj`` value; defaults to ``/CN={name}/``.

    Raises:
        subprocess.CalledProcessError: If ``openssl`` exits with non-zero.
    """
    subj = subject if subject is not None else f'/CN={name}/'

    subprocess.check_output(
        [
            'openssl',
            'req',
            '-new',
            '-subj',
            subj,
            '-config',
            f'{option.temp_dir}/openssl.conf',
            '-out',
            f'{option.temp_dir}/{name}.csr',
            '-keyout',
            f'{option.temp_dir}/{name}.key',
        ],
        stderr=subprocess.STDOUT,
    )


def test_tls_listener_option_add():
    """A listener with the ``tls`` option answers an HTTPS request."""
    client.load('empty')

    client.certificate()

    add_tls()

    assert client.get_ssl()['status'] == 200, 'add listener option'


def test_tls_listener_option_remove():
    """After dropping ``tls`` the listener answers a plain request."""
    client.load('empty')

    client.certificate()

    add_tls()

    client.get_ssl()

    remove_tls()

    assert client.get()['status'] == 200, 'remove listener option'


def test_tls_certificate_remove():
    """An unused certificate can be deleted."""
    client.load('empty')

    client.certificate()

    assert 'success' in client.conf_delete(
        '/certificates/default'
    ), 'remove certificate'


def test_tls_certificate_remove_used():
    """Deleting a certificate referenced by a listener is rejected."""
    client.load('empty')

    client.certificate()

    add_tls()

    assert 'error' in client.conf_delete(
        '/certificates/default'
    ), 'remove certificate'


def test_tls_certificate_remove_nonexisting():
    """Deleting an unknown certificate name is rejected."""
    client.load('empty')

    client.certificate()

    add_tls()

    assert 'error' in client.conf_delete(
        '/certificates/blah'
    ), 'remove nonexistings certificate'


@pytest.mark.skip('not yet')
def test_tls_certificate_update():
    """Re-creating the certificate changes what the server presents."""
    client.load('empty')

    client.certificate()

    add_tls()

    cert_old = ssl.get_server_certificate(('127.0.0.1', 8080))

    client.certificate()

    assert cert_old != ssl.get_server_certificate(
        ('127.0.0.1', 8080)
    ), 'update certificate'


@pytest.mark.skip('not yet')
def test_tls_certificate_key_incorrect():
    """Loading a certificate with a mismatched key is rejected."""
    client.load('empty')

    client.certificate('first', False)
    client.certificate('second', False)

    assert 'error' in client.certificate_load(
        'first', 'second'
    ), 'key incorrect'


def test_tls_certificate_change():
    """Switching the listener to another certificate changes the one served."""
    client.load('empty')

    client.certificate()
    client.certificate('new')

    add_tls()

    cert_old = ssl.get_server_certificate(('127.0.0.1', 8080))

    add_tls(cert='new')

    assert cert_old != ssl.get_server_certificate(
        ('127.0.0.1', 8080)
    ), 'change certificate'


def test_tls_certificate_key_rsa():
    """The default certificate's key is reported as ``RSA (2048 bits)``."""
    client.load('empty')

    client.certificate()

    assert (
        client.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
    ), 'certificate key rsa'


def test_tls_certificate_key_ec(temp_dir):
    """A certificate with a prime256v1 EC key is reported as ``ECDH``."""
    client.load('empty')

    client.openssl_conf()

    # Generate the EC private key.
    subprocess.check_output(
        [
            'openssl',
            'ecparam',
            '-noout',
            '-genkey',
            '-out',
            f'{temp_dir}/ec.key',
            '-name',
            'prime256v1',
        ],
        stderr=subprocess.STDOUT,
    )

    # Self-sign a certificate for that key.
    subprocess.check_output(
        [
            'openssl',
            'req',
            '-x509',
            '-new',
            '-subj',
            '/CN=ec/',
            '-config',
            f'{temp_dir}/openssl.conf',
            '-key',
            f'{temp_dir}/ec.key',
            '-out',
            f'{temp_dir}/ec.crt',
        ],
        stderr=subprocess.STDOUT,
    )

    client.certificate_load('ec')

    assert (
        client.conf_get('/certificates/ec/key') == 'ECDH'
    ), 'certificate key ec'


def test_tls_certificate_chain_options(date_to_sec_epoch, sec_epoch):
    """Check subject, issuer and validity reported for the default cert."""
    client.load('empty')
    date_format = '%b %d %X %Y %Z'

    client.certificate()

    chain = client.conf_get('/certificates/default/chain')

    assert len(chain) == 1, 'certificate chain length'

    cert = chain[0]

    assert (
        cert['subject']['common_name'] == 'default'
    ), 'certificate subject common name'
    assert (
        cert['issuer']['common_name'] == 'default'
    ), 'certificate issuer common name'

    # The certificate was just created, so "since" must be close to now.
    assert (
        abs(
            sec_epoch
            - date_to_sec_epoch(cert['validity']['since'], date_format)
        )
        < 60
    ), 'certificate validity since'
    # 2592000 seconds == 30 days of validity.
    assert (
        date_to_sec_epoch(cert['validity']['until'], date_format)
        - date_to_sec_epoch(cert['validity']['since'], date_format)
        == 2592000
    ), 'certificate validity until'


def test_tls_certificate_chain(temp_dir):
    """Verify behaviour with end-only, intermediate and end+int bundles.

    Builds root -> int -> end, then checks that a verifying client rejects
    the bare end certificate and accepts the ``end-int`` bundle.
    """
    client.load('empty')

    client.certificate('root', False)

    req('int')
    req('end')

    generate_ca_conf()

    ca(cert='root', out='int')
    ca(cert='int', out='end')

    crt_path = f'{temp_dir}/end-int.crt'
    end_path = f'{temp_dir}/end.crt'
    int_path = f'{temp_dir}/int.crt'

    # Bundle is leaf first, then the intermediate.
    with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
        int_path, 'rb'
    ) as inter:
        crt.write(end.read() + inter.read())

    # incomplete chain

    assert 'success' in client.certificate_load(
        'end', 'end'
    ), 'certificate chain end upload'

    chain = client.conf_get('/certificates/end/chain')
    assert len(chain) == 1, 'certificate chain end length'
    assert (
        chain[0]['subject']['common_name'] == 'end'
    ), 'certificate chain end subject common name'
    assert (
        chain[0]['issuer']['common_name'] == 'int'
    ), 'certificate chain end issuer common name'

    add_tls(cert='end')

    ctx_cert_req = context_cert_req()
    try:
        resp = client.get_ssl(context=ctx_cert_req)
    except ssl.SSLError:
        resp = None

    assert resp is None, 'certificate chain incomplete chain'

    # intermediate

    assert 'success' in client.certificate_load(
        'int', 'int'
    ), 'certificate chain int upload'

    chain = client.conf_get('/certificates/int/chain')
    assert len(chain) == 1, 'certificate chain int length'
    assert (
        chain[0]['subject']['common_name'] == 'int'
    ), 'certificate chain int subject common name'
    assert (
        chain[0]['issuer']['common_name'] == 'root'
    ), 'certificate chain int issuer common name'

    add_tls(cert='int')

    assert client.get_ssl()['status'] == 200, 'certificate chain intermediate'

    # intermediate server

    assert 'success' in client.certificate_load(
        'end-int', 'end'
    ), 'certificate chain end-int upload'

    chain = client.conf_get('/certificates/end-int/chain')
    assert len(chain) == 2, 'certificate chain end-int length'
    assert (
        chain[0]['subject']['common_name'] == 'end'
    ), 'certificate chain end-int int subject common name'
    assert (
        chain[0]['issuer']['common_name'] == 'int'
    ), 'certificate chain end-int int issuer common name'
    assert (
        chain[1]['subject']['common_name'] == 'int'
    ), 'certificate chain end-int end subject common name'
    assert (
        chain[1]['issuer']['common_name'] == 'root'
    ), 'certificate chain end-int end issuer common name'

    add_tls(cert='end-int')

    assert (
        client.get_ssl(context=ctx_cert_req)['status'] == 200
    ), 'certificate chain intermediate server'


def test_tls_certificate_chain_long(temp_dir):
    """A bundle of ``chain_length - 1`` certificates is loaded and verified.

    The chain is root -> int1 -> ... -> int{chain_length - 2} -> end; the
    root itself is not part of the uploaded bundle.
    """
    client.load('empty')

    generate_ca_conf()

    # Minimum chain length is 3.
    chain_length = 10

    # Create the root certificate and one request per remaining link.
    for i in range(chain_length):
        if i == 0:
            client.certificate('root', False)
        elif i == chain_length - 1:
            req('end')
        else:
            req(f'int{i}')

    # Sign every request with its parent in the chain.
    for i in range(chain_length - 1):
        if i == 0:
            ca(cert='root', out='int1')
        elif i == chain_length - 2:
            ca(cert=f'int{(chain_length - 2)}', out='end')
        else:
            ca(cert=f'int{i}', out=f'int{(i + 1)}')

    # Concatenate leaf first: end, int{chain_length - 2}, ..., int1.
    # The bundle is opened once instead of being reopened per certificate.
    with open(f'{temp_dir}/all.crt', 'a', encoding='utf-8') as bundle:
        for i in range(chain_length - 1, 0, -1):
            path = (
                f'{temp_dir}/end.crt'
                if i == chain_length - 1
                else f'{temp_dir}/int{i}.crt'
            )

            bundle.write(Path(path).read_text(encoding='utf-8'))

    assert 'success' in client.certificate_load(
        'all', 'end'
    ), 'certificate chain upload'

    chain = client.conf_get('/certificates/all/chain')
    assert len(chain) == chain_length - 1, 'certificate chain length'

    add_tls(cert='all')

    assert (
        client.get_ssl(context=context_cert_req())['status'] == 200
    ), 'certificate chain long'


def test_tls_certificate_empty_cn():
    """A certificate with subject ``/`` is reported with an empty subject."""
    client.certificate('root', False)

    req(subject='/')

    generate_ca_conf()
    ca()

    assert 'success' in client.certificate_load('localhost', 'localhost')

    cert = client.conf_get('/certificates/localhost')
    assert cert['chain'][0]['subject'] == {}, 'empty subject'
    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'


def test_tls_certificate_empty_cn_san():
    """With no CN, the subject carries only the configured alt names."""
    client.certificate('root', False)

    client.openssl_conf(
        rewrite=True, alt_names=["example.com", "www.example.net"]
    )

    req(subject='/')

    generate_ca_conf()
    ca()

    assert 'success' in client.certificate_load('localhost', 'localhost')

    cert = client.conf_get('/certificates/localhost')
    assert cert['chain'][0]['subject'] == {
        'alt_names': ['example.com', 'www.example.net']
    }, 'subject alt_names'
    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'


def test_tls_certificate_empty_cn_san_ip():
    """An IP alt name is not listed among the reported ``alt_names``."""
    client.certificate('root', False)

    client.openssl_conf(
        rewrite=True,
        alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
    )

    req(subject='/')

    generate_ca_conf()
    ca()

    assert 'success' in client.certificate_load('localhost', 'localhost')

    cert = client.conf_get('/certificates/localhost')
    # Only the two DNS names are expected back; the IP entry is omitted.
    assert cert['chain'][0]['subject'] == {
        'alt_names': ['example.com', 'www.example.net']
    }, 'subject alt_names'
    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'


def test_tls_keepalive():
    """Two requests are served over one kept-alive TLS connection."""
    client.load('mirror')

    assert client.get()['status'] == 200, 'init'

    client.certificate()

    add_tls(application='mirror')

    (resp, sock) = client.post_ssl(
        headers={
            'Host': 'localhost',
            'Connection': 'keep-alive',
        },
        start=True,
        body='0123456789',
        read_timeout=1,
    )

    assert resp['body'] == '0123456789', 'keepalive 1'

    resp = client.post_ssl(
        headers={
            'Host': 'localhost',
            'Connection': 'close',
        },
        sock=sock,
        body='0123456789',
    )

    assert resp['body'] == '0123456789', 'keepalive 2'


def test_tls_no_close_notify():
    """Open a TLS connection, idle for five seconds, then close the socket.

    There is no explicit assertion after the close; the test passes when
    nothing raises.  Presumably the plain ``sock.close()`` skips the TLS
    close_notify alert -- TODO confirm against the client helper.
    """
    client.certificate()

    assert 'success' in client.conf(
        {
            "listeners": {
                "*:8080": {
                    "pass": "routes",
                    "tls": {"certificate": "default"},
                }
            },
            "routes": [{"action": {"return": 200}}],
            "applications": {},
        }
    ), 'load application configuration'

    (_, sock) = client.get_ssl(start=True)

    time.sleep(5)

    sock.close()


@pytest.mark.skip('not yet')
def test_tls_keepalive_certificate_remove():
    """A kept-alive TLS connection fails once its certificate is removed."""
    client.load('empty')

    assert client.get()['status'] == 200, 'init'

    client.certificate()

    add_tls()

    (_, sock) = client.get_ssl(
        headers={'Host': 'localhost', 'Connection': 'keep-alive'},
        start=True,
        read_timeout=1,
    )

    assert 'success' in client.conf(
        {"pass": "applications/empty"}, 'listeners/*:8080'
    )
    assert 'success' in client.conf_delete('/certificates/default')

    # Any ordinary failure counts as "connection unusable".  Catching
    # Exception (not a bare except) lets KeyboardInterrupt, SystemExit and
    # other BaseException subclasses propagate.
    try:
        resp = client.get_ssl(sock=sock)
    except Exception:
        resp = None

    assert resp is None, 'keepalive remove certificate'


@pytest.mark.skip('not yet')
def test_tls_certificates_remove_all():
    """Deleting the whole ``/certificates`` object succeeds."""
    client.load('empty')

    client.certificate()

    assert 'success' in client.conf_delete(
        '/certificates'
    ), 'remove all certificates'


def test_tls_application_respawn(findall, skip_alert, wait_for_record):
    """A kept-alive TLS connection survives the application being killed."""
    client.load('mirror')

    client.certificate()

    assert 'success' in client.conf('1', 'applications/mirror/processes')

    add_tls(application='mirror')

    (_, sock) = client.post_ssl(
        headers={
            'Host': 'localhost',
            'Connection': 'keep-alive',
        },
        start=True,
        body='0123456789',
        read_timeout=1,
    )

    app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]

    subprocess.check_output(['kill', '-9', app_id])

    skip_alert(fr'process {app_id} exited on signal 9')

    # Wait for a "started" record coming from a different process id.
    wait_for_record(fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started')

    resp = client.post_ssl(sock=sock, body='0123456789')

    assert resp['status'] == 200, 'application respawn status'
    assert resp['body'] == '0123456789', 'application respawn body'


def test_tls_url_scheme():
    """``Wsgi-Url-Scheme`` is ``http`` without TLS and ``https`` with it."""
    client.load('variables')

    assert (
        client.post(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': '',
                'Connection': 'close',
            }
        )['headers']['Wsgi-Url-Scheme']
        == 'http'
    ), 'url scheme http'

    client.certificate()

    add_tls(application='variables')

    assert (
        client.post_ssl(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': '',
                'Connection': 'close',
            }
        )['headers']['Wsgi-Url-Scheme']
        == 'https'
    ), 'url scheme https'


def test_tls_big_upload():
    """A 90000-character file upload over TLS is echoed back intact."""
    client.load('upload')

    client.certificate()

    add_tls(application='upload')

    filename = 'test.txt'
    data = '0123456789' * 9000

    res = client.post_ssl(
        body={
            'file': {
                'filename': filename,
                'type': 'text/plain',
                'data': io.StringIO(data),
            }
        }
    )
    assert res['status'] == 200, 'status ok'
    assert res['body'] == f'{filename}{data}'


def test_tls_multi_listener():
    """Two TLS listeners sharing one certificate both answer requests."""
    client.load('empty')

    client.certificate()

    add_tls()
    add_tls(port=8081)

    assert client.get_ssl()['status'] == 200, 'listener #1'

    assert client.get_ssl(port=8081)['status'] == 200, 'listener #2'