1import io 2import re 3import ssl 4import subprocess 5import time 6 7import pytest 8from unit.applications.tls import TestApplicationTLS 9from unit.option import option 10 11 12class TestTLS(TestApplicationTLS): 13 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 14 15 def openssl_date_to_sec_epoch(self, date): 16 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 17 18 def add_tls(self, application='empty', cert='default', port=7080): 19 assert 'success' in self.conf( 20 { 21 "pass": "applications/" + application, 22 "tls": {"certificate": cert}, 23 }, 24 'listeners/*:' + str(port), 25 ) 26 27 def remove_tls(self, application='empty', port=7080): 28 assert 'success' in self.conf( 29 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 30 ) 31 32 def req(self, name='localhost', subject=None, x509=False): 33 subj = subject if subject is not None else '/CN=' + name + '/' 34 35 subprocess.call( 36 [ 37 'openssl', 38 'req', 39 '-new', 40 '-subj', 41 subj, 42 '-config', 43 option.temp_dir + '/openssl.conf', 44 '-out', 45 option.temp_dir + '/' + name + '.csr', 46 '-keyout', 47 option.temp_dir + '/' + name + '.key', 48 ], 49 stderr=subprocess.STDOUT, 50 ) 51 52 def generate_ca_conf(self): 53 with open(option.temp_dir + '/ca.conf', 'w') as f: 54 f.write( 55 """[ ca ] 56default_ca = myca 57 58[ myca ] 59new_certs_dir = %(dir)s 60database = %(database)s 61default_md = sha256 62policy = myca_policy 63serial = %(certserial)s 64default_days = 1 65x509_extensions = myca_extensions 66copy_extensions = copy 67 68[ myca_policy ] 69commonName = optional 70 71[ myca_extensions ] 72basicConstraints = critical,CA:TRUE""" 73 % { 74 'dir': option.temp_dir, 75 'database': option.temp_dir + '/certindex', 76 'certserial': option.temp_dir + '/certserial', 77 } 78 ) 79 80 with open(option.temp_dir + '/certserial', 'w') as f: 81 f.write('1000') 82 83 with open(option.temp_dir + '/certindex', 'w') as f: 84 f.write('') 85 86 with open(option.temp_dir + '/certindex.attr', 'w') as f: 87 f.write('') 88 89 def ca(self, cert='root', out='localhost'): 90 subprocess.call( 91 [ 92 'openssl', 93 'ca', 94 '-batch', 95 '-config', 96 option.temp_dir + '/ca.conf', 97 '-keyfile', 98 option.temp_dir + '/' + cert + '.key', 99 '-cert', 100 option.temp_dir + '/' + cert + '.crt', 101 '-in', 102 option.temp_dir + '/' + out + '.csr', 103 '-out', 104 option.temp_dir + '/' + out + '.crt', 105 ], 106 stderr=subprocess.STDOUT, 107 ) 108 109 def set_certificate_req_context(self, cert='root'): 110 self.context = ssl.create_default_context() 111 self.context.check_hostname = False 112 self.context.verify_mode = ssl.CERT_REQUIRED 113 self.context.load_verify_locations( 114 option.temp_dir + '/' + cert + '.crt' 115 ) 116 117 def test_tls_listener_option_add(self): 118 self.load('empty') 119 120 self.certificate() 121 122 self.add_tls() 123 124 assert self.get_ssl()['status'] == 200, 'add listener option' 125 126 def test_tls_listener_option_remove(self): 127 self.load('empty') 128 129 self.certificate() 130 131 self.add_tls() 132 133 self.get_ssl() 134 135 self.remove_tls() 136 137 assert self.get()['status'] == 200, 'remove listener option' 138 139 def test_tls_certificate_remove(self): 140 self.load('empty') 141 142 self.certificate() 143 144 assert 'success' in self.conf_delete( 145 '/certificates/default' 146 ), 'remove certificate' 147 148 def test_tls_certificate_remove_used(self): 149 self.load('empty') 150 151 self.certificate() 152 153 self.add_tls() 154 155 assert 'error' in self.conf_delete( 156 '/certificates/default' 157 ), 'remove certificate' 158 159 def test_tls_certificate_remove_nonexisting(self): 160 self.load('empty') 161 162 self.certificate() 163 164 self.add_tls() 165 166 assert 'error' in self.conf_delete( 167 '/certificates/blah' 168 ), 'remove nonexistings certificate' 169 170 @pytest.mark.skip('not yet') 171 def test_tls_certificate_update(self): 172 self.load('empty') 173 174 self.certificate() 175 176 self.add_tls() 177 178 cert_old = self.get_server_certificate() 179 180 self.certificate() 181 182 assert cert_old != self.get_server_certificate(), 'update certificate' 183 184 @pytest.mark.skip('not yet') 185 def test_tls_certificate_key_incorrect(self): 186 self.load('empty') 187 188 self.certificate('first', False) 189 self.certificate('second', False) 190 191 assert 'error' in self.certificate_load( 192 'first', 'second' 193 ), 'key incorrect' 194 195 def test_tls_certificate_change(self): 196 self.load('empty') 197 198 self.certificate() 199 self.certificate('new') 200 201 self.add_tls() 202 203 cert_old = self.get_server_certificate() 204 205 self.add_tls(cert='new') 206 207 assert cert_old != self.get_server_certificate(), 'change certificate' 208 209 def test_tls_certificate_key_rsa(self): 210 self.load('empty') 211 212 self.certificate() 213 214 assert ( 215 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' 216 ), 'certificate key rsa' 217 218 def test_tls_certificate_key_ec(self, temp_dir): 219 self.load('empty') 220 221 self.openssl_conf() 222 223 subprocess.call( 224 [ 225 'openssl', 226 'ecparam', 227 '-noout', 228 '-genkey', 229 '-out', 230 temp_dir + '/ec.key', 231 '-name', 232 'prime256v1', 233 ], 234 stderr=subprocess.STDOUT, 235 ) 236 237 subprocess.call( 238 [ 239 'openssl', 240 'req', 241 '-x509', 242 '-new', 243 '-subj', 244 '/CN=ec/', 245 '-config', 246 temp_dir + '/openssl.conf', 247 '-key', 248 temp_dir + '/ec.key', 249 '-out', 250 temp_dir + '/ec.crt', 251 ], 252 stderr=subprocess.STDOUT, 253 ) 254 255 self.certificate_load('ec') 256 257 assert ( 258 self.conf_get('/certificates/ec/key') == 'ECDH' 259 ), 'certificate key ec' 260 261 def test_tls_certificate_chain_options(self): 262 self.load('empty') 263 264 self.certificate() 265 266 chain = self.conf_get('/certificates/default/chain') 267 268 assert len(chain) == 1, 'certificate chain length' 269 270 cert = chain[0] 271 272 assert ( 273 cert['subject']['common_name'] == 'default' 274 ), 'certificate subject common name' 275 assert ( 276 cert['issuer']['common_name'] == 'default' 277 ), 'certificate issuer common name' 278 279 assert ( 280 abs( 281 self.sec_epoch() 282 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 283 ) 284 < 60 285 ), 'certificate validity since' 286 assert ( 287 self.openssl_date_to_sec_epoch(cert['validity']['until']) 288 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 289 == 2592000 290 ), 'certificate validity until' 291 292 def test_tls_certificate_chain(self, temp_dir): 293 self.load('empty') 294 295 self.certificate('root', False) 296 297 self.req('int') 298 self.req('end') 299 300 self.generate_ca_conf() 301 302 self.ca(cert='root', out='int') 303 self.ca(cert='int', out='end') 304 305 crt_path = temp_dir + '/end-int.crt' 306 end_path = temp_dir + '/end.crt' 307 int_path = temp_dir + '/int.crt' 308 309 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 310 int_path, 'rb' 311 ) as int: 312 crt.write(end.read() + int.read()) 313 314 self.set_certificate_req_context() 315 316 # incomplete chain 317 318 assert 'success' in self.certificate_load( 319 'end', 'end' 320 ), 'certificate chain end upload' 321 322 chain = self.conf_get('/certificates/end/chain') 323 assert len(chain) == 1, 'certificate chain end length' 324 assert ( 325 chain[0]['subject']['common_name'] == 'end' 326 ), 'certificate chain end subject common name' 327 assert ( 328 chain[0]['issuer']['common_name'] == 'int' 329 ), 'certificate chain end issuer common name' 330 331 self.add_tls(cert='end') 332 333 try: 334 resp = self.get_ssl() 335 except ssl.SSLError: 336 resp = None 337 338 assert resp == None, 'certificate chain incomplete chain' 339 340 # intermediate 341 342 assert 'success' in self.certificate_load( 343 'int', 'int' 344 ), 'certificate chain int upload' 345 346 chain = self.conf_get('/certificates/int/chain') 347 assert len(chain) == 1, 'certificate chain int length' 348 assert ( 349 chain[0]['subject']['common_name'] == 'int' 350 ), 'certificate chain int subject common name' 351 assert ( 352 chain[0]['issuer']['common_name'] == 'root' 353 ), 'certificate chain int issuer common name' 354 355 self.add_tls(cert='int') 356 357 assert ( 358 self.get_ssl()['status'] == 200 359 ), 'certificate chain intermediate' 360 361 # intermediate server 362 363 assert 'success' in self.certificate_load( 364 'end-int', 'end' 365 ), 'certificate chain end-int upload' 366 367 chain = self.conf_get('/certificates/end-int/chain') 368 assert len(chain) == 2, 'certificate chain end-int length' 369 assert ( 370 chain[0]['subject']['common_name'] == 'end' 371 ), 'certificate chain end-int int subject common name' 372 assert ( 373 chain[0]['issuer']['common_name'] == 'int' 374 ), 'certificate chain end-int int issuer common name' 375 assert ( 376 chain[1]['subject']['common_name'] == 'int' 377 ), 'certificate chain end-int end subject common name' 378 assert ( 379 chain[1]['issuer']['common_name'] == 'root' 380 ), 'certificate chain end-int end issuer common name' 381 382 self.add_tls(cert='end-int') 383 384 assert ( 385 self.get_ssl()['status'] == 200 386 ), 'certificate chain intermediate server' 387 388 def test_tls_certificate_empty_cn(self, temp_dir): 389 self.certificate('root', False) 390 391 self.req(subject='/') 392 393 self.generate_ca_conf() 394 self.ca() 395 396 self.set_certificate_req_context() 397 398 assert 'success' in self.certificate_load('localhost', 'localhost') 399 400 cert = self.conf_get('/certificates/localhost') 401 assert cert['chain'][0]['subject'] == {}, 'empty subject' 402 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 403 404 def test_tls_certificate_empty_cn_san(self, temp_dir): 405 self.certificate('root', False) 406 407 self.openssl_conf( 408 rewrite=True, alt_names=["example.com", "www.example.net"] 409 ) 410 411 self.req(subject='/') 412 413 self.generate_ca_conf() 414 self.ca() 415 416 self.set_certificate_req_context() 417 418 assert 'success' in self.certificate_load('localhost', 'localhost') 419 420 cert = self.conf_get('/certificates/localhost') 421 assert cert['chain'][0]['subject'] == { 422 'alt_names': ['example.com', 'www.example.net'] 423 }, 'subject alt_names' 424 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 425 426 def test_tls_certificate_empty_cn_san_ip(self): 427 self.certificate('root', False) 428 429 self.openssl_conf( 430 rewrite=True, 431 alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], 432 ) 433 434 self.req(subject='/') 435 436 self.generate_ca_conf() 437 self.ca() 438 439 self.set_certificate_req_context() 440 441 assert 'success' in self.certificate_load('localhost', 'localhost') 442 443 cert = self.conf_get('/certificates/localhost') 444 assert cert['chain'][0]['subject'] == { 445 'alt_names': ['example.com', 'www.example.net'] 446 }, 'subject alt_names' 447 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' 448 449 @pytest.mark.skip('not yet') 450 def test_tls_reconfigure(self): 451 self.load('empty') 452 453 assert self.get()['status'] == 200, 'init' 454 455 self.certificate() 456 457 (resp, sock) = self.get( 458 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 459 start=True, 460 read_timeout=1, 461 ) 462 463 assert resp['status'] == 200, 'initial status' 464 465 self.add_tls() 466 467 assert self.get(sock=sock)['status'] == 200, 'reconfigure status' 468 assert self.get_ssl()['status'] == 200, 'reconfigure tls status' 469 470 def test_tls_keepalive(self): 471 self.load('mirror') 472 473 assert self.get()['status'] == 200, 'init' 474 475 self.certificate() 476 477 self.add_tls(application='mirror') 478 479 (resp, sock) = self.post_ssl( 480 headers={ 481 'Host': 'localhost', 482 'Connection': 'keep-alive', 483 'Content-Type': 'text/html', 484 }, 485 start=True, 486 body='0123456789', 487 read_timeout=1, 488 ) 489 490 assert resp['body'] == '0123456789', 'keepalive 1' 491 492 resp = self.post_ssl( 493 headers={ 494 'Host': 'localhost', 495 'Connection': 'close', 496 'Content-Type': 'text/html', 497 }, 498 sock=sock, 499 body='0123456789', 500 ) 501 502 assert resp['body'] == '0123456789', 'keepalive 2' 503 504 def test_tls_no_close_notify(self): 505 self.certificate() 506 507 assert 'success' in self.conf( 508 { 509 "listeners": { 510 "*:7080": { 511 "pass": "routes", 512 "tls": {"certificate": "default"}, 513 } 514 }, 515 "routes": [{"action": {"return": 200}}], 516 "applications": {}, 517 } 518 ), 'load application configuration' 519 520 (resp, sock) = self.get_ssl(start=True) 521 522 time.sleep(5) 523 524 sock.close() 525 526 @pytest.mark.skip('not yet') 527 def test_tls_keepalive_certificate_remove(self): 528 self.load('empty') 529 530 assert self.get()['status'] == 200, 'init' 531 532 self.certificate() 533 534 self.add_tls() 535 536 (resp, sock) = self.get_ssl( 537 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 538 start=True, 539 read_timeout=1, 540 ) 541 542 assert 'success' in self.conf( 543 {"pass": "applications/empty"}, 'listeners/*:7080' 544 ) 545 assert 'success' in self.conf_delete('/certificates/default') 546 547 try: 548 resp = self.get_ssl( 549 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock 550 ) 551 552 except KeyboardInterrupt: 553 raise 554 555 except: 556 resp = None 557 558 assert resp == None, 'keepalive remove certificate' 559 560 @pytest.mark.skip('not yet') 561 def test_tls_certificates_remove_all(self): 562 self.load('empty') 563 564 self.certificate() 565 566 assert 'success' in self.conf_delete( 567 '/certificates' 568 ), 'remove all certificates' 569 570 def test_tls_application_respawn(self, skip_alert): 571 self.load('mirror') 572 573 self.certificate() 574 575 assert 'success' in self.conf('1', 'applications/mirror/processes') 576 577 self.add_tls(application='mirror') 578 579 (_, sock) = self.post_ssl( 580 headers={ 581 'Host': 'localhost', 582 'Connection': 'keep-alive', 583 'Content-Type': 'text/html', 584 }, 585 start=True, 586 body='0123456789', 587 read_timeout=1, 588 ) 589 590 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 591 592 subprocess.call(['kill', '-9', app_id]) 593 594 skip_alert(r'process .* %s.* exited on signal 9' % app_id) 595 596 self.wait_for_record( 597 re.compile( 598 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' 599 ) 600 ) 601 602 resp = self.post_ssl( 603 headers={ 604 'Host': 'localhost', 605 'Connection': 'close', 606 'Content-Type': 'text/html', 607 }, 608 sock=sock, 609 body='0123456789', 610 ) 611 612 assert resp['status'] == 200, 'application respawn status' 613 assert resp['body'] == '0123456789', 'application respawn body' 614 615 def test_tls_url_scheme(self): 616 self.load('variables') 617 618 assert ( 619 self.post( 620 headers={ 621 'Host': 'localhost', 622 'Content-Type': 'text/html', 623 'Custom-Header': '', 624 'Connection': 'close', 625 } 626 )['headers']['Wsgi-Url-Scheme'] 627 == 'http' 628 ), 'url scheme http' 629 630 self.certificate() 631 632 self.add_tls(application='variables') 633 634 assert ( 635 self.post_ssl( 636 headers={ 637 'Host': 'localhost', 638 'Content-Type': 'text/html', 639 'Custom-Header': '', 640 'Connection': 'close', 641 } 642 )['headers']['Wsgi-Url-Scheme'] 643 == 'https' 644 ), 'url scheme https' 645 646 def test_tls_big_upload(self): 647 self.load('upload') 648 649 self.certificate() 650 651 self.add_tls(application='upload') 652 653 filename = 'test.txt' 654 data = '0123456789' * 9000 655 656 res = self.post_ssl( 657 body={ 658 'file': { 659 'filename': filename, 660 'type': 'text/plain', 661 'data': io.StringIO(data), 662 } 663 } 664 ) 665 assert res['status'] == 200, 'status ok' 666 assert res['body'] == filename + data 667 668 def test_tls_multi_listener(self): 669 self.load('empty') 670 671 self.certificate() 672 673 self.add_tls() 674 self.add_tls(port=7081) 675 676 assert self.get_ssl()['status'] == 200, 'listener #1' 677 678 assert self.get_ssl(port=7081)['status'] == 200, 'listener #2' 679