11 12 13class TestTLS(TestApplicationTLS): 14 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 15 16 def findall(self, pattern): 17 with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f: 18 return re.findall(pattern, f.read()) 19 20 def openssl_date_to_sec_epoch(self, date): 21 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 22 23 def add_tls(self, application='empty', cert='default', port=7080): 24 self.conf( 25 { 26 "pass": "applications/" + application, 27 "tls": {"certificate": cert} 28 }, 29 'listeners/*:' + str(port), 30 ) 31 32 def remove_tls(self, application='empty', port=7080): 33 self.conf( 34 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 35 ) 36 37 def test_tls_listener_option_add(self): 38 self.load('empty') 39 40 self.certificate() 41 42 self.add_tls() 43 44 assert self.get_ssl()['status'] == 200, 'add listener option' 45 46 def test_tls_listener_option_remove(self): 47 self.load('empty') 48 49 self.certificate() 50 51 self.add_tls() 52 53 self.get_ssl() 54 55 self.remove_tls() 56 57 assert self.get()['status'] == 200, 'remove listener option' 58 59 def test_tls_certificate_remove(self): 60 self.load('empty') 61 62 self.certificate() 63 64 assert 'success' in self.conf_delete( 65 '/certificates/default' 66 ), 'remove certificate' 67 68 def test_tls_certificate_remove_used(self): 69 self.load('empty') 70 71 self.certificate() 72 73 self.add_tls() 74 75 assert 'error' in self.conf_delete( 76 '/certificates/default' 77 ), 'remove certificate' 78 79 def test_tls_certificate_remove_nonexisting(self): 80 self.load('empty') 81 82 self.certificate() 83 84 self.add_tls() 85 86 assert 'error' in self.conf_delete( 87 '/certificates/blah' 88 ), 'remove nonexistings certificate' 89 90 @pytest.mark.skip('not yet') 91 def test_tls_certificate_update(self): 92 self.load('empty') 93 94 self.certificate() 95 96 self.add_tls() 97 98 cert_old = self.get_server_certificate() 99 100 self.certificate() 101 102 assert cert_old != self.get_server_certificate(), 'update certificate' 103 104 @pytest.mark.skip('not yet') 105 def test_tls_certificate_key_incorrect(self): 106 self.load('empty') 107 108 self.certificate('first', False) 109 self.certificate('second', False) 110 111 assert 'error' in self.certificate_load( 112 'first', 'second' 113 ), 'key incorrect' 114 115 def test_tls_certificate_change(self): 116 self.load('empty') 117 118 self.certificate() 119 self.certificate('new') 120 121 self.add_tls() 122 123 cert_old = self.get_server_certificate() 124 125 self.add_tls(cert='new') 126 127 assert cert_old != self.get_server_certificate(), 'change certificate' 128 129 def test_tls_certificate_key_rsa(self): 130 self.load('empty') 131 132 self.certificate() 133 134 assert ( 135 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' 136 ), 'certificate key rsa' 137 138 def test_tls_certificate_key_ec(self, temp_dir): 139 self.load('empty') 140 141 self.openssl_conf() 142 143 subprocess.call( 144 [ 145 'openssl', 146 'ecparam', 147 '-noout', 148 '-genkey', 149 '-out', 150 temp_dir + '/ec.key', 151 '-name', 152 'prime256v1', 153 ], 154 stderr=subprocess.STDOUT, 155 ) 156 157 subprocess.call( 158 [ 159 'openssl', 160 'req', 161 '-x509', 162 '-new', 163 '-subj', 164 '/CN=ec/', 165 '-config', 166 temp_dir + '/openssl.conf', 167 '-key', 168 temp_dir + '/ec.key', 169 '-out', 170 temp_dir + '/ec.crt', 171 ], 172 stderr=subprocess.STDOUT, 173 ) 174 175 self.certificate_load('ec') 176 177 assert ( 178 self.conf_get('/certificates/ec/key') == 'ECDH' 179 ), 'certificate key ec' 180 181 def test_tls_certificate_chain_options(self): 182 self.load('empty') 183 184 self.certificate() 185 186 chain = self.conf_get('/certificates/default/chain') 187 188 assert len(chain) == 1, 'certificate chain length' 189 190 cert = chain[0] 191 192 assert ( 193 cert['subject']['common_name'] == 'default' 194 ), 'certificate subject common name' 195 assert ( 196 cert['issuer']['common_name'] == 'default' 197 ), 'certificate issuer common name' 198 199 assert ( 200 abs( 201 self.sec_epoch() 202 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 203 ) 204 < 5 205 ), 'certificate validity since' 206 assert ( 207 self.openssl_date_to_sec_epoch(cert['validity']['until']) 208 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 209 == 2592000 210 ), 'certificate validity until' 211 212 def test_tls_certificate_chain(self, temp_dir): 213 self.load('empty') 214 215 self.certificate('root', False) 216 217 subprocess.call( 218 [ 219 'openssl', 220 'req', 221 '-new', 222 '-subj', 223 '/CN=int/', 224 '-config', 225 temp_dir + '/openssl.conf', 226 '-out', 227 temp_dir + '/int.csr', 228 '-keyout', 229 temp_dir + '/int.key', 230 ], 231 stderr=subprocess.STDOUT, 232 ) 233 234 subprocess.call( 235 [ 236 'openssl', 237 'req', 238 '-new', 239 '-subj', 240 '/CN=end/', 241 '-config', 242 temp_dir + '/openssl.conf', 243 '-out', 244 temp_dir + '/end.csr', 245 '-keyout', 246 temp_dir + '/end.key', 247 ], 248 stderr=subprocess.STDOUT, 249 ) 250 251 with open(temp_dir + '/ca.conf', 'w') as f: 252 f.write( 253 """[ ca ] 254default_ca = myca 255 256[ myca ] 257new_certs_dir = %(dir)s 258database = %(database)s 259default_md = sha256 260policy = myca_policy 261serial = %(certserial)s 262default_days = 1 263x509_extensions = myca_extensions 264 265[ myca_policy ] 266commonName = supplied 267 268[ myca_extensions ] 269basicConstraints = critical,CA:TRUE""" 270 % { 271 'dir': temp_dir, 272 'database': temp_dir + '/certindex', 273 'certserial': temp_dir + '/certserial', 274 } 275 ) 276 277 with open(temp_dir + '/certserial', 'w') as f: 278 f.write('1000') 279 280 with open(temp_dir + '/certindex', 'w') as f: 281 f.write('') 282 283 subprocess.call( 284 [ 285 'openssl', 286 'ca', 287 '-batch', 288 '-subj', 289 '/CN=int/', 290 '-config', 291 temp_dir + '/ca.conf', 292 '-keyfile', 293 temp_dir + '/root.key', 294 '-cert', 295 temp_dir + '/root.crt', 296 '-in', 297 temp_dir + '/int.csr', 298 '-out', 299 temp_dir + '/int.crt', 300 ], 301 stderr=subprocess.STDOUT, 302 ) 303 304 subprocess.call( 305 [ 306 'openssl', 307 'ca', 308 '-batch', 309 '-subj', 310 '/CN=end/', 311 '-config', 312 temp_dir + '/ca.conf', 313 '-keyfile', 314 temp_dir + '/int.key', 315 '-cert', 316 temp_dir + '/int.crt', 317 '-in', 318 temp_dir + '/end.csr', 319 '-out', 320 temp_dir + '/end.crt', 321 ], 322 stderr=subprocess.STDOUT, 323 ) 324 325 crt_path = temp_dir + '/end-int.crt' 326 end_path = temp_dir + '/end.crt' 327 int_path = temp_dir + '/int.crt' 328 329 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 330 int_path, 'rb' 331 ) as int: 332 crt.write(end.read() + int.read()) 333 334 self.context = ssl.create_default_context() 335 self.context.check_hostname = False 336 self.context.verify_mode = ssl.CERT_REQUIRED 337 self.context.load_verify_locations(temp_dir + '/root.crt') 338 339 # incomplete chain 340 341 assert 'success' in self.certificate_load( 342 'end', 'end' 343 ), 'certificate chain end upload' 344 345 chain = self.conf_get('/certificates/end/chain') 346 assert len(chain) == 1, 'certificate chain end length' 347 assert ( 348 chain[0]['subject']['common_name'] == 'end' 349 ), 'certificate chain end subject common name' 350 assert ( 351 chain[0]['issuer']['common_name'] == 'int' 352 ), 'certificate chain end issuer common name' 353 354 self.add_tls(cert='end') 355 356 try: 357 resp = self.get_ssl() 358 except ssl.SSLError: 359 resp = None 360 361 assert resp == None, 'certificate chain incomplete chain' 362 363 # intermediate 364 365 assert 'success' in self.certificate_load( 366 'int', 'int' 367 ), 'certificate chain int upload' 368 369 chain = self.conf_get('/certificates/int/chain') 370 assert len(chain) == 1, 'certificate chain int length' 371 assert ( 372 chain[0]['subject']['common_name'] == 'int' 373 ), 'certificate chain int subject common name' 374 assert ( 375 chain[0]['issuer']['common_name'] == 'root' 376 ), 'certificate chain int issuer common name' 377 378 self.add_tls(cert='int') 379 380 assert ( 381 self.get_ssl()['status'] == 200 382 ), 'certificate chain intermediate' 383 384 # intermediate server 385 386 assert 'success' in self.certificate_load( 387 'end-int', 'end' 388 ), 'certificate chain end-int upload' 389 390 chain = self.conf_get('/certificates/end-int/chain') 391 assert len(chain) == 2, 'certificate chain end-int length' 392 assert ( 393 chain[0]['subject']['common_name'] == 'end' 394 ), 'certificate chain end-int int subject common name' 395 assert ( 396 chain[0]['issuer']['common_name'] == 'int' 397 ), 'certificate chain end-int int issuer common name' 398 assert ( 399 chain[1]['subject']['common_name'] == 'int' 400 ), 'certificate chain end-int end subject common name' 401 assert ( 402 chain[1]['issuer']['common_name'] == 'root' 403 ), 'certificate chain end-int end issuer common name' 404 405 self.add_tls(cert='end-int') 406 407 assert ( 408 self.get_ssl()['status'] == 200 409 ), 'certificate chain intermediate server' 410 411 @pytest.mark.skip('not yet') 412 def test_tls_reconfigure(self): 413 self.load('empty') 414 415 assert self.get()['status'] == 200, 'init' 416 417 self.certificate() 418 419 (resp, sock) = self.get( 420 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 421 start=True, 422 read_timeout=1, 423 ) 424 425 assert resp['status'] == 200, 'initial status' 426 427 self.add_tls() 428 429 assert self.get(sock=sock)['status'] == 200, 'reconfigure status' 430 assert self.get_ssl()['status'] == 200, 'reconfigure tls status' 431 432 def test_tls_keepalive(self): 433 self.load('mirror') 434 435 assert self.get()['status'] == 200, 'init' 436 437 self.certificate() 438 439 self.add_tls(application='mirror') 440 441 (resp, sock) = self.post_ssl( 442 headers={ 443 'Host': 'localhost', 444 'Connection': 'keep-alive', 445 'Content-Type': 'text/html', 446 }, 447 start=True, 448 body='0123456789', 449 read_timeout=1, 450 ) 451 452 assert resp['body'] == '0123456789', 'keepalive 1' 453 454 resp = self.post_ssl( 455 headers={ 456 'Host': 'localhost', 457 'Connection': 'close', 458 'Content-Type': 'text/html', 459 }, 460 sock=sock, 461 body='0123456789', 462 ) 463 464 assert resp['body'] == '0123456789', 'keepalive 2' 465 466 @pytest.mark.skip('not yet') 467 def test_tls_keepalive_certificate_remove(self): 468 self.load('empty') 469 470 assert self.get()['status'] == 200, 'init' 471 472 self.certificate() 473 474 self.add_tls() 475 476 (resp, sock) = self.get_ssl( 477 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 478 start=True, 479 read_timeout=1, 480 ) 481 482 self.conf({"pass": "applications/empty"}, 'listeners/*:7080') 483 self.conf_delete('/certificates/default') 484 485 try: 486 resp = self.get_ssl( 487 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock 488 ) 489 490 except KeyboardInterrupt: 491 raise 492 493 except: 494 resp = None 495 496 assert resp == None, 'keepalive remove certificate' 497 498 @pytest.mark.skip('not yet') 499 def test_tls_certificates_remove_all(self): 500 self.load('empty') 501 502 self.certificate() 503 504 assert 'success' in self.conf_delete( 505 '/certificates' 506 ), 'remove all certificates' 507 508 def test_tls_application_respawn(self): 509 self.load('mirror') 510 511 self.certificate() 512 513 self.conf('1', 'applications/mirror/processes') 514 515 self.add_tls(application='mirror') 516 517 (_, sock) = self.post_ssl( 518 headers={ 519 'Host': 'localhost', 520 'Connection': 'keep-alive', 521 'Content-Type': 'text/html', 522 }, 523 start=True, 524 body='0123456789', 525 read_timeout=1, 526 ) 527 528 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 529 530 subprocess.call(['kill', '-9', app_id]) 531 532 skip_alert(r'process %s exited on signal 9' % app_id) 533 534 self.wait_for_record( 535 re.compile( 536 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' 537 ) 538 ) 539 540 resp = self.post_ssl( 541 headers={ 542 'Host': 'localhost', 543 'Connection': 'close', 544 'Content-Type': 'text/html', 545 }, 546 sock=sock, 547 body='0123456789', 548 ) 549 550 assert resp['status'] == 200, 'application respawn status' 551 assert resp['body'] == '0123456789', 'application respawn body' 552 553 def test_tls_url_scheme(self): 554 self.load('variables') 555 556 assert ( 557 self.post( 558 headers={ 559 'Host': 'localhost', 560 'Content-Type': 'text/html', 561 'Custom-Header': '', 562 'Connection': 'close', 563 } 564 )['headers']['Wsgi-Url-Scheme'] 565 == 'http' 566 ), 'url scheme http' 567 568 self.certificate() 569 570 self.add_tls(application='variables') 571 572 assert ( 573 self.post_ssl( 574 headers={ 575 'Host': 'localhost', 576 'Content-Type': 'text/html', 577 'Custom-Header': '', 578 'Connection': 'close', 579 } 580 )['headers']['Wsgi-Url-Scheme'] 581 == 'https' 582 ), 'url scheme https' 583 584 def test_tls_big_upload(self): 585 self.load('upload') 586 587 self.certificate() 588 589 self.add_tls(application='upload') 590 591 filename = 'test.txt' 592 data = '0123456789' * 9000 593 594 res = self.post_ssl( 595 body={ 596 'file': { 597 'filename': filename, 598 'type': 'text/plain', 599 'data': io.StringIO(data), 600 } 601 } 602 ) 603 assert res['status'] == 200, 'status ok' 604 assert res['body'] == filename + data
| 11 12 13class TestTLS(TestApplicationTLS): 14 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} 15 16 def findall(self, pattern): 17 with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f: 18 return re.findall(pattern, f.read()) 19 20 def openssl_date_to_sec_epoch(self, date): 21 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 22 23 def add_tls(self, application='empty', cert='default', port=7080): 24 self.conf( 25 { 26 "pass": "applications/" + application, 27 "tls": {"certificate": cert} 28 }, 29 'listeners/*:' + str(port), 30 ) 31 32 def remove_tls(self, application='empty', port=7080): 33 self.conf( 34 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 35 ) 36 37 def test_tls_listener_option_add(self): 38 self.load('empty') 39 40 self.certificate() 41 42 self.add_tls() 43 44 assert self.get_ssl()['status'] == 200, 'add listener option' 45 46 def test_tls_listener_option_remove(self): 47 self.load('empty') 48 49 self.certificate() 50 51 self.add_tls() 52 53 self.get_ssl() 54 55 self.remove_tls() 56 57 assert self.get()['status'] == 200, 'remove listener option' 58 59 def test_tls_certificate_remove(self): 60 self.load('empty') 61 62 self.certificate() 63 64 assert 'success' in self.conf_delete( 65 '/certificates/default' 66 ), 'remove certificate' 67 68 def test_tls_certificate_remove_used(self): 69 self.load('empty') 70 71 self.certificate() 72 73 self.add_tls() 74 75 assert 'error' in self.conf_delete( 76 '/certificates/default' 77 ), 'remove certificate' 78 79 def test_tls_certificate_remove_nonexisting(self): 80 self.load('empty') 81 82 self.certificate() 83 84 self.add_tls() 85 86 assert 'error' in self.conf_delete( 87 '/certificates/blah' 88 ), 'remove nonexistings certificate' 89 90 @pytest.mark.skip('not yet') 91 def test_tls_certificate_update(self): 92 self.load('empty') 93 94 self.certificate() 95 96 self.add_tls() 97 98 cert_old = self.get_server_certificate() 99 100 self.certificate() 101 102 assert cert_old != self.get_server_certificate(), 'update certificate' 103 104 @pytest.mark.skip('not yet') 105 def test_tls_certificate_key_incorrect(self): 106 self.load('empty') 107 108 self.certificate('first', False) 109 self.certificate('second', False) 110 111 assert 'error' in self.certificate_load( 112 'first', 'second' 113 ), 'key incorrect' 114 115 def test_tls_certificate_change(self): 116 self.load('empty') 117 118 self.certificate() 119 self.certificate('new') 120 121 self.add_tls() 122 123 cert_old = self.get_server_certificate() 124 125 self.add_tls(cert='new') 126 127 assert cert_old != self.get_server_certificate(), 'change certificate' 128 129 def test_tls_certificate_key_rsa(self): 130 self.load('empty') 131 132 self.certificate() 133 134 assert ( 135 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' 136 ), 'certificate key rsa' 137 138 def test_tls_certificate_key_ec(self, temp_dir): 139 self.load('empty') 140 141 self.openssl_conf() 142 143 subprocess.call( 144 [ 145 'openssl', 146 'ecparam', 147 '-noout', 148 '-genkey', 149 '-out', 150 temp_dir + '/ec.key', 151 '-name', 152 'prime256v1', 153 ], 154 stderr=subprocess.STDOUT, 155 ) 156 157 subprocess.call( 158 [ 159 'openssl', 160 'req', 161 '-x509', 162 '-new', 163 '-subj', 164 '/CN=ec/', 165 '-config', 166 temp_dir + '/openssl.conf', 167 '-key', 168 temp_dir + '/ec.key', 169 '-out', 170 temp_dir + '/ec.crt', 171 ], 172 stderr=subprocess.STDOUT, 173 ) 174 175 self.certificate_load('ec') 176 177 assert ( 178 self.conf_get('/certificates/ec/key') == 'ECDH' 179 ), 'certificate key ec' 180 181 def test_tls_certificate_chain_options(self): 182 self.load('empty') 183 184 self.certificate() 185 186 chain = self.conf_get('/certificates/default/chain') 187 188 assert len(chain) == 1, 'certificate chain length' 189 190 cert = chain[0] 191 192 assert ( 193 cert['subject']['common_name'] == 'default' 194 ), 'certificate subject common name' 195 assert ( 196 cert['issuer']['common_name'] == 'default' 197 ), 'certificate issuer common name' 198 199 assert ( 200 abs( 201 self.sec_epoch() 202 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 203 ) 204 < 5 205 ), 'certificate validity since' 206 assert ( 207 self.openssl_date_to_sec_epoch(cert['validity']['until']) 208 - self.openssl_date_to_sec_epoch(cert['validity']['since']) 209 == 2592000 210 ), 'certificate validity until' 211 212 def test_tls_certificate_chain(self, temp_dir): 213 self.load('empty') 214 215 self.certificate('root', False) 216 217 subprocess.call( 218 [ 219 'openssl', 220 'req', 221 '-new', 222 '-subj', 223 '/CN=int/', 224 '-config', 225 temp_dir + '/openssl.conf', 226 '-out', 227 temp_dir + '/int.csr', 228 '-keyout', 229 temp_dir + '/int.key', 230 ], 231 stderr=subprocess.STDOUT, 232 ) 233 234 subprocess.call( 235 [ 236 'openssl', 237 'req', 238 '-new', 239 '-subj', 240 '/CN=end/', 241 '-config', 242 temp_dir + '/openssl.conf', 243 '-out', 244 temp_dir + '/end.csr', 245 '-keyout', 246 temp_dir + '/end.key', 247 ], 248 stderr=subprocess.STDOUT, 249 ) 250 251 with open(temp_dir + '/ca.conf', 'w') as f: 252 f.write( 253 """[ ca ] 254default_ca = myca 255 256[ myca ] 257new_certs_dir = %(dir)s 258database = %(database)s 259default_md = sha256 260policy = myca_policy 261serial = %(certserial)s 262default_days = 1 263x509_extensions = myca_extensions 264 265[ myca_policy ] 266commonName = supplied 267 268[ myca_extensions ] 269basicConstraints = critical,CA:TRUE""" 270 % { 271 'dir': temp_dir, 272 'database': temp_dir + '/certindex', 273 'certserial': temp_dir + '/certserial', 274 } 275 ) 276 277 with open(temp_dir + '/certserial', 'w') as f: 278 f.write('1000') 279 280 with open(temp_dir + '/certindex', 'w') as f: 281 f.write('') 282 283 subprocess.call( 284 [ 285 'openssl', 286 'ca', 287 '-batch', 288 '-subj', 289 '/CN=int/', 290 '-config', 291 temp_dir + '/ca.conf', 292 '-keyfile', 293 temp_dir + '/root.key', 294 '-cert', 295 temp_dir + '/root.crt', 296 '-in', 297 temp_dir + '/int.csr', 298 '-out', 299 temp_dir + '/int.crt', 300 ], 301 stderr=subprocess.STDOUT, 302 ) 303 304 subprocess.call( 305 [ 306 'openssl', 307 'ca', 308 '-batch', 309 '-subj', 310 '/CN=end/', 311 '-config', 312 temp_dir + '/ca.conf', 313 '-keyfile', 314 temp_dir + '/int.key', 315 '-cert', 316 temp_dir + '/int.crt', 317 '-in', 318 temp_dir + '/end.csr', 319 '-out', 320 temp_dir + '/end.crt', 321 ], 322 stderr=subprocess.STDOUT, 323 ) 324 325 crt_path = temp_dir + '/end-int.crt' 326 end_path = temp_dir + '/end.crt' 327 int_path = temp_dir + '/int.crt' 328 329 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( 330 int_path, 'rb' 331 ) as int: 332 crt.write(end.read() + int.read()) 333 334 self.context = ssl.create_default_context() 335 self.context.check_hostname = False 336 self.context.verify_mode = ssl.CERT_REQUIRED 337 self.context.load_verify_locations(temp_dir + '/root.crt') 338 339 # incomplete chain 340 341 assert 'success' in self.certificate_load( 342 'end', 'end' 343 ), 'certificate chain end upload' 344 345 chain = self.conf_get('/certificates/end/chain') 346 assert len(chain) == 1, 'certificate chain end length' 347 assert ( 348 chain[0]['subject']['common_name'] == 'end' 349 ), 'certificate chain end subject common name' 350 assert ( 351 chain[0]['issuer']['common_name'] == 'int' 352 ), 'certificate chain end issuer common name' 353 354 self.add_tls(cert='end') 355 356 try: 357 resp = self.get_ssl() 358 except ssl.SSLError: 359 resp = None 360 361 assert resp == None, 'certificate chain incomplete chain' 362 363 # intermediate 364 365 assert 'success' in self.certificate_load( 366 'int', 'int' 367 ), 'certificate chain int upload' 368 369 chain = self.conf_get('/certificates/int/chain') 370 assert len(chain) == 1, 'certificate chain int length' 371 assert ( 372 chain[0]['subject']['common_name'] == 'int' 373 ), 'certificate chain int subject common name' 374 assert ( 375 chain[0]['issuer']['common_name'] == 'root' 376 ), 'certificate chain int issuer common name' 377 378 self.add_tls(cert='int') 379 380 assert ( 381 self.get_ssl()['status'] == 200 382 ), 'certificate chain intermediate' 383 384 # intermediate server 385 386 assert 'success' in self.certificate_load( 387 'end-int', 'end' 388 ), 'certificate chain end-int upload' 389 390 chain = self.conf_get('/certificates/end-int/chain') 391 assert len(chain) == 2, 'certificate chain end-int length' 392 assert ( 393 chain[0]['subject']['common_name'] == 'end' 394 ), 'certificate chain end-int int subject common name' 395 assert ( 396 chain[0]['issuer']['common_name'] == 'int' 397 ), 'certificate chain end-int int issuer common name' 398 assert ( 399 chain[1]['subject']['common_name'] == 'int' 400 ), 'certificate chain end-int end subject common name' 401 assert ( 402 chain[1]['issuer']['common_name'] == 'root' 403 ), 'certificate chain end-int end issuer common name' 404 405 self.add_tls(cert='end-int') 406 407 assert ( 408 self.get_ssl()['status'] == 200 409 ), 'certificate chain intermediate server' 410 411 @pytest.mark.skip('not yet') 412 def test_tls_reconfigure(self): 413 self.load('empty') 414 415 assert self.get()['status'] == 200, 'init' 416 417 self.certificate() 418 419 (resp, sock) = self.get( 420 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 421 start=True, 422 read_timeout=1, 423 ) 424 425 assert resp['status'] == 200, 'initial status' 426 427 self.add_tls() 428 429 assert self.get(sock=sock)['status'] == 200, 'reconfigure status' 430 assert self.get_ssl()['status'] == 200, 'reconfigure tls status' 431 432 def test_tls_keepalive(self): 433 self.load('mirror') 434 435 assert self.get()['status'] == 200, 'init' 436 437 self.certificate() 438 439 self.add_tls(application='mirror') 440 441 (resp, sock) = self.post_ssl( 442 headers={ 443 'Host': 'localhost', 444 'Connection': 'keep-alive', 445 'Content-Type': 'text/html', 446 }, 447 start=True, 448 body='0123456789', 449 read_timeout=1, 450 ) 451 452 assert resp['body'] == '0123456789', 'keepalive 1' 453 454 resp = self.post_ssl( 455 headers={ 456 'Host': 'localhost', 457 'Connection': 'close', 458 'Content-Type': 'text/html', 459 }, 460 sock=sock, 461 body='0123456789', 462 ) 463 464 assert resp['body'] == '0123456789', 'keepalive 2' 465 466 @pytest.mark.skip('not yet') 467 def test_tls_keepalive_certificate_remove(self): 468 self.load('empty') 469 470 assert self.get()['status'] == 200, 'init' 471 472 self.certificate() 473 474 self.add_tls() 475 476 (resp, sock) = self.get_ssl( 477 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 478 start=True, 479 read_timeout=1, 480 ) 481 482 self.conf({"pass": "applications/empty"}, 'listeners/*:7080') 483 self.conf_delete('/certificates/default') 484 485 try: 486 resp = self.get_ssl( 487 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock 488 ) 489 490 except KeyboardInterrupt: 491 raise 492 493 except: 494 resp = None 495 496 assert resp == None, 'keepalive remove certificate' 497 498 @pytest.mark.skip('not yet') 499 def test_tls_certificates_remove_all(self): 500 self.load('empty') 501 502 self.certificate() 503 504 assert 'success' in self.conf_delete( 505 '/certificates' 506 ), 'remove all certificates' 507 508 def test_tls_application_respawn(self): 509 self.load('mirror') 510 511 self.certificate() 512 513 self.conf('1', 'applications/mirror/processes') 514 515 self.add_tls(application='mirror') 516 517 (_, sock) = self.post_ssl( 518 headers={ 519 'Host': 'localhost', 520 'Connection': 'keep-alive', 521 'Content-Type': 'text/html', 522 }, 523 start=True, 524 body='0123456789', 525 read_timeout=1, 526 ) 527 528 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 529 530 subprocess.call(['kill', '-9', app_id]) 531 532 skip_alert(r'process %s exited on signal 9' % app_id) 533 534 self.wait_for_record( 535 re.compile( 536 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' 537 ) 538 ) 539 540 resp = self.post_ssl( 541 headers={ 542 'Host': 'localhost', 543 'Connection': 'close', 544 'Content-Type': 'text/html', 545 }, 546 sock=sock, 547 body='0123456789', 548 ) 549 550 assert resp['status'] == 200, 'application respawn status' 551 assert resp['body'] == '0123456789', 'application respawn body' 552 553 def test_tls_url_scheme(self): 554 self.load('variables') 555 556 assert ( 557 self.post( 558 headers={ 559 'Host': 'localhost', 560 'Content-Type': 'text/html', 561 'Custom-Header': '', 562 'Connection': 'close', 563 } 564 )['headers']['Wsgi-Url-Scheme'] 565 == 'http' 566 ), 'url scheme http' 567 568 self.certificate() 569 570 self.add_tls(application='variables') 571 572 assert ( 573 self.post_ssl( 574 headers={ 575 'Host': 'localhost', 576 'Content-Type': 'text/html', 577 'Custom-Header': '', 578 'Connection': 'close', 579 } 580 )['headers']['Wsgi-Url-Scheme'] 581 == 'https' 582 ), 'url scheme https' 583 584 def test_tls_big_upload(self): 585 self.load('upload') 586 587 self.certificate() 588 589 self.add_tls(application='upload') 590 591 filename = 'test.txt' 592 data = '0123456789' * 9000 593 594 res = self.post_ssl( 595 body={ 596 'file': { 597 'filename': filename, 598 'type': 'text/plain', 599 'data': io.StringIO(data), 600 } 601 } 602 ) 603 assert res['status'] == 200, 'status ok' 604 assert res['body'] == filename + data
|