1import io 2import re 3import ssl 4import subprocess 5 6import pytest 7 8from unit.applications.tls import TestApplicationTLS 9 10 11class TestTLS(TestApplicationTLS): 12 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 13 14 def openssl_date_to_sec_epoch(self, date): 15 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 16 17 def add_tls(self, application='empty', cert='default', port=7080): 18 assert 'success' in self.conf( 19 { 20 "pass": "applications/" + application, 21 "tls": {"certificate": cert}, 22 }, 23 'listeners/*:' + str(port), 24 ) 25 26 def remove_tls(self, application='empty', port=7080): 27 assert 'success' in self.conf( 28 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 29 ) 30 31 def test_tls_listener_option_add(self): 32 self.load('empty') 33 34 self.certificate() 35 36 self.add_tls() 37 38 assert self.get_ssl()['status'] == 200, 'add listener option' 39 40 def test_tls_listener_option_remove(self): 41 self.load('empty') 42 43 self.certificate() 44 45 self.add_tls() 46 47 self.get_ssl() 48 49 self.remove_tls() 50 51 assert self.get()['status'] == 200, 'remove listener option' 52 53 def test_tls_certificate_remove(self): 54 self.load('empty') 55 56 self.certificate() 57 58 assert 'success' in self.conf_delete( 59 '/certificates/default' 60 ), 'remove certificate' 61 62 def test_tls_certificate_remove_used(self): 63 self.load('empty') 64 65 self.certificate() 66 67 self.add_tls() 68 69 assert 'error' in self.conf_delete( 70 '/certificates/default' 71 ), 'remove certificate' 72 73 def test_tls_certificate_remove_nonexisting(self): 74 self.load('empty') 75 76 self.certificate() 77 78 self.add_tls() 79 80 assert 'error' in self.conf_delete( 81 '/certificates/blah' 82 ), 'remove nonexistings certificate' 83 84 @pytest.mark.skip('not yet') 85 def test_tls_certificate_update(self): 86 self.load('empty') 87 88 self.certificate() 89 90 self.add_tls() 91 92 cert_old = self.get_server_certificate() 93 94 self.certificate() 95 96 assert cert_old != self.get_server_certificate(), 'update certificate' 97 98 @pytest.mark.skip('not yet') 99 def test_tls_certificate_key_incorrect(self): 100 self.load('empty') 101 102 self.certificate('first', False) 103 self.certificate('second', False) 104 105 assert 'error' in self.certificate_load( 106 'first', 'second' 107 ), 'key incorrect' 108 109 def test_tls_certificate_change(self): 110 self.load('empty') 111 112 self.certificate() 113 self.certificate('new') 114 115 self.add_tls() 116 117 cert_old = self.get_server_certificate() 118 119 self.add_tls(cert='new') 120 121 assert cert_old != self.get_server_certificate(), 'change certificate' 122 123 def test_tls_certificate_key_rsa(self): 124 self.load('empty') 125 126 self.certificate() 127 128 assert ( 129 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' 130 ), 'certificate key rsa' 131 132 def test_tls_certificate_key_ec(self, temp_dir): 133 self.load('empty') 134 135 self.openssl_conf() 136 137 subprocess.call( 138 [ 139 'openssl', 140 'ecparam', 141 '-noout', 142 '-genkey', 143 '-out', 144 temp_dir + '/ec.key', 145 '-name', 146 'prime256v1', 147 ], 148 stderr=subprocess.STDOUT, 149 ) 150 151 subprocess.call( 152 [ 153 'openssl', 154 'req', 155 '-x509', 156 '-new', 157 '-subj', 158 '/CN=ec/', 159 '-config', 160 temp_dir + '/openssl.conf', 161 '-key', 162 temp_dir + '/ec.key', 163 '-out', 164 temp_dir + '/ec.crt', 165 ], 166 stderr=subprocess.STDOUT, 167 ) 168 169 self.certificate_load('ec') 170 171 assert ( 172 self.conf_get('/certificates/ec/key') == 'ECDH' 173 ), 'certificate key ec' 174 175 def test_tls_certificate_chain_options(self): 176 self.load('empty') 177 178 self.certificate() 179 180 chain = self.conf_get('/certificates/default/chain') 181 182 assert len(chain) == 1, 'certificate chain length' 183 184 cert = chain[0] 185 186 assert ( 187 cert['subject']['common_name'] == 'default' 188 ), 'certificate subject common name' 189 assert ( 190 cert['issuer']['common_name'] == 'default' 191 ), 'certificate issuer common name' 192 193 assert ( 194 abs( 195 self.sec_epoch() 196 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 197 ) 198 < 60 199 ), 'certificate validity since' 200 assert ( 201 self.openssl_date_to_sec_epoch(cert['validity']['until']) 202 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 203 == 2592000 204 ), 'certificate validity until' 205 206 def test_tls_certificate_chain(self, temp_dir): 207 self.load('empty') 208 209 self.certificate('root', False) 210 211 subprocess.call( 212 [ 213 'openssl', 214 'req', 215 '-new', 216 '-subj', 217 '/CN=int/', 218 '-config', 219 temp_dir + '/openssl.conf', 220 '-out', 221 temp_dir + '/int.csr', 222 '-keyout', 223 temp_dir + '/int.key', 224 ], 225 stderr=subprocess.STDOUT, 226 ) 227 228 subprocess.call( 229 [ 230 'openssl', 231 'req', 232 '-new', 233 '-subj', 234 '/CN=end/', 235 '-config', 236 temp_dir + '/openssl.conf', 237 '-out', 238 temp_dir + '/end.csr', 239 '-keyout', 240 temp_dir + '/end.key', 241 ], 242 stderr=subprocess.STDOUT, 243 ) 244 245 with open(temp_dir + '/ca.conf', 'w') as f: 246 f.write( 247 """[ ca ] 248default_ca = myca 249 250[ myca ] 251new_certs_dir = %(dir)s 252database = %(database)s 253default_md = sha256 254policy = myca_policy 255serial = %(certserial)s 256default_days = 1 257x509_extensions = myca_extensions 258 259[ myca_policy ] 260commonName = supplied 261 262[ myca_extensions ] 263basicConstraints = critical,CA:TRUE""" 264 % { 265 'dir': temp_dir, 266 'database': temp_dir + '/certindex', 267 'certserial': temp_dir + '/certserial', 268 } 269 ) 270 271 with open(temp_dir + '/certserial', 'w') as f: 272 f.write('1000') 273 274 with open(temp_dir + '/certindex', 'w') as f: 275 f.write('') 276 277 subprocess.call( 278 [ 279 'openssl', 280 'ca', 281 '-batch', 282 '-subj', 283 '/CN=int/', 284 '-config', 285 temp_dir + '/ca.conf', 286 '-keyfile', 287 temp_dir + '/root.key', 288 '-cert', 289 temp_dir + '/root.crt', 290 '-in', 291 temp_dir + '/int.csr', 292 '-out', 293 temp_dir + '/int.crt', 294 ], 295 stderr=subprocess.STDOUT, 296 ) 297 298 subprocess.call( 299 [ 300 'openssl', 301 'ca', 302 '-batch', 303 '-subj', 304 '/CN=end/', 305 '-config', 306 temp_dir + '/ca.conf', 307 '-keyfile', 308 temp_dir + '/int.key', 309 '-cert', 310 temp_dir + '/int.crt', 311 '-in', 312 temp_dir + '/end.csr', 313 '-out', 314 temp_dir + '/end.crt', 315 ], 316 stderr=subprocess.STDOUT, 317 ) 318 319 crt_path = temp_dir + '/end-int.crt' 320 end_path = temp_dir + '/end.crt' 321 int_path = temp_dir + '/int.crt' 322 323 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 324 int_path, 'rb' 325 ) as int: 326 crt.write(end.read() + int.read()) 327 328 self.context = ssl.create_default_context() 329 self.context.check_hostname = False 330 self.context.verify_mode = ssl.CERT_REQUIRED 331 self.context.load_verify_locations(temp_dir + '/root.crt') 332 333 # incomplete chain 334 335 assert 'success' in self.certificate_load( 336 'end', 'end' 337 ), 'certificate chain end upload' 338 339 chain = self.conf_get('/certificates/end/chain') 340 assert len(chain) == 1, 'certificate chain end length' 341 assert ( 342 chain[0]['subject']['common_name'] == 'end' 343 ), 'certificate chain end subject common name' 344 assert ( 345 chain[0]['issuer']['common_name'] == 'int' 346 ), 'certificate chain end issuer common name' 347 348 self.add_tls(cert='end') 349 350 try: 351 resp = self.get_ssl() 352 except ssl.SSLError: 353 resp = None 354 355 assert resp == None, 'certificate chain incomplete chain' 356 357 # intermediate 358 359 assert 'success' in self.certificate_load( 360 'int', 'int' 361 ), 'certificate chain int upload' 362 363 chain = self.conf_get('/certificates/int/chain') 364 assert len(chain) == 1, 'certificate chain int length' 365 assert ( 366 chain[0]['subject']['common_name'] == 'int' 367 ), 'certificate chain int subject common name' 368 assert ( 369 chain[0]['issuer']['common_name'] == 'root' 370 ), 'certificate chain int issuer common name' 371 372 self.add_tls(cert='int') 373 374 assert ( 375 self.get_ssl()['status'] == 200 376 ), 'certificate chain intermediate' 377 378 # intermediate server 379 380 assert 'success' in self.certificate_load( 381 'end-int', 'end' 382 ), 'certificate chain end-int upload' 383 384 chain = self.conf_get('/certificates/end-int/chain') 385 assert len(chain) == 2, 'certificate chain end-int length' 386 assert ( 387 chain[0]['subject']['common_name'] == 'end' 388 ), 'certificate chain end-int int subject common name' 389 assert ( 390 chain[0]['issuer']['common_name'] == 'int' 391 ), 'certificate chain end-int int issuer common name' 392 assert ( 393 chain[1]['subject']['common_name'] == 'int' 394 ), 'certificate chain end-int end subject common name' 395 assert ( 396 chain[1]['issuer']['common_name'] == 'root' 397 ), 'certificate chain end-int end issuer common name' 398 399 self.add_tls(cert='end-int') 400 401 assert ( 402 self.get_ssl()['status'] == 200 403 ), 'certificate chain intermediate server' 404 405 @pytest.mark.skip('not yet') 406 def test_tls_reconfigure(self): 407 self.load('empty') 408 409 assert self.get()['status'] == 200, 'init' 410 411 self.certificate() 412 413 (resp, sock) = self.get( 414 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 415 start=True, 416 read_timeout=1, 417 ) 418 419 assert resp['status'] == 200, 'initial status' 420 421 self.add_tls() 422 423 assert self.get(sock=sock)['status'] == 200, 'reconfigure status' 424 assert self.get_ssl()['status'] == 200, 'reconfigure tls status' 425 426 def test_tls_keepalive(self): 427 self.load('mirror') 428 429 assert self.get()['status'] == 200, 'init' 430 431 self.certificate() 432 433 self.add_tls(application='mirror') 434 435 (resp, sock) = self.post_ssl( 436 headers={ 437 'Host': 'localhost', 438 'Connection': 'keep-alive', 439 'Content-Type': 'text/html', 440 }, 441 start=True, 442 body='0123456789', 443 read_timeout=1, 444 ) 445 446 assert resp['body'] == '0123456789', 'keepalive 1' 447 448 resp = self.post_ssl( 449 headers={ 450 'Host': 'localhost', 451 'Connection': 'close', 452 'Content-Type': 'text/html', 453 }, 454 sock=sock, 455 body='0123456789', 456 ) 457 458 assert resp['body'] == '0123456789', 'keepalive 2' 459 460 @pytest.mark.skip('not yet') 461 def test_tls_keepalive_certificate_remove(self): 462 self.load('empty') 463 464 assert self.get()['status'] == 200, 'init' 465 466 self.certificate() 467 468 self.add_tls() 469 470 (resp, sock) = self.get_ssl( 471 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 472 start=True, 473 read_timeout=1, 474 ) 475 476 assert 'success' in self.conf( 477 {"pass": "applications/empty"}, 'listeners/*:7080' 478 ) 479 assert 'success' in self.conf_delete('/certificates/default') 480 481 try: 482 resp = self.get_ssl( 483 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock 484 ) 485 486 except KeyboardInterrupt: 487 raise 488 489 except: 490 resp = None 491 492 assert resp == None, 'keepalive remove certificate' 493 494 @pytest.mark.skip('not yet') 495 def test_tls_certificates_remove_all(self): 496 self.load('empty') 497 498 self.certificate() 499 500 assert 'success' in self.conf_delete( 501 '/certificates' 502 ), 'remove all certificates' 503 504 def test_tls_application_respawn(self, skip_alert): 505 self.load('mirror') 506 507 self.certificate() 508 509 assert 'success' in self.conf('1', 'applications/mirror/processes') 510 511 self.add_tls(application='mirror') 512 513 (_, sock) = self.post_ssl( 514 headers={ 515 'Host': 'localhost', 516 'Connection': 'keep-alive', 517 'Content-Type': 'text/html', 518 }, 519 start=True, 520 body='0123456789', 521 read_timeout=1, 522 ) 523 524 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 525 526 subprocess.call(['kill', '-9', app_id]) 527 528 skip_alert(r'process %s exited on signal 9' % app_id) 529 530 self.wait_for_record( 531 re.compile( 532 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' 533 ) 534 ) 535 536 resp = self.post_ssl( 537 headers={ 538 'Host': 'localhost', 539 'Connection': 'close', 540 'Content-Type': 'text/html', 541 }, 542 sock=sock, 543 body='0123456789', 544 ) 545 546 assert resp['status'] == 200, 'application respawn status' 547 assert resp['body'] == '0123456789', 'application respawn body' 548 549 def test_tls_url_scheme(self): 550 self.load('variables') 551 552 assert ( 553 self.post( 554 headers={ 555 'Host': 'localhost', 556 'Content-Type': 'text/html', 557 'Custom-Header': '', 558 'Connection': 'close', 559 } 560 )['headers']['Wsgi-Url-Scheme'] 561 == 'http' 562 ), 'url scheme http' 563 564 self.certificate() 565 566 self.add_tls(application='variables') 567 568 assert ( 569 self.post_ssl( 570 headers={ 571 'Host': 'localhost', 572 'Content-Type': 'text/html', 573 'Custom-Header': '', 574 'Connection': 'close', 575 } 576 )['headers']['Wsgi-Url-Scheme'] 577 == 'https' 578 ), 'url scheme https' 579 580 def test_tls_big_upload(self): 581 self.load('upload') 582 583 self.certificate() 584 585 self.add_tls(application='upload') 586 587 filename = 'test.txt' 588 data = '0123456789' * 9000 589 590 res = self.post_ssl( 591 body={ 592 'file': { 593 'filename': filename, 594 'type': 'text/plain', 595 'data': io.StringIO(data), 596 } 597 } 598 ) 599 assert res['status'] == 200, 'status ok' 600 assert res['body'] == filename + data 601