test_tls.py (1041:9bdd46610ea9) test_tls.py (1064:75a64629661f)
1import re
2import ssl
3import time
4import subprocess
5import unittest
6from unit.applications.tls import TestApplicationTLS
7
8
9class TestTLS(TestApplicationTLS):
10 prerequisites = ['python', 'openssl']
11
12 def findall(self, pattern):
13 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
14 return re.findall(pattern, f.read())
15
16 def openssl_date_to_sec_epoch(self, date):
17 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
18
19 def add_tls(self, application='empty', cert='default', port=7080):
20 self.conf(
21 {
22 "pass": "applications/" + application,
23 "tls": {"certificate": cert}
24 },
25 'listeners/*:' + str(port),
26 )
27
28 def remove_tls(self, application='empty', port=7080):
29 self.conf(
30 {"pass": "applications/" + application}, 'listeners/*:' + str(port)
31 )
32
33 def test_tls_listener_option_add(self):
34 self.load('empty')
35
36 self.certificate()
37
38 self.add_tls()
39
40 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
41
42 def test_tls_listener_option_remove(self):
43 self.load('empty')
44
45 self.certificate()
46
47 self.add_tls()
48
49 self.get_ssl()
50
51 self.remove_tls()
52
53 self.assertEqual(self.get()['status'], 200, 'remove listener option')
54
55 def test_tls_certificate_remove(self):
56 self.load('empty')
57
58 self.certificate()
59
60 self.assertIn(
61 'success',
62 self.conf_delete('/certificates/default'),
63 'remove certificate',
64 )
65
66 def test_tls_certificate_remove_used(self):
67 self.load('empty')
68
69 self.certificate()
70
71 self.add_tls()
72
73 self.assertIn(
74 'error',
75 self.conf_delete('/certificates/default'),
76 'remove certificate',
77 )
78
79 def test_tls_certificate_remove_nonexisting(self):
80 self.load('empty')
81
82 self.certificate()
83
84 self.add_tls()
85
86 self.assertIn(
87 'error',
88 self.conf_delete('/certificates/blah'),
89 'remove nonexistings certificate',
90 )
91
1import re
2import ssl
3import time
4import subprocess
5import unittest
6from unit.applications.tls import TestApplicationTLS
7
8
9class TestTLS(TestApplicationTLS):
10 prerequisites = ['python', 'openssl']
11
12 def findall(self, pattern):
13 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
14 return re.findall(pattern, f.read())
15
16 def openssl_date_to_sec_epoch(self, date):
17 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
18
19 def add_tls(self, application='empty', cert='default', port=7080):
20 self.conf(
21 {
22 "pass": "applications/" + application,
23 "tls": {"certificate": cert}
24 },
25 'listeners/*:' + str(port),
26 )
27
28 def remove_tls(self, application='empty', port=7080):
29 self.conf(
30 {"pass": "applications/" + application}, 'listeners/*:' + str(port)
31 )
32
33 def test_tls_listener_option_add(self):
34 self.load('empty')
35
36 self.certificate()
37
38 self.add_tls()
39
40 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
41
42 def test_tls_listener_option_remove(self):
43 self.load('empty')
44
45 self.certificate()
46
47 self.add_tls()
48
49 self.get_ssl()
50
51 self.remove_tls()
52
53 self.assertEqual(self.get()['status'], 200, 'remove listener option')
54
55 def test_tls_certificate_remove(self):
56 self.load('empty')
57
58 self.certificate()
59
60 self.assertIn(
61 'success',
62 self.conf_delete('/certificates/default'),
63 'remove certificate',
64 )
65
66 def test_tls_certificate_remove_used(self):
67 self.load('empty')
68
69 self.certificate()
70
71 self.add_tls()
72
73 self.assertIn(
74 'error',
75 self.conf_delete('/certificates/default'),
76 'remove certificate',
77 )
78
79 def test_tls_certificate_remove_nonexisting(self):
80 self.load('empty')
81
82 self.certificate()
83
84 self.add_tls()
85
86 self.assertIn(
87 'error',
88 self.conf_delete('/certificates/blah'),
89 'remove nonexistings certificate',
90 )
91
92 @unittest.expectedFailure
92 @unittest.skip('not yet')
93 def test_tls_certificate_update(self):
94 self.load('empty')
95
96 self.certificate()
97
98 self.add_tls()
99
100 cert_old = self.get_server_certificate()
101
102 self.certificate()
103
104 self.assertNotEqual(
105 cert_old, self.get_server_certificate(), 'update certificate'
106 )
107
93 def test_tls_certificate_update(self):
94 self.load('empty')
95
96 self.certificate()
97
98 self.add_tls()
99
100 cert_old = self.get_server_certificate()
101
102 self.certificate()
103
104 self.assertNotEqual(
105 cert_old, self.get_server_certificate(), 'update certificate'
106 )
107
108 @unittest.expectedFailure
108 @unittest.skip('not yet')
109 def test_tls_certificate_key_incorrect(self):
110 self.load('empty')
111
112 self.certificate('first', False)
113 self.certificate('second', False)
114
115 self.assertIn(
116 'error', self.certificate_load('first', 'second'), 'key incorrect'
117 )
118
119 def test_tls_certificate_change(self):
120 self.load('empty')
121
122 self.certificate()
123 self.certificate('new')
124
125 self.add_tls()
126
127 cert_old = self.get_server_certificate()
128
129 self.add_tls(cert='new')
130
131 self.assertNotEqual(
132 cert_old, self.get_server_certificate(), 'change certificate'
133 )
134
135 def test_tls_certificate_key_rsa(self):
136 self.load('empty')
137
138 self.certificate()
139
140 self.assertEqual(
141 self.conf_get('/certificates/default/key'),
142 'RSA (1024 bits)',
143 'certificate key rsa',
144 )
145
146 def test_tls_certificate_key_ec(self):
147 self.load('empty')
148
149 subprocess.call(
150 [
151 'openssl',
152 'ecparam',
153 '-noout',
154 '-genkey',
155 '-out', self.testdir + '/ec.key',
156 '-name', 'prime256v1',
157 ]
158 )
159
160 subprocess.call(
161 [
162 'openssl',
163 'req',
164 '-x509',
165 '-new',
166 '-subj', '/CN=ec/',
167 '-config', self.testdir + '/openssl.conf',
168 '-key', self.testdir + '/ec.key',
169 '-out', self.testdir + '/ec.crt',
170 ]
171 )
172
173 self.certificate_load('ec')
174
175 self.assertEqual(
176 self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
177 )
178
179 def test_tls_certificate_chain_options(self):
180 self.load('empty')
181
182 self.certificate()
183
184 chain = self.conf_get('/certificates/default/chain')
185
186 self.assertEqual(len(chain), 1, 'certificate chain length')
187
188 cert = chain[0]
189
190 self.assertEqual(
191 cert['subject']['common_name'],
192 'default',
193 'certificate subject common name',
194 )
195 self.assertEqual(
196 cert['issuer']['common_name'],
197 'default',
198 'certificate issuer common name',
199 )
200
201 self.assertLess(
202 abs(
203 self.sec_epoch()
204 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
205 ),
206 5,
207 'certificate validity since',
208 )
209 self.assertEqual(
210 self.openssl_date_to_sec_epoch(cert['validity']['until'])
211 - self.openssl_date_to_sec_epoch(cert['validity']['since']),
212 2592000,
213 'certificate validity until',
214 )
215
216 def test_tls_certificate_chain(self):
217 self.load('empty')
218
219 self.certificate('root', False)
220
221 subprocess.call(
222 [
223 'openssl',
224 'req',
225 '-new',
226 '-subj', '/CN=int/',
227 '-config', self.testdir + '/openssl.conf',
228 '-out', self.testdir + '/int.csr',
229 '-keyout', self.testdir + '/int.key',
230 ]
231 )
232
233 subprocess.call(
234 [
235 'openssl',
236 'req',
237 '-new',
238 '-subj', '/CN=end/',
239 '-config', self.testdir + '/openssl.conf',
240 '-out', self.testdir + '/end.csr',
241 '-keyout', self.testdir + '/end.key',
242 ]
243 )
244
245 with open(self.testdir + '/ca.conf', 'w') as f:
246 f.write(
247 """[ ca ]
248default_ca = myca
249
250[ myca ]
251new_certs_dir = %(dir)s
252database = %(database)s
253default_md = sha1
254policy = myca_policy
255serial = %(certserial)s
256default_days = 1
257x509_extensions = myca_extensions
258
259[ myca_policy ]
260commonName = supplied
261
262[ myca_extensions ]
263basicConstraints = critical,CA:TRUE"""
264 % {
265 'dir': self.testdir,
266 'database': self.testdir + '/certindex',
267 'certserial': self.testdir + '/certserial',
268 }
269 )
270
271 with open(self.testdir + '/certserial', 'w') as f:
272 f.write('1000')
273
274 with open(self.testdir + '/certindex', 'w') as f:
275 f.write('')
276
277 subprocess.call(
278 [
279 'openssl',
280 'ca',
281 '-batch',
282 '-subj', '/CN=int/',
283 '-config', self.testdir + '/ca.conf',
284 '-keyfile', self.testdir + '/root.key',
285 '-cert', self.testdir + '/root.crt',
286 '-in', self.testdir + '/int.csr',
287 '-out', self.testdir + '/int.crt',
288 ]
289 )
290
291 subprocess.call(
292 [
293 'openssl',
294 'ca',
295 '-batch',
296 '-subj', '/CN=end/',
297 '-config', self.testdir + '/ca.conf',
298 '-keyfile', self.testdir + '/int.key',
299 '-cert', self.testdir + '/int.crt',
300 '-in', self.testdir + '/end.csr',
301 '-out', self.testdir + '/end.crt',
302 ]
303 )
304
305 crt_path = self.testdir + '/end-int.crt'
306 end_path = self.testdir + '/end.crt'
307 int_path = self.testdir + '/int.crt'
308
309 with open(crt_path, 'wb') as crt, \
310 open(end_path, 'rb') as end, \
311 open(int_path, 'rb') as int:
312 crt.write(end.read() + int.read())
313
314 self.context = ssl.create_default_context()
315 self.context.check_hostname = False
316 self.context.verify_mode = ssl.CERT_REQUIRED
317 self.context.load_verify_locations(self.testdir + '/root.crt')
318
319 # incomplete chain
320
321 self.assertIn(
322 'success',
323 self.certificate_load('end', 'end'),
324 'certificate chain end upload',
325 )
326
327 chain = self.conf_get('/certificates/end/chain')
328 self.assertEqual(len(chain), 1, 'certificate chain end length')
329 self.assertEqual(
330 chain[0]['subject']['common_name'],
331 'end',
332 'certificate chain end subject common name',
333 )
334 self.assertEqual(
335 chain[0]['issuer']['common_name'],
336 'int',
337 'certificate chain end issuer common name',
338 )
339
340 self.add_tls(cert='end')
341
342 try:
343 resp = self.get_ssl()
344 except ssl.SSLError:
345 resp = None
346
347 self.assertEqual(resp, None, 'certificate chain incomplete chain')
348
349 # intermediate
350
351 self.assertIn(
352 'success',
353 self.certificate_load('int', 'int'),
354 'certificate chain int upload',
355 )
356
357 chain = self.conf_get('/certificates/int/chain')
358 self.assertEqual(len(chain), 1, 'certificate chain int length')
359 self.assertEqual(
360 chain[0]['subject']['common_name'],
361 'int',
362 'certificate chain int subject common name',
363 )
364 self.assertEqual(
365 chain[0]['issuer']['common_name'],
366 'root',
367 'certificate chain int issuer common name',
368 )
369
370 self.add_tls(cert='int')
371
372 self.assertEqual(
373 self.get_ssl()['status'], 200, 'certificate chain intermediate'
374 )
375
376 # intermediate server
377
378 self.assertIn(
379 'success',
380 self.certificate_load('end-int', 'end'),
381 'certificate chain end-int upload',
382 )
383
384 chain = self.conf_get('/certificates/end-int/chain')
385 self.assertEqual(len(chain), 2, 'certificate chain end-int length')
386 self.assertEqual(
387 chain[0]['subject']['common_name'],
388 'end',
389 'certificate chain end-int int subject common name',
390 )
391 self.assertEqual(
392 chain[0]['issuer']['common_name'],
393 'int',
394 'certificate chain end-int int issuer common name',
395 )
396 self.assertEqual(
397 chain[1]['subject']['common_name'],
398 'int',
399 'certificate chain end-int end subject common name',
400 )
401 self.assertEqual(
402 chain[1]['issuer']['common_name'],
403 'root',
404 'certificate chain end-int end issuer common name',
405 )
406
407 self.add_tls(cert='end-int')
408
409 self.assertEqual(
410 self.get_ssl()['status'],
411 200,
412 'certificate chain intermediate server',
413 )
414
109 def test_tls_certificate_key_incorrect(self):
110 self.load('empty')
111
112 self.certificate('first', False)
113 self.certificate('second', False)
114
115 self.assertIn(
116 'error', self.certificate_load('first', 'second'), 'key incorrect'
117 )
118
119 def test_tls_certificate_change(self):
120 self.load('empty')
121
122 self.certificate()
123 self.certificate('new')
124
125 self.add_tls()
126
127 cert_old = self.get_server_certificate()
128
129 self.add_tls(cert='new')
130
131 self.assertNotEqual(
132 cert_old, self.get_server_certificate(), 'change certificate'
133 )
134
135 def test_tls_certificate_key_rsa(self):
136 self.load('empty')
137
138 self.certificate()
139
140 self.assertEqual(
141 self.conf_get('/certificates/default/key'),
142 'RSA (1024 bits)',
143 'certificate key rsa',
144 )
145
146 def test_tls_certificate_key_ec(self):
147 self.load('empty')
148
149 subprocess.call(
150 [
151 'openssl',
152 'ecparam',
153 '-noout',
154 '-genkey',
155 '-out', self.testdir + '/ec.key',
156 '-name', 'prime256v1',
157 ]
158 )
159
160 subprocess.call(
161 [
162 'openssl',
163 'req',
164 '-x509',
165 '-new',
166 '-subj', '/CN=ec/',
167 '-config', self.testdir + '/openssl.conf',
168 '-key', self.testdir + '/ec.key',
169 '-out', self.testdir + '/ec.crt',
170 ]
171 )
172
173 self.certificate_load('ec')
174
175 self.assertEqual(
176 self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
177 )
178
179 def test_tls_certificate_chain_options(self):
180 self.load('empty')
181
182 self.certificate()
183
184 chain = self.conf_get('/certificates/default/chain')
185
186 self.assertEqual(len(chain), 1, 'certificate chain length')
187
188 cert = chain[0]
189
190 self.assertEqual(
191 cert['subject']['common_name'],
192 'default',
193 'certificate subject common name',
194 )
195 self.assertEqual(
196 cert['issuer']['common_name'],
197 'default',
198 'certificate issuer common name',
199 )
200
201 self.assertLess(
202 abs(
203 self.sec_epoch()
204 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
205 ),
206 5,
207 'certificate validity since',
208 )
209 self.assertEqual(
210 self.openssl_date_to_sec_epoch(cert['validity']['until'])
211 - self.openssl_date_to_sec_epoch(cert['validity']['since']),
212 2592000,
213 'certificate validity until',
214 )
215
216 def test_tls_certificate_chain(self):
217 self.load('empty')
218
219 self.certificate('root', False)
220
221 subprocess.call(
222 [
223 'openssl',
224 'req',
225 '-new',
226 '-subj', '/CN=int/',
227 '-config', self.testdir + '/openssl.conf',
228 '-out', self.testdir + '/int.csr',
229 '-keyout', self.testdir + '/int.key',
230 ]
231 )
232
233 subprocess.call(
234 [
235 'openssl',
236 'req',
237 '-new',
238 '-subj', '/CN=end/',
239 '-config', self.testdir + '/openssl.conf',
240 '-out', self.testdir + '/end.csr',
241 '-keyout', self.testdir + '/end.key',
242 ]
243 )
244
245 with open(self.testdir + '/ca.conf', 'w') as f:
246 f.write(
247 """[ ca ]
248default_ca = myca
249
250[ myca ]
251new_certs_dir = %(dir)s
252database = %(database)s
253default_md = sha1
254policy = myca_policy
255serial = %(certserial)s
256default_days = 1
257x509_extensions = myca_extensions
258
259[ myca_policy ]
260commonName = supplied
261
262[ myca_extensions ]
263basicConstraints = critical,CA:TRUE"""
264 % {
265 'dir': self.testdir,
266 'database': self.testdir + '/certindex',
267 'certserial': self.testdir + '/certserial',
268 }
269 )
270
271 with open(self.testdir + '/certserial', 'w') as f:
272 f.write('1000')
273
274 with open(self.testdir + '/certindex', 'w') as f:
275 f.write('')
276
277 subprocess.call(
278 [
279 'openssl',
280 'ca',
281 '-batch',
282 '-subj', '/CN=int/',
283 '-config', self.testdir + '/ca.conf',
284 '-keyfile', self.testdir + '/root.key',
285 '-cert', self.testdir + '/root.crt',
286 '-in', self.testdir + '/int.csr',
287 '-out', self.testdir + '/int.crt',
288 ]
289 )
290
291 subprocess.call(
292 [
293 'openssl',
294 'ca',
295 '-batch',
296 '-subj', '/CN=end/',
297 '-config', self.testdir + '/ca.conf',
298 '-keyfile', self.testdir + '/int.key',
299 '-cert', self.testdir + '/int.crt',
300 '-in', self.testdir + '/end.csr',
301 '-out', self.testdir + '/end.crt',
302 ]
303 )
304
305 crt_path = self.testdir + '/end-int.crt'
306 end_path = self.testdir + '/end.crt'
307 int_path = self.testdir + '/int.crt'
308
309 with open(crt_path, 'wb') as crt, \
310 open(end_path, 'rb') as end, \
311 open(int_path, 'rb') as int:
312 crt.write(end.read() + int.read())
313
314 self.context = ssl.create_default_context()
315 self.context.check_hostname = False
316 self.context.verify_mode = ssl.CERT_REQUIRED
317 self.context.load_verify_locations(self.testdir + '/root.crt')
318
319 # incomplete chain
320
321 self.assertIn(
322 'success',
323 self.certificate_load('end', 'end'),
324 'certificate chain end upload',
325 )
326
327 chain = self.conf_get('/certificates/end/chain')
328 self.assertEqual(len(chain), 1, 'certificate chain end length')
329 self.assertEqual(
330 chain[0]['subject']['common_name'],
331 'end',
332 'certificate chain end subject common name',
333 )
334 self.assertEqual(
335 chain[0]['issuer']['common_name'],
336 'int',
337 'certificate chain end issuer common name',
338 )
339
340 self.add_tls(cert='end')
341
342 try:
343 resp = self.get_ssl()
344 except ssl.SSLError:
345 resp = None
346
347 self.assertEqual(resp, None, 'certificate chain incomplete chain')
348
349 # intermediate
350
351 self.assertIn(
352 'success',
353 self.certificate_load('int', 'int'),
354 'certificate chain int upload',
355 )
356
357 chain = self.conf_get('/certificates/int/chain')
358 self.assertEqual(len(chain), 1, 'certificate chain int length')
359 self.assertEqual(
360 chain[0]['subject']['common_name'],
361 'int',
362 'certificate chain int subject common name',
363 )
364 self.assertEqual(
365 chain[0]['issuer']['common_name'],
366 'root',
367 'certificate chain int issuer common name',
368 )
369
370 self.add_tls(cert='int')
371
372 self.assertEqual(
373 self.get_ssl()['status'], 200, 'certificate chain intermediate'
374 )
375
376 # intermediate server
377
378 self.assertIn(
379 'success',
380 self.certificate_load('end-int', 'end'),
381 'certificate chain end-int upload',
382 )
383
384 chain = self.conf_get('/certificates/end-int/chain')
385 self.assertEqual(len(chain), 2, 'certificate chain end-int length')
386 self.assertEqual(
387 chain[0]['subject']['common_name'],
388 'end',
389 'certificate chain end-int int subject common name',
390 )
391 self.assertEqual(
392 chain[0]['issuer']['common_name'],
393 'int',
394 'certificate chain end-int int issuer common name',
395 )
396 self.assertEqual(
397 chain[1]['subject']['common_name'],
398 'int',
399 'certificate chain end-int end subject common name',
400 )
401 self.assertEqual(
402 chain[1]['issuer']['common_name'],
403 'root',
404 'certificate chain end-int end issuer common name',
405 )
406
407 self.add_tls(cert='end-int')
408
409 self.assertEqual(
410 self.get_ssl()['status'],
411 200,
412 'certificate chain intermediate server',
413 )
414
415 @unittest.expectedFailure
415 @unittest.skip('not yet')
416 def test_tls_reconfigure(self):
417 self.load('empty')
418
419 self.assertEqual(self.get()['status'], 200, 'init')
420
421 self.certificate()
422
423 (resp, sock) = self.get(
424 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
425 start=True,
426 read_timeout=1,
427 )
428
429 self.assertEqual(resp['status'], 200, 'initial status')
430
431 self.add_tls()
432
433 self.assertEqual(
434 self.get(sock=sock)['status'], 200, 'reconfigure status'
435 )
436 self.assertEqual(
437 self.get_ssl()['status'], 200, 'reconfigure tls status'
438 )
439
440 def test_tls_keepalive(self):
441 self.load('mirror')
442
443 self.assertEqual(self.get()['status'], 200, 'init')
444
445 self.certificate()
446
447 self.add_tls(application='mirror')
448
449 (resp, sock) = self.post_ssl(
450 headers={
451 'Host': 'localhost',
452 'Connection': 'keep-alive',
453 'Content-Type': 'text/html',
454 },
455 start=True,
456 body='0123456789',
457 read_timeout=1,
458 )
459
460 self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
461
462 resp = self.post_ssl(
463 headers={
464 'Host': 'localhost',
465 'Connection': 'close',
466 'Content-Type': 'text/html',
467 },
468 sock=sock,
469 body='0123456789',
470 )
471
472 self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
473
416 def test_tls_reconfigure(self):
417 self.load('empty')
418
419 self.assertEqual(self.get()['status'], 200, 'init')
420
421 self.certificate()
422
423 (resp, sock) = self.get(
424 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
425 start=True,
426 read_timeout=1,
427 )
428
429 self.assertEqual(resp['status'], 200, 'initial status')
430
431 self.add_tls()
432
433 self.assertEqual(
434 self.get(sock=sock)['status'], 200, 'reconfigure status'
435 )
436 self.assertEqual(
437 self.get_ssl()['status'], 200, 'reconfigure tls status'
438 )
439
440 def test_tls_keepalive(self):
441 self.load('mirror')
442
443 self.assertEqual(self.get()['status'], 200, 'init')
444
445 self.certificate()
446
447 self.add_tls(application='mirror')
448
449 (resp, sock) = self.post_ssl(
450 headers={
451 'Host': 'localhost',
452 'Connection': 'keep-alive',
453 'Content-Type': 'text/html',
454 },
455 start=True,
456 body='0123456789',
457 read_timeout=1,
458 )
459
460 self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
461
462 resp = self.post_ssl(
463 headers={
464 'Host': 'localhost',
465 'Connection': 'close',
466 'Content-Type': 'text/html',
467 },
468 sock=sock,
469 body='0123456789',
470 )
471
472 self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
473
474 @unittest.expectedFailure
474 @unittest.skip('not yet')
475 def test_tls_keepalive_certificate_remove(self):
476 self.load('empty')
477
478 self.assertEqual(self.get()['status'], 200, 'init')
479
480 self.certificate()
481
482 self.add_tls()
483
484 (resp, sock) = self.get_ssl(
485 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
486 start=True,
487 read_timeout=1,
488 )
489
490 self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
491 self.conf_delete('/certificates/default')
492
493 try:
494 resp = self.get_ssl(
495 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
496 )
497 except:
498 resp = None
499
500 self.assertEqual(resp, None, 'keepalive remove certificate')
501
475 def test_tls_keepalive_certificate_remove(self):
476 self.load('empty')
477
478 self.assertEqual(self.get()['status'], 200, 'init')
479
480 self.certificate()
481
482 self.add_tls()
483
484 (resp, sock) = self.get_ssl(
485 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
486 start=True,
487 read_timeout=1,
488 )
489
490 self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
491 self.conf_delete('/certificates/default')
492
493 try:
494 resp = self.get_ssl(
495 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
496 )
497 except:
498 resp = None
499
500 self.assertEqual(resp, None, 'keepalive remove certificate')
501
502 @unittest.expectedFailure
502 @unittest.skip('not yet')
503 def test_tls_certificates_remove_all(self):
504 self.load('empty')
505
506 self.certificate()
507
508 self.assertIn(
509 'success',
510 self.conf_delete('/certificates'),
511 'remove all certificates',
512 )
513
514 def test_tls_application_respawn(self):
515 self.skip_alerts.append(r'process \d+ exited on signal 9')
516 self.load('mirror')
517
518 self.assertEqual(self.get()['status'], 200, 'init')
519
520 self.certificate()
521
522 self.conf('1', 'applications/mirror/processes')
523
524 self.add_tls(application='mirror')
525
526 (resp, sock) = self.post_ssl(
527 headers={
528 'Host': 'localhost',
529 'Connection': 'keep-alive',
530 'Content-Type': 'text/html',
531 },
532 start=True,
533 body='0123456789',
534 read_timeout=1,
535 )
536
537 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
538
539 subprocess.call(['kill', '-9', app_id])
540
541 self.wait_for_record(
542 re.compile(
543 ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
544 )
545 )
546
547 resp = self.post_ssl(
548 headers={
549 'Host': 'localhost',
550 'Connection': 'close',
551 'Content-Type': 'text/html',
552 },
553 sock=sock,
554 body='0123456789',
555 )
556
557 self.assertEqual(resp['status'], 200, 'application respawn status')
558 self.assertEqual(
559 resp['body'], '0123456789', 'application respawn body'
560 )
561
562 def test_tls_url_scheme(self):
563 self.load('variables')
564
565 self.assertEqual(
566 self.post(
567 headers={
568 'Host': 'localhost',
569 'Content-Type': 'text/html',
570 'Custom-Header': '',
571 'Connection': 'close',
572 }
573 )['headers']['Wsgi-Url-Scheme'],
574 'http',
575 'url scheme http',
576 )
577
578 self.certificate()
579
580 self.add_tls(application='variables')
581
582 self.assertEqual(
583 self.post_ssl(
584 headers={
585 'Host': 'localhost',
586 'Content-Type': 'text/html',
587 'Custom-Header': '',
588 'Connection': 'close',
589 }
590 )['headers']['Wsgi-Url-Scheme'],
591 'https',
592 'url scheme https',
593 )
594
595if __name__ == '__main__':
596 TestTLS.main()
503 def test_tls_certificates_remove_all(self):
504 self.load('empty')
505
506 self.certificate()
507
508 self.assertIn(
509 'success',
510 self.conf_delete('/certificates'),
511 'remove all certificates',
512 )
513
514 def test_tls_application_respawn(self):
515 self.skip_alerts.append(r'process \d+ exited on signal 9')
516 self.load('mirror')
517
518 self.assertEqual(self.get()['status'], 200, 'init')
519
520 self.certificate()
521
522 self.conf('1', 'applications/mirror/processes')
523
524 self.add_tls(application='mirror')
525
526 (resp, sock) = self.post_ssl(
527 headers={
528 'Host': 'localhost',
529 'Connection': 'keep-alive',
530 'Content-Type': 'text/html',
531 },
532 start=True,
533 body='0123456789',
534 read_timeout=1,
535 )
536
537 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
538
539 subprocess.call(['kill', '-9', app_id])
540
541 self.wait_for_record(
542 re.compile(
543 ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
544 )
545 )
546
547 resp = self.post_ssl(
548 headers={
549 'Host': 'localhost',
550 'Connection': 'close',
551 'Content-Type': 'text/html',
552 },
553 sock=sock,
554 body='0123456789',
555 )
556
557 self.assertEqual(resp['status'], 200, 'application respawn status')
558 self.assertEqual(
559 resp['body'], '0123456789', 'application respawn body'
560 )
561
562 def test_tls_url_scheme(self):
563 self.load('variables')
564
565 self.assertEqual(
566 self.post(
567 headers={
568 'Host': 'localhost',
569 'Content-Type': 'text/html',
570 'Custom-Header': '',
571 'Connection': 'close',
572 }
573 )['headers']['Wsgi-Url-Scheme'],
574 'http',
575 'url scheme http',
576 )
577
578 self.certificate()
579
580 self.add_tls(application='variables')
581
582 self.assertEqual(
583 self.post_ssl(
584 headers={
585 'Host': 'localhost',
586 'Content-Type': 'text/html',
587 'Custom-Header': '',
588 'Connection': 'close',
589 }
590 )['headers']['Wsgi-Url-Scheme'],
591 'https',
592 'url scheme https',
593 )
594
595if __name__ == '__main__':
596 TestTLS.main()
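Review bot: The `@unittest.skip('not yet')` decorator on `test_tls_certificate_key_incorrect` (and likewise on `test_tls_certificate_update`, `test_tls_reconfigure`, `test_tls_keepalive_certificate_remove` and `test_tls_certificates_remove_all`) stops these tests from executing at all, whereas `@unittest.expectedFailure` still ran them and reported an unexpected success once the behaviour changed. This assumes the right-hand revision is the new side, which the print order suggests but the page does not mark. The key-incorrect case is the one asserting that a certificate uploaded with a non-matching private key is answered with 'error', so under a skip neither a fix nor a regression in that validation shows up in the test report. If skipping is required (for instance because a test hangs rather than failing cleanly), the reason should name the gap, e.g. `@unittest.skip('not yet: <missing behaviour or tracking reference>')`, so the untested TLS paths stay visible. Separately, `test_tls_keepalive_certificate_remove` keeps a bare `except:`; if `get_ssl` raises `ssl.SSLError` there, `except ssl.SSLError:` as in `test_tls_certificate_chain` would avoid masking unrelated errors.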