1import re 2import ssl 3import time 4import subprocess 5import unittest 6from unit.applications.tls import TestApplicationTLS 7 8 9class TestTLS(TestApplicationTLS): 10 prerequisites = ['python', 'openssl'] 11 12 def findall(self, pattern): 13 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: 14 return re.findall(pattern, f.read()) 15 16 def openssl_date_to_sec_epoch(self, date): 17 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 18 19 def add_tls(self, application='empty', cert='default', port=7080): 20 self.conf( 21 { 22 "pass": "applications/" + application, 23 "tls": {"certificate": cert} 24 }, 25 'listeners/*:' + str(port), 26 ) 27 28 def remove_tls(self, application='empty', port=7080): 29 self.conf( 30 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 31 ) 32 33 def test_tls_listener_option_add(self): 34 self.load('empty') 35 36 self.certificate() 37 38 self.add_tls() 39 40 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option') 41 42 def test_tls_listener_option_remove(self): 43 self.load('empty') 44 45 self.certificate() 46 47 self.add_tls() 48 49 self.get_ssl() 50 51 self.remove_tls() 52 53 self.assertEqual(self.get()['status'], 200, 'remove listener option') 54 55 def test_tls_certificate_remove(self): 56 self.load('empty') 57 58 self.certificate() 59 60 self.assertIn( 61 'success', 62 self.conf_delete('/certificates/default'), 63 'remove certificate', 64 ) 65 66 def test_tls_certificate_remove_used(self): 67 self.load('empty') 68 69 self.certificate() 70 71 self.add_tls() 72 73 self.assertIn( 74 'error', 75 self.conf_delete('/certificates/default'), 76 'remove certificate', 77 ) 78 79 def test_tls_certificate_remove_nonexisting(self): 80 self.load('empty') 81 82 self.certificate() 83 84 self.add_tls() 85 86 self.assertIn( 87 'error', 88 self.conf_delete('/certificates/blah'), 89 'remove nonexistings certificate', 90 ) 91
| 1import re 2import ssl 3import time 4import subprocess 5import unittest 6from unit.applications.tls import TestApplicationTLS 7 8 9class TestTLS(TestApplicationTLS): 10 prerequisites = ['python', 'openssl'] 11 12 def findall(self, pattern): 13 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: 14 return re.findall(pattern, f.read()) 15 16 def openssl_date_to_sec_epoch(self, date): 17 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') 18 19 def add_tls(self, application='empty', cert='default', port=7080): 20 self.conf( 21 { 22 "pass": "applications/" + application, 23 "tls": {"certificate": cert} 24 }, 25 'listeners/*:' + str(port), 26 ) 27 28 def remove_tls(self, application='empty', port=7080): 29 self.conf( 30 {"pass": "applications/" + application}, 'listeners/*:' + str(port) 31 ) 32 33 def test_tls_listener_option_add(self): 34 self.load('empty') 35 36 self.certificate() 37 38 self.add_tls() 39 40 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option') 41 42 def test_tls_listener_option_remove(self): 43 self.load('empty') 44 45 self.certificate() 46 47 self.add_tls() 48 49 self.get_ssl() 50 51 self.remove_tls() 52 53 self.assertEqual(self.get()['status'], 200, 'remove listener option') 54 55 def test_tls_certificate_remove(self): 56 self.load('empty') 57 58 self.certificate() 59 60 self.assertIn( 61 'success', 62 self.conf_delete('/certificates/default'), 63 'remove certificate', 64 ) 65 66 def test_tls_certificate_remove_used(self): 67 self.load('empty') 68 69 self.certificate() 70 71 self.add_tls() 72 73 self.assertIn( 74 'error', 75 self.conf_delete('/certificates/default'), 76 'remove certificate', 77 ) 78 79 def test_tls_certificate_remove_nonexisting(self): 80 self.load('empty') 81 82 self.certificate() 83 84 self.add_tls() 85 86 self.assertIn( 87 'error', 88 self.conf_delete('/certificates/blah'), 89 'remove nonexistings certificate', 90 ) 91
|
109 def test_tls_certificate_key_incorrect(self): 110 self.load('empty') 111 112 self.certificate('first', False) 113 self.certificate('second', False) 114 115 self.assertIn( 116 'error', self.certificate_load('first', 'second'), 'key incorrect' 117 ) 118 119 def test_tls_certificate_change(self): 120 self.load('empty') 121 122 self.certificate() 123 self.certificate('new') 124 125 self.add_tls() 126 127 cert_old = self.get_server_certificate() 128 129 self.add_tls(cert='new') 130 131 self.assertNotEqual( 132 cert_old, self.get_server_certificate(), 'change certificate' 133 ) 134 135 def test_tls_certificate_key_rsa(self): 136 self.load('empty') 137 138 self.certificate() 139 140 self.assertEqual( 141 self.conf_get('/certificates/default/key'), 142 'RSA (1024 bits)', 143 'certificate key rsa', 144 ) 145 146 def test_tls_certificate_key_ec(self): 147 self.load('empty') 148 149 subprocess.call( 150 [ 151 'openssl', 152 'ecparam', 153 '-noout', 154 '-genkey', 155 '-out', self.testdir + '/ec.key', 156 '-name', 'prime256v1', 157 ] 158 ) 159 160 subprocess.call( 161 [ 162 'openssl', 163 'req', 164 '-x509', 165 '-new', 166 '-subj', '/CN=ec/', 167 '-config', self.testdir + '/openssl.conf', 168 '-key', self.testdir + '/ec.key', 169 '-out', self.testdir + '/ec.crt', 170 ] 171 ) 172 173 self.certificate_load('ec') 174 175 self.assertEqual( 176 self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec' 177 ) 178 179 def test_tls_certificate_chain_options(self): 180 self.load('empty') 181 182 self.certificate() 183 184 chain = self.conf_get('/certificates/default/chain') 185 186 self.assertEqual(len(chain), 1, 'certificate chain length') 187 188 cert = chain[0] 189 190 self.assertEqual( 191 cert['subject']['common_name'], 192 'default', 193 'certificate subject common name', 194 ) 195 self.assertEqual( 196 cert['issuer']['common_name'], 197 'default', 198 'certificate issuer common name', 199 ) 200 201 self.assertLess( 202 abs( 203 self.sec_epoch() 204 - 
self.openssl_date_to_sec_epoch(cert['validity']['since']) 205 ), 206 5, 207 'certificate validity since', 208 ) 209 self.assertEqual( 210 self.openssl_date_to_sec_epoch(cert['validity']['until']) 211 - self.openssl_date_to_sec_epoch(cert['validity']['since']), 212 2592000, 213 'certificate validity until', 214 ) 215 216 def test_tls_certificate_chain(self): 217 self.load('empty') 218 219 self.certificate('root', False) 220 221 subprocess.call( 222 [ 223 'openssl', 224 'req', 225 '-new', 226 '-subj', '/CN=int/', 227 '-config', self.testdir + '/openssl.conf', 228 '-out', self.testdir + '/int.csr', 229 '-keyout', self.testdir + '/int.key', 230 ] 231 ) 232 233 subprocess.call( 234 [ 235 'openssl', 236 'req', 237 '-new', 238 '-subj', '/CN=end/', 239 '-config', self.testdir + '/openssl.conf', 240 '-out', self.testdir + '/end.csr', 241 '-keyout', self.testdir + '/end.key', 242 ] 243 ) 244 245 with open(self.testdir + '/ca.conf', 'w') as f: 246 f.write( 247 """[ ca ] 248default_ca = myca 249 250[ myca ] 251new_certs_dir = %(dir)s 252database = %(database)s 253default_md = sha1 254policy = myca_policy 255serial = %(certserial)s 256default_days = 1 257x509_extensions = myca_extensions 258 259[ myca_policy ] 260commonName = supplied 261 262[ myca_extensions ] 263basicConstraints = critical,CA:TRUE""" 264 % { 265 'dir': self.testdir, 266 'database': self.testdir + '/certindex', 267 'certserial': self.testdir + '/certserial', 268 } 269 ) 270 271 with open(self.testdir + '/certserial', 'w') as f: 272 f.write('1000') 273 274 with open(self.testdir + '/certindex', 'w') as f: 275 f.write('') 276 277 subprocess.call( 278 [ 279 'openssl', 280 'ca', 281 '-batch', 282 '-subj', '/CN=int/', 283 '-config', self.testdir + '/ca.conf', 284 '-keyfile', self.testdir + '/root.key', 285 '-cert', self.testdir + '/root.crt', 286 '-in', self.testdir + '/int.csr', 287 '-out', self.testdir + '/int.crt', 288 ] 289 ) 290 291 subprocess.call( 292 [ 293 'openssl', 294 'ca', 295 '-batch', 296 '-subj', 
'/CN=end/', 297 '-config', self.testdir + '/ca.conf', 298 '-keyfile', self.testdir + '/int.key', 299 '-cert', self.testdir + '/int.crt', 300 '-in', self.testdir + '/end.csr', 301 '-out', self.testdir + '/end.crt', 302 ] 303 ) 304 305 crt_path = self.testdir + '/end-int.crt' 306 end_path = self.testdir + '/end.crt' 307 int_path = self.testdir + '/int.crt' 308 309 with open(crt_path, 'wb') as crt, \ 310 open(end_path, 'rb') as end, \ 311 open(int_path, 'rb') as int: 312 crt.write(end.read() + int.read()) 313 314 self.context = ssl.create_default_context() 315 self.context.check_hostname = False 316 self.context.verify_mode = ssl.CERT_REQUIRED 317 self.context.load_verify_locations(self.testdir + '/root.crt') 318 319 # incomplete chain 320 321 self.assertIn( 322 'success', 323 self.certificate_load('end', 'end'), 324 'certificate chain end upload', 325 ) 326 327 chain = self.conf_get('/certificates/end/chain') 328 self.assertEqual(len(chain), 1, 'certificate chain end length') 329 self.assertEqual( 330 chain[0]['subject']['common_name'], 331 'end', 332 'certificate chain end subject common name', 333 ) 334 self.assertEqual( 335 chain[0]['issuer']['common_name'], 336 'int', 337 'certificate chain end issuer common name', 338 ) 339 340 self.add_tls(cert='end') 341 342 try: 343 resp = self.get_ssl() 344 except ssl.SSLError: 345 resp = None 346 347 self.assertEqual(resp, None, 'certificate chain incomplete chain') 348 349 # intermediate 350 351 self.assertIn( 352 'success', 353 self.certificate_load('int', 'int'), 354 'certificate chain int upload', 355 ) 356 357 chain = self.conf_get('/certificates/int/chain') 358 self.assertEqual(len(chain), 1, 'certificate chain int length') 359 self.assertEqual( 360 chain[0]['subject']['common_name'], 361 'int', 362 'certificate chain int subject common name', 363 ) 364 self.assertEqual( 365 chain[0]['issuer']['common_name'], 366 'root', 367 'certificate chain int issuer common name', 368 ) 369 370 self.add_tls(cert='int') 371 372 
self.assertEqual( 373 self.get_ssl()['status'], 200, 'certificate chain intermediate' 374 ) 375 376 # intermediate server 377 378 self.assertIn( 379 'success', 380 self.certificate_load('end-int', 'end'), 381 'certificate chain end-int upload', 382 ) 383 384 chain = self.conf_get('/certificates/end-int/chain') 385 self.assertEqual(len(chain), 2, 'certificate chain end-int length') 386 self.assertEqual( 387 chain[0]['subject']['common_name'], 388 'end', 389 'certificate chain end-int int subject common name', 390 ) 391 self.assertEqual( 392 chain[0]['issuer']['common_name'], 393 'int', 394 'certificate chain end-int int issuer common name', 395 ) 396 self.assertEqual( 397 chain[1]['subject']['common_name'], 398 'int', 399 'certificate chain end-int end subject common name', 400 ) 401 self.assertEqual( 402 chain[1]['issuer']['common_name'], 403 'root', 404 'certificate chain end-int end issuer common name', 405 ) 406 407 self.add_tls(cert='end-int') 408 409 self.assertEqual( 410 self.get_ssl()['status'], 411 200, 412 'certificate chain intermediate server', 413 ) 414
| 109 def test_tls_certificate_key_incorrect(self): 110 self.load('empty') 111 112 self.certificate('first', False) 113 self.certificate('second', False) 114 115 self.assertIn( 116 'error', self.certificate_load('first', 'second'), 'key incorrect' 117 ) 118 119 def test_tls_certificate_change(self): 120 self.load('empty') 121 122 self.certificate() 123 self.certificate('new') 124 125 self.add_tls() 126 127 cert_old = self.get_server_certificate() 128 129 self.add_tls(cert='new') 130 131 self.assertNotEqual( 132 cert_old, self.get_server_certificate(), 'change certificate' 133 ) 134 135 def test_tls_certificate_key_rsa(self): 136 self.load('empty') 137 138 self.certificate() 139 140 self.assertEqual( 141 self.conf_get('/certificates/default/key'), 142 'RSA (1024 bits)', 143 'certificate key rsa', 144 ) 145 146 def test_tls_certificate_key_ec(self): 147 self.load('empty') 148 149 subprocess.call( 150 [ 151 'openssl', 152 'ecparam', 153 '-noout', 154 '-genkey', 155 '-out', self.testdir + '/ec.key', 156 '-name', 'prime256v1', 157 ] 158 ) 159 160 subprocess.call( 161 [ 162 'openssl', 163 'req', 164 '-x509', 165 '-new', 166 '-subj', '/CN=ec/', 167 '-config', self.testdir + '/openssl.conf', 168 '-key', self.testdir + '/ec.key', 169 '-out', self.testdir + '/ec.crt', 170 ] 171 ) 172 173 self.certificate_load('ec') 174 175 self.assertEqual( 176 self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec' 177 ) 178 179 def test_tls_certificate_chain_options(self): 180 self.load('empty') 181 182 self.certificate() 183 184 chain = self.conf_get('/certificates/default/chain') 185 186 self.assertEqual(len(chain), 1, 'certificate chain length') 187 188 cert = chain[0] 189 190 self.assertEqual( 191 cert['subject']['common_name'], 192 'default', 193 'certificate subject common name', 194 ) 195 self.assertEqual( 196 cert['issuer']['common_name'], 197 'default', 198 'certificate issuer common name', 199 ) 200 201 self.assertLess( 202 abs( 203 self.sec_epoch() 204 - 
self.openssl_date_to_sec_epoch(cert['validity']['since']) 205 ), 206 5, 207 'certificate validity since', 208 ) 209 self.assertEqual( 210 self.openssl_date_to_sec_epoch(cert['validity']['until']) 211 - self.openssl_date_to_sec_epoch(cert['validity']['since']), 212 2592000, 213 'certificate validity until', 214 ) 215 216 def test_tls_certificate_chain(self): 217 self.load('empty') 218 219 self.certificate('root', False) 220 221 subprocess.call( 222 [ 223 'openssl', 224 'req', 225 '-new', 226 '-subj', '/CN=int/', 227 '-config', self.testdir + '/openssl.conf', 228 '-out', self.testdir + '/int.csr', 229 '-keyout', self.testdir + '/int.key', 230 ] 231 ) 232 233 subprocess.call( 234 [ 235 'openssl', 236 'req', 237 '-new', 238 '-subj', '/CN=end/', 239 '-config', self.testdir + '/openssl.conf', 240 '-out', self.testdir + '/end.csr', 241 '-keyout', self.testdir + '/end.key', 242 ] 243 ) 244 245 with open(self.testdir + '/ca.conf', 'w') as f: 246 f.write( 247 """[ ca ] 248default_ca = myca 249 250[ myca ] 251new_certs_dir = %(dir)s 252database = %(database)s 253default_md = sha1 254policy = myca_policy 255serial = %(certserial)s 256default_days = 1 257x509_extensions = myca_extensions 258 259[ myca_policy ] 260commonName = supplied 261 262[ myca_extensions ] 263basicConstraints = critical,CA:TRUE""" 264 % { 265 'dir': self.testdir, 266 'database': self.testdir + '/certindex', 267 'certserial': self.testdir + '/certserial', 268 } 269 ) 270 271 with open(self.testdir + '/certserial', 'w') as f: 272 f.write('1000') 273 274 with open(self.testdir + '/certindex', 'w') as f: 275 f.write('') 276 277 subprocess.call( 278 [ 279 'openssl', 280 'ca', 281 '-batch', 282 '-subj', '/CN=int/', 283 '-config', self.testdir + '/ca.conf', 284 '-keyfile', self.testdir + '/root.key', 285 '-cert', self.testdir + '/root.crt', 286 '-in', self.testdir + '/int.csr', 287 '-out', self.testdir + '/int.crt', 288 ] 289 ) 290 291 subprocess.call( 292 [ 293 'openssl', 294 'ca', 295 '-batch', 296 '-subj', 
'/CN=end/', 297 '-config', self.testdir + '/ca.conf', 298 '-keyfile', self.testdir + '/int.key', 299 '-cert', self.testdir + '/int.crt', 300 '-in', self.testdir + '/end.csr', 301 '-out', self.testdir + '/end.crt', 302 ] 303 ) 304 305 crt_path = self.testdir + '/end-int.crt' 306 end_path = self.testdir + '/end.crt' 307 int_path = self.testdir + '/int.crt' 308 309 with open(crt_path, 'wb') as crt, \ 310 open(end_path, 'rb') as end, \ 311 open(int_path, 'rb') as int: 312 crt.write(end.read() + int.read()) 313 314 self.context = ssl.create_default_context() 315 self.context.check_hostname = False 316 self.context.verify_mode = ssl.CERT_REQUIRED 317 self.context.load_verify_locations(self.testdir + '/root.crt') 318 319 # incomplete chain 320 321 self.assertIn( 322 'success', 323 self.certificate_load('end', 'end'), 324 'certificate chain end upload', 325 ) 326 327 chain = self.conf_get('/certificates/end/chain') 328 self.assertEqual(len(chain), 1, 'certificate chain end length') 329 self.assertEqual( 330 chain[0]['subject']['common_name'], 331 'end', 332 'certificate chain end subject common name', 333 ) 334 self.assertEqual( 335 chain[0]['issuer']['common_name'], 336 'int', 337 'certificate chain end issuer common name', 338 ) 339 340 self.add_tls(cert='end') 341 342 try: 343 resp = self.get_ssl() 344 except ssl.SSLError: 345 resp = None 346 347 self.assertEqual(resp, None, 'certificate chain incomplete chain') 348 349 # intermediate 350 351 self.assertIn( 352 'success', 353 self.certificate_load('int', 'int'), 354 'certificate chain int upload', 355 ) 356 357 chain = self.conf_get('/certificates/int/chain') 358 self.assertEqual(len(chain), 1, 'certificate chain int length') 359 self.assertEqual( 360 chain[0]['subject']['common_name'], 361 'int', 362 'certificate chain int subject common name', 363 ) 364 self.assertEqual( 365 chain[0]['issuer']['common_name'], 366 'root', 367 'certificate chain int issuer common name', 368 ) 369 370 self.add_tls(cert='int') 371 372 
self.assertEqual( 373 self.get_ssl()['status'], 200, 'certificate chain intermediate' 374 ) 375 376 # intermediate server 377 378 self.assertIn( 379 'success', 380 self.certificate_load('end-int', 'end'), 381 'certificate chain end-int upload', 382 ) 383 384 chain = self.conf_get('/certificates/end-int/chain') 385 self.assertEqual(len(chain), 2, 'certificate chain end-int length') 386 self.assertEqual( 387 chain[0]['subject']['common_name'], 388 'end', 389 'certificate chain end-int int subject common name', 390 ) 391 self.assertEqual( 392 chain[0]['issuer']['common_name'], 393 'int', 394 'certificate chain end-int int issuer common name', 395 ) 396 self.assertEqual( 397 chain[1]['subject']['common_name'], 398 'int', 399 'certificate chain end-int end subject common name', 400 ) 401 self.assertEqual( 402 chain[1]['issuer']['common_name'], 403 'root', 404 'certificate chain end-int end issuer common name', 405 ) 406 407 self.add_tls(cert='end-int') 408 409 self.assertEqual( 410 self.get_ssl()['status'], 411 200, 412 'certificate chain intermediate server', 413 ) 414
|
416 def test_tls_reconfigure(self): 417 self.load('empty') 418 419 self.assertEqual(self.get()['status'], 200, 'init') 420 421 self.certificate() 422 423 (resp, sock) = self.get( 424 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 425 start=True, 426 read_timeout=1, 427 ) 428 429 self.assertEqual(resp['status'], 200, 'initial status') 430 431 self.add_tls() 432 433 self.assertEqual( 434 self.get(sock=sock)['status'], 200, 'reconfigure status' 435 ) 436 self.assertEqual( 437 self.get_ssl()['status'], 200, 'reconfigure tls status' 438 ) 439 440 def test_tls_keepalive(self): 441 self.load('mirror') 442 443 self.assertEqual(self.get()['status'], 200, 'init') 444 445 self.certificate() 446 447 self.add_tls(application='mirror') 448 449 (resp, sock) = self.post_ssl( 450 headers={ 451 'Host': 'localhost', 452 'Connection': 'keep-alive', 453 'Content-Type': 'text/html', 454 }, 455 start=True, 456 body='0123456789', 457 read_timeout=1, 458 ) 459 460 self.assertEqual(resp['body'], '0123456789', 'keepalive 1') 461 462 resp = self.post_ssl( 463 headers={ 464 'Host': 'localhost', 465 'Connection': 'close', 466 'Content-Type': 'text/html', 467 }, 468 sock=sock, 469 body='0123456789', 470 ) 471 472 self.assertEqual(resp['body'], '0123456789', 'keepalive 2') 473
| 416 def test_tls_reconfigure(self): 417 self.load('empty') 418 419 self.assertEqual(self.get()['status'], 200, 'init') 420 421 self.certificate() 422 423 (resp, sock) = self.get( 424 headers={'Host': 'localhost', 'Connection': 'keep-alive'}, 425 start=True, 426 read_timeout=1, 427 ) 428 429 self.assertEqual(resp['status'], 200, 'initial status') 430 431 self.add_tls() 432 433 self.assertEqual( 434 self.get(sock=sock)['status'], 200, 'reconfigure status' 435 ) 436 self.assertEqual( 437 self.get_ssl()['status'], 200, 'reconfigure tls status' 438 ) 439 440 def test_tls_keepalive(self): 441 self.load('mirror') 442 443 self.assertEqual(self.get()['status'], 200, 'init') 444 445 self.certificate() 446 447 self.add_tls(application='mirror') 448 449 (resp, sock) = self.post_ssl( 450 headers={ 451 'Host': 'localhost', 452 'Connection': 'keep-alive', 453 'Content-Type': 'text/html', 454 }, 455 start=True, 456 body='0123456789', 457 read_timeout=1, 458 ) 459 460 self.assertEqual(resp['body'], '0123456789', 'keepalive 1') 461 462 resp = self.post_ssl( 463 headers={ 464 'Host': 'localhost', 465 'Connection': 'close', 466 'Content-Type': 'text/html', 467 }, 468 sock=sock, 469 body='0123456789', 470 ) 471 472 self.assertEqual(resp['body'], '0123456789', 'keepalive 2') 473
|
503 def test_tls_certificates_remove_all(self): 504 self.load('empty') 505 506 self.certificate() 507 508 self.assertIn( 509 'success', 510 self.conf_delete('/certificates'), 511 'remove all certificates', 512 ) 513 514 def test_tls_application_respawn(self): 515 self.skip_alerts.append(r'process \d+ exited on signal 9') 516 self.load('mirror') 517 518 self.assertEqual(self.get()['status'], 200, 'init') 519 520 self.certificate() 521 522 self.conf('1', 'applications/mirror/processes') 523 524 self.add_tls(application='mirror') 525 526 (resp, sock) = self.post_ssl( 527 headers={ 528 'Host': 'localhost', 529 'Connection': 'keep-alive', 530 'Content-Type': 'text/html', 531 }, 532 start=True, 533 body='0123456789', 534 read_timeout=1, 535 ) 536 537 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 538 539 subprocess.call(['kill', '-9', app_id]) 540 541 self.wait_for_record( 542 re.compile( 543 ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started' 544 ) 545 ) 546 547 resp = self.post_ssl( 548 headers={ 549 'Host': 'localhost', 550 'Connection': 'close', 551 'Content-Type': 'text/html', 552 }, 553 sock=sock, 554 body='0123456789', 555 ) 556 557 self.assertEqual(resp['status'], 200, 'application respawn status') 558 self.assertEqual( 559 resp['body'], '0123456789', 'application respawn body' 560 ) 561 562 def test_tls_url_scheme(self): 563 self.load('variables') 564 565 self.assertEqual( 566 self.post( 567 headers={ 568 'Host': 'localhost', 569 'Content-Type': 'text/html', 570 'Custom-Header': '', 571 'Connection': 'close', 572 } 573 )['headers']['Wsgi-Url-Scheme'], 574 'http', 575 'url scheme http', 576 ) 577 578 self.certificate() 579 580 self.add_tls(application='variables') 581 582 self.assertEqual( 583 self.post_ssl( 584 headers={ 585 'Host': 'localhost', 586 'Content-Type': 'text/html', 587 'Custom-Header': '', 588 'Connection': 'close', 589 } 590 )['headers']['Wsgi-Url-Scheme'], 591 'https', 592 'url scheme https', 593 ) 594 595if __name__ 
== '__main__': 596 TestTLS.main()
| 503 def test_tls_certificates_remove_all(self): 504 self.load('empty') 505 506 self.certificate() 507 508 self.assertIn( 509 'success', 510 self.conf_delete('/certificates'), 511 'remove all certificates', 512 ) 513 514 def test_tls_application_respawn(self): 515 self.skip_alerts.append(r'process \d+ exited on signal 9') 516 self.load('mirror') 517 518 self.assertEqual(self.get()['status'], 200, 'init') 519 520 self.certificate() 521 522 self.conf('1', 'applications/mirror/processes') 523 524 self.add_tls(application='mirror') 525 526 (resp, sock) = self.post_ssl( 527 headers={ 528 'Host': 'localhost', 529 'Connection': 'keep-alive', 530 'Content-Type': 'text/html', 531 }, 532 start=True, 533 body='0123456789', 534 read_timeout=1, 535 ) 536 537 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] 538 539 subprocess.call(['kill', '-9', app_id]) 540 541 self.wait_for_record( 542 re.compile( 543 ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started' 544 ) 545 ) 546 547 resp = self.post_ssl( 548 headers={ 549 'Host': 'localhost', 550 'Connection': 'close', 551 'Content-Type': 'text/html', 552 }, 553 sock=sock, 554 body='0123456789', 555 ) 556 557 self.assertEqual(resp['status'], 200, 'application respawn status') 558 self.assertEqual( 559 resp['body'], '0123456789', 'application respawn body' 560 ) 561 562 def test_tls_url_scheme(self): 563 self.load('variables') 564 565 self.assertEqual( 566 self.post( 567 headers={ 568 'Host': 'localhost', 569 'Content-Type': 'text/html', 570 'Custom-Header': '', 571 'Connection': 'close', 572 } 573 )['headers']['Wsgi-Url-Scheme'], 574 'http', 575 'url scheme http', 576 ) 577 578 self.certificate() 579 580 self.add_tls(application='variables') 581 582 self.assertEqual( 583 self.post_ssl( 584 headers={ 585 'Host': 'localhost', 586 'Content-Type': 'text/html', 587 'Custom-Header': '', 588 'Connection': 'close', 589 } 590 )['headers']['Wsgi-Url-Scheme'], 591 'https', 592 'url scheme https', 593 ) 594 595if 
__name__ == '__main__': 596 TestTLS.main()
|
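Review bot: The incomplete-chain step in `test_tls_certificate_chain` (gutter lines 342–347) passes on any `ssl.SSLError`, so a handshake that fails for an unrelated reason satisfies the assertion just as well as a missing intermediate does; one plausible case is a client OpenSSL policy rejecting the `default_md = sha1` signatures written to `ca.conf`. Narrowing the handler to `except ssl.SSLCertVerificationError as e:` and carrying `e.verify_message` into the assertion message would tie the test to chain verification. This assumes `get_ssl()` lets the exception through unchanged and that the suite runs on Python 3.7 or later, neither of which is visible on this page. The `openssl` invocations earlier in the same test use `subprocess.call` and discard the exit status, so `subprocess.check_call` would surface a failed key or certificate generation at the step that caused it. These lines appear identically in both columns of the page, so they look like context the commit carries over rather than lines it changes.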