1import io 2import re 3import ssl 4import subprocess 5import time 6 7import pytest 8from unit.applications.tls import TestApplicationTLS 9from unit.option import option 10 11 12class TestTLS(TestApplicationTLS): 13 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 14 15 def openssl_date_to_sec_epoch(self, date): 16 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 17 18 def add_tls(self, application='empty', cert='default', port=7080): 19 assert 'success' in self.conf( 20 { 21 "pass": "applications/" + application, 22 "tls": {"certificate": cert}, 23 }, 24 'listeners/*:' + str(port), 25 ) 26 27 def remove_tls(self, application='empty', port=7080): 28 assert 'success' in self.conf( 29 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 30 ) 31 32 def req(self, name='localhost', subject=None, x509=False): 33 subj = subject if subject is not None else '/CN=' + name + '/' 34 35 subprocess.check_output( 36 [ 37 'openssl', 38 'req', 39 '-new', 40 '-subj', 41 subj, 42 '-config', 43 option.temp_dir + '/openssl.conf', 44 '-out', 45 option.temp_dir + '/' + name + '.csr', 46 '-keyout', 47 option.temp_dir + '/' + name + '.key', 48 ], 49 stderr=subprocess.STDOUT, 50 ) 51 52 def generate_ca_conf(self): 53 with open(option.temp_dir + '/ca.conf', 'w') as f: 54 f.write( 55 """[ ca ] 56default_ca = myca 57 58[ myca ] 59new_certs_dir = %(dir)s 60database = %(database)s 61default_md = sha256 62policy = myca_policy 63serial = %(certserial)s 64default_days = 1 65x509_extensions = myca_extensions 66copy_extensions = copy 67 68[ myca_policy ] 69commonName = optional 70 71[ myca_extensions ] 72basicConstraints = critical,CA:TRUE""" 73 % { 74 'dir': option.temp_dir, 75 'database': option.temp_dir + '/certindex', 76 'certserial': option.temp_dir + '/certserial', 77 } 78 ) 79 80 with open(option.temp_dir + '/certserial', 'w') as f: 81 f.write('1000') 82 83 with open(option.temp_dir + '/certindex', 'w') as f: 84 f.write('') 85 86 with open(option.temp_dir + '/certindex.attr', 'w') as f: 87 f.write('') 88 89 def ca(self, cert='root', out='localhost'): 90 subprocess.check_output( 91 [ 92 'openssl', 93 'ca', 94 '-batch', 95 '-config', 96 option.temp_dir + '/ca.conf', 97 '-keyfile', 98 option.temp_dir + '/' + cert + '.key', 99 '-cert', 100 option.temp_dir + '/' + cert + '.crt', 101 '-in', 102 option.temp_dir + '/' + out + '.csr', 103 '-out', 104 option.temp_dir + '/' + out + '.crt', 105 ], 106 stderr=subprocess.STDOUT, 107 ) 108 109 def set_certificate_req_context(self, cert='root'): 110 self.context = ssl.create_default_context() 111 self.context.check_hostname = False 112 self.context.verify_mode = ssl.CERT_REQUIRED 113 self.context.load_verify_locations( 114 option.temp_dir + '/' + cert + '.crt' 115 ) 116 117 def test_tls_listener_option_add(self): 118 self.load('empty') 119 120 self.certificate() 121 122 self.add_tls() 123 124 assert self.get_ssl()['status'] == 200, 'add listener option' 125 126 def test_tls_listener_option_remove(self): 127 self.load('empty') 128 129 self.certificate() 130 131 self.add_tls() 132 133 self.get_ssl() 134 135 self.remove_tls() 136 137 assert self.get()['status'] == 200, 'remove listener option' 138 139 def test_tls_certificate_remove(self): 140 self.load('empty') 141 142 self.certificate() 143 144 assert 'success' in self.conf_delete( 145 '/certificates/default' 146 ), 'remove certificate' 147 148 def test_tls_certificate_remove_used(self): 149 self.load('empty') 150 151 self.certificate() 152 153 self.add_tls() 154 155 assert 'error' in self.conf_delete( 156 '/certificates/default' 157 ), 'remove certificate' 158 159 def test_tls_certificate_remove_nonexisting(self): 160 self.load('empty') 161 162 self.certificate() 163 164 self.add_tls() 165 166 assert 'error' in self.conf_delete( 167 '/certificates/blah' 168 ), 'remove nonexistings certificate' 169 170 @pytest.mark.skip('not yet') 171 def test_tls_certificate_update(self): 172 self.load('empty') 173 174 self.certificate() 175 176 self.add_tls() 177 178 cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) 179 180 self.certificate() 181 182 assert cert_old != ssl.get_server_certificate( 183 ('127.0.0.1', 7080) 184 ), 'update certificate' 185 186 @pytest.mark.skip('not yet') 187 def test_tls_certificate_key_incorrect(self): 188 self.load('empty') 189 190 self.certificate('first', False) 191 self.certificate('second', False) 192 193 assert 'error' in self.certificate_load( 194 'first', 'second' 195 ), 'key incorrect' 196 197 def test_tls_certificate_change(self): 198 self.load('empty') 199 200 self.certificate() 201 self.certificate('new') 202 203 self.add_tls() 204 205 cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) 206 207 self.add_tls(cert='new') 208 209 assert cert_old != ssl.get_server_certificate( 210 ('127.0.0.1', 7080) 211 ), 'change certificate' 212 213 def test_tls_certificate_key_rsa(self): 214 self.load('empty') 215 216 self.certificate() 217 218 assert ( 219 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' 220 ), 'certificate key rsa' 221 222 def test_tls_certificate_key_ec(self, temp_dir): 223 self.load('empty') 224 225 self.openssl_conf() 226 227 subprocess.check_output( 228 [ 229 'openssl', 230 'ecparam', 231 '-noout', 232 '-genkey', 233 '-out', 234 temp_dir + '/ec.key', 235 '-name', 236 'prime256v1', 237 ], 238 stderr=subprocess.STDOUT, 239 ) 240 241 subprocess.check_output( 242 [ 243 'openssl', 244 'req', 245 '-x509', 246 '-new', 247 '-subj', 248 '/CN=ec/', 249 '-config', 250 temp_dir + '/openssl.conf', 251 '-key', 252 temp_dir + '/ec.key', 253 '-out', 254 temp_dir + '/ec.crt', 255 ], 256 stderr=subprocess.STDOUT, 257 ) 258 259 self.certificate_load('ec') 260 261 assert ( 262 self.conf_get('/certificates/ec/key') == 'ECDH' 263 ), 'certificate key ec' 264 265 def test_tls_certificate_chain_options(self): 266 self.load('empty') 267 268 self.certificate() 269 270 chain = self.conf_get('/certificates/default/chain') 271 272 assert len(chain) == 1, 'certificate chain length' 273 274 cert = chain[0] 275 276 assert ( 277 cert['subject']['common_name'] == 'default' 278 ), 'certificate subject common name' 279 assert ( 280 cert['issuer']['common_name'] == 'default' 281 ), 'certificate issuer common name' 282 283 assert ( 284 abs( 285 self.sec_epoch() 286 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 287 ) 288 < 60 289 ), 'certificate validity since' 290 assert ( 291 self.openssl_date_to_sec_epoch(cert['validity']['until']) 292 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 293 == 2592000 294 ), 'certificate validity until' 295 296 def test_tls_certificate_chain(self, temp_dir): 297 self.load('empty') 298 299 self.certificate('root', False) 300 301 self.req('int') 302 self.req('end') 303 304 self.generate_ca_conf() 305 306 self.ca(cert='root', out='int') 307 self.ca(cert='int', out='end') 308 309 crt_path = temp_dir + '/end-int.crt' 310 end_path = temp_dir + '/end.crt' 311 int_path = temp_dir + '/int.crt' 312 313 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 314 int_path, 'rb' 315 ) as int: 316 crt.write(end.read() + int.read()) 317 318 self.set_certificate_req_context() 319 320 # incomplete chain 321 322 assert 'success' in self.certificate_load( 323 'end', 'end' 324 ), 'certificate chain end upload' 325 326 chain = self.conf_get('/certificates/end/chain') 327 assert len(chain) == 1, 'certificate chain end length' 328 assert ( 329 chain[0]['subject']['common_name'] == 'end' 330 ), 'certificate chain end subject common name' 331 assert ( 332 chain[0]['issuer']['common_name'] == 'int' 333 ), 'certificate chain end issuer common name' 334 335 self.add_tls(cert='end') 336 337 try: 338 resp = self.get_ssl() 339 except ssl.SSLError: 340 resp = None 341 342 assert resp == None, 'certificate chain incomplete chain' 343 344 # intermediate 345 346 assert 'success' in self.certificate_load( 347 'int', 'int' 348 ), 'certificate chain int upload' 349 350 chain = self.conf_get('/certificates/int/chain') 351 assert len(chain) == 1, 'certificate chain int length' 352 assert ( 353 chain[0]['subject']['common_name'] == 'int' 354 ), 'certificate chain int subject common name' 355 assert ( 356 chain[0]['issuer']['common_name'] == 'root' 357 ), 'certificate chain int issuer common name' 358 359 self.add_tls(cert='int') 360 361 assert ( 362 self.get_ssl()['status'] == 200 363 ), 'certificate chain intermediate' 364 365 # intermediate server 366 367 assert 'success' in self.certificate_load( 368 'end-int', 'end' 369 ), 'certificate chain end-int upload' 370 371 chain = self.conf_get('/certificates/end-int/chain') 372 assert len(chain) == 2, 'certificate chain end-int length' 373 assert ( 374 chain[0]['subject']['common_name'] == 'end' 375 ), 'certificate chain end-int int subject common name' 376 assert ( 377 chain[0]['issuer']['common_name'] == 'int' 378 ), 'certificate chain end-int int issuer common name' 379 assert ( 380 chain[1]['subject']['common_name'] == 'int' 381 ), 'certificate chain end-int end subject common name' 382 assert ( 383 chain[1]['issuer']['common_name'] == 'root' 384 ), 'certificate chain end-int end issuer common name' 385 386 self.add_tls(cert='end-int') 387 388 assert ( 389 self.get_ssl()['status'] == 200 390 ), 'certificate chain intermediate server' 391 392 def test_tls_certificate_empty_cn(self, temp_dir): 393 self.certificate('root', False) 394 395 self.req(subject='/') 396 397 self.generate_ca_conf() 398 self.ca() 399 400 self.set_certificate_req_context() 401 402 assert 'success' in self.certificate_load('localhost', 'localhost') 403 404 cert = self.conf_get('/certificates/localhost') 405 assert cert['chain'][0]['subject'] == {}, 'empty subject' 406 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 407 408 def test_tls_certificate_empty_cn_san(self, temp_dir): 409 self.certificate('root', False) 410 411 self.openssl_conf( 412 rewrite=True, alt_names=["example.com", "www.example.net"] 413 ) 414 415 self.req(subject='/') 416 417 self.generate_ca_conf() 418 self.ca() 419 420 self.set_certificate_req_context() 421 422 assert 'success' in self.certificate_load('localhost', 'localhost') 423 424 cert = self.conf_get('/certificates/localhost') 425 assert cert['chain'][0]['subject'] == { 426 'alt_names': ['example.com', 'www.example.net'] 427 }, 'subject alt_names' 428 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 429 430 def test_tls_certificate_empty_cn_san_ip(self): 431 self.certificate('root', False) 432 433 self.openssl_conf( 434 rewrite=True, 435 alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], 436 ) 437 438 self.req(subject='/') 439 440 self.generate_ca_conf() 441 self.ca() 442 443 self.set_certificate_req_context() 444 445 assert 'success' in self.certificate_load('localhost', 'localhost') 446 447 cert = self.conf_get('/certificates/localhost') 448 assert cert['chain'][0]['subject'] == { 449 'alt_names': ['example.com', 'www.example.net'] 450 }, 'subject alt_names' 451 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 452 453 @pytest.mark.skip('not yet') 454 def test_tls_reconfigure(self): 455 self.load('empty') 456 457 assert self.get()['status'] == 200, 'init' 458 459 self.certificate() 460 461 (resp, sock) = self.get( 462 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 463 start=True, 464 read_timeout=1, 465 ) 466 467 assert resp['status'] == 200, 'initial status' 468 469 self.add_tls() 470 471 assert self.get(sock=sock)['status'] == 200, 'reconfigure status' 472 assert self.get_ssl()['status'] == 200, 'reconfigure tls status' 473 474 def test_tls_keepalive(self): 475 self.load('mirror') 476 477 assert self.get()['status'] == 200, 'init' 478 479 self.certificate() 480 481 self.add_tls(application='mirror') 482 483 (resp, sock) = self.post_ssl( 484 headers={ 485 'Host': 'localhost', 486 'Connection': 'keep-alive', 487 'Content-Type': 'text/html', 488 }, 489 start=True, 490 body='0123456789', 491 read_timeout=1, 492 ) 493 494 assert resp['body'] == '0123456789', 'keepalive 1' 495 496 resp = self.post_ssl( 497 headers={ 498 'Host': 'localhost', 499 'Connection': 'close', 500 'Content-Type': 'text/html', 501 }, 502 sock=sock, 503 body='0123456789', 504 ) 505 506 assert resp['body'] == '0123456789', 'keepalive 2' 507 508 def test_tls_no_close_notify(self): 509 self.certificate() 510 511 assert 'success' in self.conf( 512 { 513 "listeners": { 514 "*:7080": { 515 "pass": "routes", 516 "tls": {"certificate": "default"}, 517 } 518 }, 519 "routes": [{"action": {"return": 200}}], 520 "applications": {}, 521 } 522 ), 'load application configuration' 523 524 (resp, sock) = self.get_ssl(start=True) 525 526 time.sleep(5) 527 528 sock.close() 529 530 @pytest.mark.skip('not yet') 531 def test_tls_keepalive_certificate_remove(self): 532 self.load('empty') 533 534 assert self.get()['status'] == 200, 'init' 535 536 self.certificate() 537 538 self.add_tls() 539 540 (resp, sock) = self.get_ssl( 541 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 542 start=True, 543 read_timeout=1, 544 ) 545 546 assert 'success' in self.conf( 547 {"pass": "applications/empty"}, 'listeners/*:7080' 548 ) 549 assert 'success' in self.conf_delete('/certificates/default') 550 551 try: 552 resp = self.get_ssl( 553 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock 554 ) 555 556 except KeyboardInterrupt: 557 raise 558 559 except: 560 resp = None 561 562 assert resp == None, 'keepalive remove certificate' 563 564 @pytest.mark.skip('not yet') 565 def test_tls_certificates_remove_all(self): 566 self.load('empty') 567 568 self.certificate() 569 570 assert 'success' in self.conf_delete( 571 '/certificates' 572 ), 'remove all certificates' 573 574 def test_tls_application_respawn(self, skip_alert): 575 self.load('mirror') 576 577 self.certificate() 578 579 assert 'success' in self.conf('1', 'applications/mirror/processes') 580 581 self.add_tls(application='mirror') 582 583 (_, sock) = self.post_ssl( 584 headers={ 585 'Host': 'localhost', 586 'Connection': 'keep-alive', 587 'Content-Type': 'text/html', 588 }, 589 start=True, 590 body='0123456789', 591 read_timeout=1, 592 ) 593 594 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 595 596 subprocess.check_output(['kill', '-9', app_id]) 597 598 skip_alert(r'process .* %s.* exited on signal 9' % app_id) 599 600 self.wait_for_record( 601 re.compile( 602 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' 603 ) 604 ) 605 606 resp = self.post_ssl( 607 headers={ 608 'Host': 'localhost', 609 'Connection': 'close', 610 'Content-Type': 'text/html', 611 }, 612 sock=sock, 613 body='0123456789', 614 ) 615 616 assert resp['status'] == 200, 'application respawn status' 617 assert resp['body'] == '0123456789', 'application respawn body' 618 619 def test_tls_url_scheme(self): 620 self.load('variables') 621 622 assert ( 623 self.post( 624 headers={ 625 'Host': 'localhost', 626 'Content-Type': 'text/html', 627 'Custom-Header': '', 628 'Connection': 'close', 629 } 630 )['headers']['Wsgi-Url-Scheme'] 631 == 'http' 632 ), 'url scheme http' 633 634 self.certificate() 635 636 self.add_tls(application='variables') 637 638 assert ( 639 self.post_ssl( 640 headers={ 641 'Host': 'localhost', 642 'Content-Type': 'text/html', 643 'Custom-Header': '', 644 'Connection': 'close', 645 } 646 )['headers']['Wsgi-Url-Scheme'] 647 == 'https' 648 ), 'url scheme https' 649 650 def test_tls_big_upload(self): 651 self.load('upload') 652 653 self.certificate() 654 655 self.add_tls(application='upload') 656 657 filename = 'test.txt' 658 data = '0123456789' * 9000 659 660 res = self.post_ssl( 661 body={ 662 'file': { 663 'filename': filename, 664 'type': 'text/plain', 665 'data': io.StringIO(data), 666 } 667 } 668 ) 669 assert res['status'] == 200, 'status ok' 670 assert res['body'] == filename + data 671 672 def test_tls_multi_listener(self): 673 self.load('empty') 674 675 self.certificate() 676 677 self.add_tls() 678 self.add_tls(port=7081) 679 680 assert self.get_ssl()['status'] == 200, 'listener #1' 681 682 assert self.get_ssl(port=7081)['status'] == 200, 'listener #2' 683