import io
import re
import ssl
import subprocess

import pytest

from conftest import option
from conftest import skip_alert
from unit.applications.tls import TestApplicationTLS


class TestTLS(TestApplicationTLS):
    """TLS listener and certificate-management tests.

    Helper methods such as ``load``, ``conf``, ``certificate``, ``get_ssl``
    and ``post_ssl`` are inherited; their exact semantics are defined outside
    this file.
    """

    # NOTE(review): presumably read by the test harness to decide whether the
    # tests can run with the available modules -- confirm against the runner.
    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

    def findall(self, pattern):
        """Return all matches of *pattern* in ``<temp_dir>/unit.log``."""
        # errors='ignore' keeps undecodable bytes in the log from raising.
        with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
            return re.findall(pattern, f.read())

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date string in the '%b %d %H:%M:%S %Y %Z' format.

        Delegates to the inherited ``date_to_sec_epoch`` helper.
        """
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, application='empty', cert='default', port=7080):
        """Configure listener ``*:<port>`` with a ``tls`` certificate option.

        The listener passes to ``applications/<application>`` and references
        the certificate named *cert*.
        """
        self.conf(
            {
                "pass": "applications/" + application,
                "tls": {"certificate": cert},
            },
            'listeners/*:' + str(port),
        )

    def remove_tls(self, application='empty', port=7080):
        """Reconfigure listener ``*:<port>`` without the ``tls`` option."""
        self.conf(
            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
        )

    def test_tls_listener_option_add(self):
        """After add_tls(), get_ssl() must return status 200."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert self.get_ssl()['status'] == 200, 'add listener option'

    def test_tls_listener_option_remove(self):
        """After remove_tls(), a plain get() must return status 200."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.get_ssl()

        self.remove_tls()

        assert self.get()['status'] == 200, 'remove listener option'

    def test_tls_certificate_remove(self):
        """Deleting a certificate no listener references must succeed."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_used(self):
        """Deleting a certificate referenced by a listener must fail."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/default'
        ), 'remove certificate'

    def test_tls_certificate_remove_nonexisting(self):
        """Deleting a certificate name that was never uploaded must fail."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        assert 'error' in self.conf_delete(
            '/certificates/blah'
        ), 'remove nonexistings certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_update(self):
        """Re-running certificate() must change the served certificate."""
        self.load('empty')

        self.certificate()

        self.add_tls()

        cert_old = self.get_server_certificate()

        self.certificate()

        assert cert_old != self.get_server_certificate(), 'update certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_key_incorrect(self):
        """Loading the 'first' certificate with the 'second' key must fail."""
        self.load('empty')

        self.certificate('first', False)
        self.certificate('second', False)

        assert 'error' in self.certificate_load(
            'first', 'second'
        ), 'key incorrect'

    def test_tls_certificate_change(self):
        """Pointing the listener at another certificate changes what is served."""
        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()

        cert_old = self.get_server_certificate()

        self.add_tls(cert='new')

        assert cert_old != self.get_server_certificate(), 'change certificate'

    def test_tls_certificate_key_rsa(self):
        """The default certificate's key is reported as 'RSA (2048 bits)'."""
        self.load('empty')

        self.certificate()

        assert (
            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
        ), 'certificate key rsa'

    def test_tls_certificate_key_ec(self, temp_dir):
        """A prime256v1 key generated with openssl is reported as 'ECDH'."""
        self.load('empty')

        self.openssl_conf()

        # Generate the EC private key.
        subprocess.call(
            [
                'openssl',
                'ecparam',
                '-noout',
                '-genkey',
                '-out',
                temp_dir + '/ec.key',
                '-name',
                'prime256v1',
            ],
            stderr=subprocess.STDOUT,
        )

        # Issue a self-signed certificate for that key.
        subprocess.call(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=ec/',
                '-config',
                temp_dir + '/openssl.conf',
                '-key',
                temp_dir + '/ec.key',
                '-out',
                temp_dir + '/ec.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        self.certificate_load('ec')

        assert (
            self.conf_get('/certificates/ec/key') == 'ECDH'
        ), 'certificate key ec'

    def test_tls_certificate_chain_options(self):
        """Check the chain info reported for the default certificate.

        Expects a single-entry chain whose subject and issuer common names
        are both 'default', a 'since' date within 5 seconds of now, and a
        validity period of exactly 2592000 seconds.
        """
        self.load('empty')

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        assert len(chain) == 1, 'certificate chain length'

        cert = chain[0]

        assert (
            cert['subject']['common_name'] == 'default'
        ), 'certificate subject common name'
        assert (
            cert['issuer']['common_name'] == 'default'
        ), 'certificate issuer common name'

        assert (
            abs(
                self.sec_epoch()
                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            )
            < 5
        ), 'certificate validity since'
        assert (
            self.openssl_date_to_sec_epoch(cert['validity']['until'])
            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            == 2592000
        ), 'certificate validity until'

    def test_tls_certificate_chain(self, temp_dir):
        """Build a root -> int -> end chain and serve each variant.

        The client context trusts only ``root.crt`` and requires a valid
        certificate.  Three cases are checked: the 'end' certificate alone
        (request expected to fail with ``ssl.SSLError``), the 'int'
        certificate alone (status 200), and the concatenated 'end-int'
        bundle (status 200).
        """
        self.load('empty')

        self.certificate('root', False)

        # Certificate signing requests (and keys) for 'int' and 'end'.
        subprocess.call(
            [
                'openssl',
                'req',
                '-new',
                '-subj',
                '/CN=int/',
                '-config',
                temp_dir + '/openssl.conf',
                '-out',
                temp_dir + '/int.csr',
                '-keyout',
                temp_dir + '/int.key',
            ],
            stderr=subprocess.STDOUT,
        )

        subprocess.call(
            [
                'openssl',
                'req',
                '-new',
                '-subj',
                '/CN=end/',
                '-config',
                temp_dir + '/openssl.conf',
                '-out',
                temp_dir + '/end.csr',
                '-keyout',
                temp_dir + '/end.key',
            ],
            stderr=subprocess.STDOUT,
        )

        # Configuration, serial file and index database for `openssl ca`.
        with open(temp_dir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions

[ myca_policy ]
commonName = supplied

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': temp_dir,
                    'database': temp_dir + '/certindex',
                    'certserial': temp_dir + '/certserial',
                }
            )

        with open(temp_dir + '/certserial', 'w') as f:
            f.write('1000')

        with open(temp_dir + '/certindex', 'w') as f:
            f.write('')

        # Sign 'int' with the root key/certificate.
        subprocess.call(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj',
                '/CN=int/',
                '-config',
                temp_dir + '/ca.conf',
                '-keyfile',
                temp_dir + '/root.key',
                '-cert',
                temp_dir + '/root.crt',
                '-in',
                temp_dir + '/int.csr',
                '-out',
                temp_dir + '/int.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        # Sign 'end' with the 'int' key/certificate.
        subprocess.call(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj',
                '/CN=end/',
                '-config',
                temp_dir + '/ca.conf',
                '-keyfile',
                temp_dir + '/int.key',
                '-cert',
                temp_dir + '/int.crt',
                '-in',
                temp_dir + '/end.csr',
                '-out',
                temp_dir + '/end.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        crt_path = temp_dir + '/end-int.crt'
        end_path = temp_dir + '/end.crt'
        int_path = temp_dir + '/int.crt'

        # Bundle: end certificate followed by the intermediate one.
        # The intermediate handle is named int_crt so the builtin `int`
        # is not shadowed.
        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
            int_path, 'rb'
        ) as int_crt:
            crt.write(end.read() + int_crt.read())

        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(temp_dir + '/root.crt')

        # incomplete chain

        assert 'success' in self.certificate_load(
            'end', 'end'
        ), 'certificate chain end upload'

        chain = self.conf_get('/certificates/end/chain')
        assert len(chain) == 1, 'certificate chain end length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end issuer common name'

        self.add_tls(cert='end')

        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        assert resp is None, 'certificate chain incomplete chain'

        # intermediate

        assert 'success' in self.certificate_load(
            'int', 'int'
        ), 'certificate chain int upload'

        chain = self.conf_get('/certificates/int/chain')
        assert len(chain) == 1, 'certificate chain int length'
        assert (
            chain[0]['subject']['common_name'] == 'int'
        ), 'certificate chain int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'root'
        ), 'certificate chain int issuer common name'

        self.add_tls(cert='int')

        assert (
            self.get_ssl()['status'] == 200
        ), 'certificate chain intermediate'

        # intermediate server

        assert 'success' in self.certificate_load(
            'end-int', 'end'
        ), 'certificate chain end-int upload'

        chain = self.conf_get('/certificates/end-int/chain')
        assert len(chain) == 2, 'certificate chain end-int length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end-int int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end-int int issuer common name'
        assert (
            chain[1]['subject']['common_name'] == 'int'
        ), 'certificate chain end-int end subject common name'
        assert (
            chain[1]['issuer']['common_name'] == 'root'
        ), 'certificate chain end-int end issuer common name'

        self.add_tls(cert='end-int')

        assert (
            self.get_ssl()['status'] == 200
        ), 'certificate chain intermediate server'

    @pytest.mark.skip('not yet')
    def test_tls_reconfigure(self):
        """A kept-alive plain socket must keep working after add_tls()."""
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        (resp, sock) = self.get(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert resp['status'] == 200, 'initial status'

        self.add_tls()

        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'

    def test_tls_keepalive(self):
        """Two POSTs over one socket must both echo the request body."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls(application='mirror')

        (resp, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        assert resp['body'] == '0123456789', 'keepalive 1'

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['body'] == '0123456789', 'keepalive 2'

    @pytest.mark.skip('not yet')
    def test_tls_keepalive_certificate_remove(self):
        """A kept-alive socket must stop working once the certificate is gone.

        The listener is switched back to a configuration without ``tls`` and
        the certificate is deleted; the follow-up request on the existing
        socket is expected to raise.
        """
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls()

        # Only the socket is needed; the first response is not inspected.
        (_, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        # Same request the helper issues with its defaults
        # ({"pass": "applications/empty"} on 'listeners/*:7080').
        self.remove_tls()
        self.conf_delete('/certificates/default')

        try:
            resp = self.get_ssl(
                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
            )
        except Exception:
            # Any request failure counts; KeyboardInterrupt and SystemExit
            # are deliberately allowed to propagate.
            resp = None

        assert resp is None, 'keepalive remove certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificates_remove_all(self):
        """Deleting the whole '/certificates' object must succeed."""
        self.load('empty')

        self.certificate()

        assert 'success' in self.conf_delete(
            '/certificates'
        ), 'remove all certificates'

    def test_tls_application_respawn(self):
        """A kept-alive TLS socket must survive an application respawn.

        The single 'mirror' process is killed with SIGKILL; after the log
        shows a new process has started, a POST on the original socket must
        return 200 with the echoed body.
        """
        self.load('mirror')

        self.certificate()

        self.conf('1', 'applications/mirror/processes')

        self.add_tls(application='mirror')

        (_, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        # PID of the running application process, taken from the log.
        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]

        subprocess.call(['kill', '-9', app_id])

        skip_alert(r'process %s exited on signal 9' % app_id)

        # Wait for a "started" record whose PID differs from the killed one.
        self.wait_for_record(
            re.compile(
                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
            )
        )

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        assert resp['status'] == 200, 'application respawn status'
        assert resp['body'] == '0123456789', 'application respawn body'

    def test_tls_url_scheme(self):
        """'Wsgi-Url-Scheme' must be 'http' without TLS and 'https' with it."""
        self.load('variables')

        assert (
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'http'
        ), 'url scheme http'

        self.certificate()

        self.add_tls(application='variables')

        assert (
            self.post_ssl(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme']
            == 'https'
        ), 'url scheme https'

    def test_tls_big_upload(self):
        """Upload a 90000-character file over TLS and check the echoed body."""
        self.load('upload')

        self.certificate()

        self.add_tls(application='upload')

        filename = 'test.txt'
        data = '0123456789' * 9000

        res = self.post_ssl(
            body={
                'file': {
                    'filename': filename,
                    'type': 'text/plain',
                    'data': io.StringIO(data),
                }
            }
        )
        assert res['status'] == 200, 'status ok'
        assert res['body'] == filename + data