test_tls.py (1477:b93d1acf81bd) test_tls.py (1596:b7e2d4d92624)
1import io
1import io
2import pytest
2import re
3import ssl
4import subprocess
3import re
4import ssl
5import subprocess
5import unittest
6
7from unit.applications.tls import TestApplicationTLS
6
7from unit.applications.tls import TestApplicationTLS
8from conftest import skip_alert
8
9
10class TestTLS(TestApplicationTLS):
11 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
12
13 def findall(self, pattern):
9
10
11class TestTLS(TestApplicationTLS):
12 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
13
14 def findall(self, pattern):
14 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
15 with open(self.temp_dir + '/unit.log', 'r', errors='ignore') as f:
15 return re.findall(pattern, f.read())
16
17 def openssl_date_to_sec_epoch(self, date):
18 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
19
20 def add_tls(self, application='empty', cert='default', port=7080):
21 self.conf(
22 {

--- 10 unchanged lines hidden (view full) ---

33
34 def test_tls_listener_option_add(self):
35 self.load('empty')
36
37 self.certificate()
38
39 self.add_tls()
40
16 return re.findall(pattern, f.read())
17
18 def openssl_date_to_sec_epoch(self, date):
19 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
20
21 def add_tls(self, application='empty', cert='default', port=7080):
22 self.conf(
23 {

--- 10 unchanged lines hidden (view full) ---

34
35 def test_tls_listener_option_add(self):
36 self.load('empty')
37
38 self.certificate()
39
40 self.add_tls()
41
41 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
42 assert self.get_ssl()['status'] == 200, 'add listener option'
42
43 def test_tls_listener_option_remove(self):
44 self.load('empty')
45
46 self.certificate()
47
48 self.add_tls()
49
50 self.get_ssl()
51
52 self.remove_tls()
53
43
44 def test_tls_listener_option_remove(self):
45 self.load('empty')
46
47 self.certificate()
48
49 self.add_tls()
50
51 self.get_ssl()
52
53 self.remove_tls()
54
54 self.assertEqual(self.get()['status'], 200, 'remove listener option')
55 assert self.get()['status'] == 200, 'remove listener option'
55
56 def test_tls_certificate_remove(self):
57 self.load('empty')
58
59 self.certificate()
60
56
57 def test_tls_certificate_remove(self):
58 self.load('empty')
59
60 self.certificate()
61
61 self.assertIn(
62 'success',
63 self.conf_delete('/certificates/default'),
64 'remove certificate',
65 )
62 assert 'success' in self.conf_delete(
63 '/certificates/default'
64 ), 'remove certificate'
66
67 def test_tls_certificate_remove_used(self):
68 self.load('empty')
69
70 self.certificate()
71
72 self.add_tls()
73
65
66 def test_tls_certificate_remove_used(self):
67 self.load('empty')
68
69 self.certificate()
70
71 self.add_tls()
72
74 self.assertIn(
75 'error',
76 self.conf_delete('/certificates/default'),
77 'remove certificate',
78 )
73 assert 'error' in self.conf_delete(
74 '/certificates/default'
75 ), 'remove certificate'
79
80 def test_tls_certificate_remove_nonexisting(self):
81 self.load('empty')
82
83 self.certificate()
84
85 self.add_tls()
86
76
77 def test_tls_certificate_remove_nonexisting(self):
78 self.load('empty')
79
80 self.certificate()
81
82 self.add_tls()
83
87 self.assertIn(
88 'error',
89 self.conf_delete('/certificates/blah'),
90 'remove nonexistings certificate',
91 )
84 assert 'error' in self.conf_delete(
85 '/certificates/blah'
86 ), 'remove nonexistings certificate'
92
87
93 @unittest.skip('not yet')
88 @pytest.mark.skip('not yet')
94 def test_tls_certificate_update(self):
95 self.load('empty')
96
97 self.certificate()
98
99 self.add_tls()
100
101 cert_old = self.get_server_certificate()
102
103 self.certificate()
104
89 def test_tls_certificate_update(self):
90 self.load('empty')
91
92 self.certificate()
93
94 self.add_tls()
95
96 cert_old = self.get_server_certificate()
97
98 self.certificate()
99
105 self.assertNotEqual(
106 cert_old, self.get_server_certificate(), 'update certificate'
107 )
100 assert cert_old != self.get_server_certificate(), 'update certificate'
108
101
109 @unittest.skip('not yet')
102 @pytest.mark.skip('not yet')
110 def test_tls_certificate_key_incorrect(self):
111 self.load('empty')
112
113 self.certificate('first', False)
114 self.certificate('second', False)
115
103 def test_tls_certificate_key_incorrect(self):
104 self.load('empty')
105
106 self.certificate('first', False)
107 self.certificate('second', False)
108
116 self.assertIn(
117 'error', self.certificate_load('first', 'second'), 'key incorrect'
118 )
109 assert 'error' in self.certificate_load(
110 'first', 'second'
111 ), 'key incorrect'
119
120 def test_tls_certificate_change(self):
121 self.load('empty')
122
123 self.certificate()
124 self.certificate('new')
125
126 self.add_tls()
127
128 cert_old = self.get_server_certificate()
129
130 self.add_tls(cert='new')
131
112
113 def test_tls_certificate_change(self):
114 self.load('empty')
115
116 self.certificate()
117 self.certificate('new')
118
119 self.add_tls()
120
121 cert_old = self.get_server_certificate()
122
123 self.add_tls(cert='new')
124
132 self.assertNotEqual(
133 cert_old, self.get_server_certificate(), 'change certificate'
134 )
125 assert cert_old != self.get_server_certificate(), 'change certificate'
135
136 def test_tls_certificate_key_rsa(self):
137 self.load('empty')
138
139 self.certificate()
140
126
127 def test_tls_certificate_key_rsa(self):
128 self.load('empty')
129
130 self.certificate()
131
141 self.assertEqual(
142 self.conf_get('/certificates/default/key'),
143 'RSA (2048 bits)',
144 'certificate key rsa',
145 )
132 assert (
133 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
134 ), 'certificate key rsa'
146
147 def test_tls_certificate_key_ec(self):
148 self.load('empty')
149
150 self.openssl_conf()
151
152 subprocess.call(
153 [
154 'openssl',
155 'ecparam',
156 '-noout',
157 '-genkey',
135
136 def test_tls_certificate_key_ec(self):
137 self.load('empty')
138
139 self.openssl_conf()
140
141 subprocess.call(
142 [
143 'openssl',
144 'ecparam',
145 '-noout',
146 '-genkey',
158 '-out', self.testdir + '/ec.key',
159 '-name', 'prime256v1',
147 '-out',
148 self.temp_dir + '/ec.key',
149 '-name',
150 'prime256v1',
160 ],
161 stderr=subprocess.STDOUT,
162 )
163
164 subprocess.call(
165 [
166 'openssl',
167 'req',
168 '-x509',
169 '-new',
151 ],
152 stderr=subprocess.STDOUT,
153 )
154
155 subprocess.call(
156 [
157 'openssl',
158 'req',
159 '-x509',
160 '-new',
170 '-subj', '/CN=ec/',
171 '-config', self.testdir + '/openssl.conf',
172 '-key', self.testdir + '/ec.key',
173 '-out', self.testdir + '/ec.crt',
161 '-subj',
162 '/CN=ec/',
163 '-config',
164 self.temp_dir + '/openssl.conf',
165 '-key',
166 self.temp_dir + '/ec.key',
167 '-out',
168 self.temp_dir + '/ec.crt',
174 ],
175 stderr=subprocess.STDOUT,
176 )
177
178 self.certificate_load('ec')
179
169 ],
170 stderr=subprocess.STDOUT,
171 )
172
173 self.certificate_load('ec')
174
180 self.assertEqual(
181 self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
182 )
175 assert (
176 self.conf_get('/certificates/ec/key') == 'ECDH'
177 ), 'certificate key ec'
183
184 def test_tls_certificate_chain_options(self):
185 self.load('empty')
186
187 self.certificate()
188
189 chain = self.conf_get('/certificates/default/chain')
190
178
179 def test_tls_certificate_chain_options(self):
180 self.load('empty')
181
182 self.certificate()
183
184 chain = self.conf_get('/certificates/default/chain')
185
191 self.assertEqual(len(chain), 1, 'certificate chain length')
186 assert len(chain) == 1, 'certificate chain length'
192
193 cert = chain[0]
194
187
188 cert = chain[0]
189
195 self.assertEqual(
196 cert['subject']['common_name'],
197 'default',
198 'certificate subject common name',
199 )
200 self.assertEqual(
201 cert['issuer']['common_name'],
202 'default',
203 'certificate issuer common name',
204 )
190 assert (
191 cert['subject']['common_name'] == 'default'
192 ), 'certificate subject common name'
193 assert (
194 cert['issuer']['common_name'] == 'default'
195 ), 'certificate issuer common name'
205
196
206 self.assertLess(
197 assert (
207 abs(
208 self.sec_epoch()
209 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
198 abs(
199 self.sec_epoch()
200 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
210 ),
211 5,
212 'certificate validity since',
213 )
214 self.assertEqual(
201 )
202 < 5
203 ), 'certificate validity since'
204 assert (
215 self.openssl_date_to_sec_epoch(cert['validity']['until'])
205 self.openssl_date_to_sec_epoch(cert['validity']['until'])
216 - self.openssl_date_to_sec_epoch(cert['validity']['since']),
217 2592000,
218 'certificate validity until',
219 )
206 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
207 == 2592000
208 ), 'certificate validity until'
220
221 def test_tls_certificate_chain(self):
222 self.load('empty')
223
224 self.certificate('root', False)
225
226 subprocess.call(
227 [
228 'openssl',
229 'req',
230 '-new',
209
210 def test_tls_certificate_chain(self):
211 self.load('empty')
212
213 self.certificate('root', False)
214
215 subprocess.call(
216 [
217 'openssl',
218 'req',
219 '-new',
231 '-subj', '/CN=int/',
232 '-config', self.testdir + '/openssl.conf',
233 '-out', self.testdir + '/int.csr',
234 '-keyout', self.testdir + '/int.key',
220 '-subj',
221 '/CN=int/',
222 '-config',
223 self.temp_dir + '/openssl.conf',
224 '-out',
225 self.temp_dir + '/int.csr',
226 '-keyout',
227 self.temp_dir + '/int.key',
235 ],
236 stderr=subprocess.STDOUT,
237 )
238
239 subprocess.call(
240 [
241 'openssl',
242 'req',
243 '-new',
228 ],
229 stderr=subprocess.STDOUT,
230 )
231
232 subprocess.call(
233 [
234 'openssl',
235 'req',
236 '-new',
244 '-subj', '/CN=end/',
245 '-config', self.testdir + '/openssl.conf',
246 '-out', self.testdir + '/end.csr',
247 '-keyout', self.testdir + '/end.key',
237 '-subj',
238 '/CN=end/',
239 '-config',
240 self.temp_dir + '/openssl.conf',
241 '-out',
242 self.temp_dir + '/end.csr',
243 '-keyout',
244 self.temp_dir + '/end.key',
248 ],
249 stderr=subprocess.STDOUT,
250 )
251
245 ],
246 stderr=subprocess.STDOUT,
247 )
248
252 with open(self.testdir + '/ca.conf', 'w') as f:
249 with open(self.temp_dir + '/ca.conf', 'w') as f:
253 f.write(
254 """[ ca ]
255default_ca = myca
256
257[ myca ]
258new_certs_dir = %(dir)s
259database = %(database)s
260default_md = sha256
261policy = myca_policy
262serial = %(certserial)s
263default_days = 1
264x509_extensions = myca_extensions
265
266[ myca_policy ]
267commonName = supplied
268
269[ myca_extensions ]
270basicConstraints = critical,CA:TRUE"""
271 % {
250 f.write(
251 """[ ca ]
252default_ca = myca
253
254[ myca ]
255new_certs_dir = %(dir)s
256database = %(database)s
257default_md = sha256
258policy = myca_policy
259serial = %(certserial)s
260default_days = 1
261x509_extensions = myca_extensions
262
263[ myca_policy ]
264commonName = supplied
265
266[ myca_extensions ]
267basicConstraints = critical,CA:TRUE"""
268 % {
272 'dir': self.testdir,
273 'database': self.testdir + '/certindex',
274 'certserial': self.testdir + '/certserial',
269 'dir': self.temp_dir,
270 'database': self.temp_dir + '/certindex',
271 'certserial': self.temp_dir + '/certserial',
275 }
276 )
277
272 }
273 )
274
278 with open(self.testdir + '/certserial', 'w') as f:
275 with open(self.temp_dir + '/certserial', 'w') as f:
279 f.write('1000')
280
276 f.write('1000')
277
281 with open(self.testdir + '/certindex', 'w') as f:
278 with open(self.temp_dir + '/certindex', 'w') as f:
282 f.write('')
283
284 subprocess.call(
285 [
286 'openssl',
287 'ca',
288 '-batch',
279 f.write('')
280
281 subprocess.call(
282 [
283 'openssl',
284 'ca',
285 '-batch',
289 '-subj', '/CN=int/',
290 '-config', self.testdir + '/ca.conf',
291 '-keyfile', self.testdir + '/root.key',
292 '-cert', self.testdir + '/root.crt',
293 '-in', self.testdir + '/int.csr',
294 '-out', self.testdir + '/int.crt',
286 '-subj',
287 '/CN=int/',
288 '-config',
289 self.temp_dir + '/ca.conf',
290 '-keyfile',
291 self.temp_dir + '/root.key',
292 '-cert',
293 self.temp_dir + '/root.crt',
294 '-in',
295 self.temp_dir + '/int.csr',
296 '-out',
297 self.temp_dir + '/int.crt',
295 ],
296 stderr=subprocess.STDOUT,
297 )
298
299 subprocess.call(
300 [
301 'openssl',
302 'ca',
303 '-batch',
298 ],
299 stderr=subprocess.STDOUT,
300 )
301
302 subprocess.call(
303 [
304 'openssl',
305 'ca',
306 '-batch',
304 '-subj', '/CN=end/',
305 '-config', self.testdir + '/ca.conf',
306 '-keyfile', self.testdir + '/int.key',
307 '-cert', self.testdir + '/int.crt',
308 '-in', self.testdir + '/end.csr',
309 '-out', self.testdir + '/end.crt',
307 '-subj',
308 '/CN=end/',
309 '-config',
310 self.temp_dir + '/ca.conf',
311 '-keyfile',
312 self.temp_dir + '/int.key',
313 '-cert',
314 self.temp_dir + '/int.crt',
315 '-in',
316 self.temp_dir + '/end.csr',
317 '-out',
318 self.temp_dir + '/end.crt',
310 ],
311 stderr=subprocess.STDOUT,
312 )
313
319 ],
320 stderr=subprocess.STDOUT,
321 )
322
314 crt_path = self.testdir + '/end-int.crt'
315 end_path = self.testdir + '/end.crt'
316 int_path = self.testdir + '/int.crt'
323 crt_path = self.temp_dir + '/end-int.crt'
324 end_path = self.temp_dir + '/end.crt'
325 int_path = self.temp_dir + '/int.crt'
317
326
318 with open(crt_path, 'wb') as crt, \
319 open(end_path, 'rb') as end, \
320 open(int_path, 'rb') as int:
327 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
328 int_path, 'rb'
329 ) as int:
321 crt.write(end.read() + int.read())
322
323 self.context = ssl.create_default_context()
324 self.context.check_hostname = False
325 self.context.verify_mode = ssl.CERT_REQUIRED
330 crt.write(end.read() + int.read())
331
332 self.context = ssl.create_default_context()
333 self.context.check_hostname = False
334 self.context.verify_mode = ssl.CERT_REQUIRED
326 self.context.load_verify_locations(self.testdir + '/root.crt')
335 self.context.load_verify_locations(self.temp_dir + '/root.crt')
327
328 # incomplete chain
329
336
337 # incomplete chain
338
330 self.assertIn(
331 'success',
332 self.certificate_load('end', 'end'),
333 'certificate chain end upload',
334 )
339 assert 'success' in self.certificate_load(
340 'end', 'end'
341 ), 'certificate chain end upload'
335
336 chain = self.conf_get('/certificates/end/chain')
342
343 chain = self.conf_get('/certificates/end/chain')
337 self.assertEqual(len(chain), 1, 'certificate chain end length')
338 self.assertEqual(
339 chain[0]['subject']['common_name'],
340 'end',
341 'certificate chain end subject common name',
342 )
343 self.assertEqual(
344 chain[0]['issuer']['common_name'],
345 'int',
346 'certificate chain end issuer common name',
347 )
344 assert len(chain) == 1, 'certificate chain end length'
345 assert (
346 chain[0]['subject']['common_name'] == 'end'
347 ), 'certificate chain end subject common name'
348 assert (
349 chain[0]['issuer']['common_name'] == 'int'
350 ), 'certificate chain end issuer common name'
348
349 self.add_tls(cert='end')
350
351 try:
352 resp = self.get_ssl()
353 except ssl.SSLError:
354 resp = None
355
351
352 self.add_tls(cert='end')
353
354 try:
355 resp = self.get_ssl()
356 except ssl.SSLError:
357 resp = None
358
356 self.assertEqual(resp, None, 'certificate chain incomplete chain')
359 assert resp == None, 'certificate chain incomplete chain'
357
358 # intermediate
359
360
361 # intermediate
362
360 self.assertIn(
361 'success',
362 self.certificate_load('int', 'int'),
363 'certificate chain int upload',
364 )
363 assert 'success' in self.certificate_load(
364 'int', 'int'
365 ), 'certificate chain int upload'
365
366 chain = self.conf_get('/certificates/int/chain')
366
367 chain = self.conf_get('/certificates/int/chain')
367 self.assertEqual(len(chain), 1, 'certificate chain int length')
368 self.assertEqual(
369 chain[0]['subject']['common_name'],
370 'int',
371 'certificate chain int subject common name',
372 )
373 self.assertEqual(
374 chain[0]['issuer']['common_name'],
375 'root',
376 'certificate chain int issuer common name',
377 )
368 assert len(chain) == 1, 'certificate chain int length'
369 assert (
370 chain[0]['subject']['common_name'] == 'int'
371 ), 'certificate chain int subject common name'
372 assert (
373 chain[0]['issuer']['common_name'] == 'root'
374 ), 'certificate chain int issuer common name'
378
379 self.add_tls(cert='int')
380
375
376 self.add_tls(cert='int')
377
381 self.assertEqual(
382 self.get_ssl()['status'], 200, 'certificate chain intermediate'
383 )
378 assert (
379 self.get_ssl()['status'] == 200
380 ), 'certificate chain intermediate'
384
385 # intermediate server
386
381
382 # intermediate server
383
387 self.assertIn(
388 'success',
389 self.certificate_load('end-int', 'end'),
390 'certificate chain end-int upload',
391 )
384 assert 'success' in self.certificate_load(
385 'end-int', 'end'
386 ), 'certificate chain end-int upload'
392
393 chain = self.conf_get('/certificates/end-int/chain')
387
388 chain = self.conf_get('/certificates/end-int/chain')
394 self.assertEqual(len(chain), 2, 'certificate chain end-int length')
395 self.assertEqual(
396 chain[0]['subject']['common_name'],
397 'end',
398 'certificate chain end-int int subject common name',
399 )
400 self.assertEqual(
401 chain[0]['issuer']['common_name'],
402 'int',
403 'certificate chain end-int int issuer common name',
404 )
405 self.assertEqual(
406 chain[1]['subject']['common_name'],
407 'int',
408 'certificate chain end-int end subject common name',
409 )
410 self.assertEqual(
411 chain[1]['issuer']['common_name'],
412 'root',
413 'certificate chain end-int end issuer common name',
414 )
389 assert len(chain) == 2, 'certificate chain end-int length'
390 assert (
391 chain[0]['subject']['common_name'] == 'end'
392 ), 'certificate chain end-int int subject common name'
393 assert (
394 chain[0]['issuer']['common_name'] == 'int'
395 ), 'certificate chain end-int int issuer common name'
396 assert (
397 chain[1]['subject']['common_name'] == 'int'
398 ), 'certificate chain end-int end subject common name'
399 assert (
400 chain[1]['issuer']['common_name'] == 'root'
401 ), 'certificate chain end-int end issuer common name'
415
416 self.add_tls(cert='end-int')
417
402
403 self.add_tls(cert='end-int')
404
418 self.assertEqual(
419 self.get_ssl()['status'],
420 200,
421 'certificate chain intermediate server',
422 )
405 assert (
406 self.get_ssl()['status'] == 200
407 ), 'certificate chain intermediate server'
423
408
424 @unittest.skip('not yet')
409 @pytest.mark.skip('not yet')
425 def test_tls_reconfigure(self):
426 self.load('empty')
427
410 def test_tls_reconfigure(self):
411 self.load('empty')
412
428 self.assertEqual(self.get()['status'], 200, 'init')
413 assert self.get()['status'] == 200, 'init'
429
430 self.certificate()
431
432 (resp, sock) = self.get(
433 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
434 start=True,
435 read_timeout=1,
436 )
437
414
415 self.certificate()
416
417 (resp, sock) = self.get(
418 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
419 start=True,
420 read_timeout=1,
421 )
422
438 self.assertEqual(resp['status'], 200, 'initial status')
423 assert resp['status'] == 200, 'initial status'
439
440 self.add_tls()
441
424
425 self.add_tls()
426
442 self.assertEqual(
443 self.get(sock=sock)['status'], 200, 'reconfigure status'
444 )
445 self.assertEqual(
446 self.get_ssl()['status'], 200, 'reconfigure tls status'
447 )
427 assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
428 assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
448
449 def test_tls_keepalive(self):
450 self.load('mirror')
451
429
430 def test_tls_keepalive(self):
431 self.load('mirror')
432
452 self.assertEqual(self.get()['status'], 200, 'init')
433 assert self.get()['status'] == 200, 'init'
453
454 self.certificate()
455
456 self.add_tls(application='mirror')
457
458 (resp, sock) = self.post_ssl(
459 headers={
460 'Host': 'localhost',
461 'Connection': 'keep-alive',
462 'Content-Type': 'text/html',
463 },
464 start=True,
465 body='0123456789',
466 read_timeout=1,
467 )
468
434
435 self.certificate()
436
437 self.add_tls(application='mirror')
438
439 (resp, sock) = self.post_ssl(
440 headers={
441 'Host': 'localhost',
442 'Connection': 'keep-alive',
443 'Content-Type': 'text/html',
444 },
445 start=True,
446 body='0123456789',
447 read_timeout=1,
448 )
449
469 self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
450 assert resp['body'] == '0123456789', 'keepalive 1'
470
471 resp = self.post_ssl(
472 headers={
473 'Host': 'localhost',
474 'Connection': 'close',
475 'Content-Type': 'text/html',
476 },
477 sock=sock,
478 body='0123456789',
479 )
480
451
452 resp = self.post_ssl(
453 headers={
454 'Host': 'localhost',
455 'Connection': 'close',
456 'Content-Type': 'text/html',
457 },
458 sock=sock,
459 body='0123456789',
460 )
461
481 self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
462 assert resp['body'] == '0123456789', 'keepalive 2'
482
463
483 @unittest.skip('not yet')
464 @pytest.mark.skip('not yet')
484 def test_tls_keepalive_certificate_remove(self):
485 self.load('empty')
486
465 def test_tls_keepalive_certificate_remove(self):
466 self.load('empty')
467
487 self.assertEqual(self.get()['status'], 200, 'init')
468 assert self.get()['status'] == 200, 'init'
488
489 self.certificate()
490
491 self.add_tls()
492
493 (resp, sock) = self.get_ssl(
494 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
495 start=True,

--- 5 unchanged lines hidden (view full) ---

501
502 try:
503 resp = self.get_ssl(
504 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
505 )
506 except:
507 resp = None
508
469
470 self.certificate()
471
472 self.add_tls()
473
474 (resp, sock) = self.get_ssl(
475 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
476 start=True,

--- 5 unchanged lines hidden (view full) ---

482
483 try:
484 resp = self.get_ssl(
485 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
486 )
487 except:
488 resp = None
489
509 self.assertEqual(resp, None, 'keepalive remove certificate')
490 assert resp == None, 'keepalive remove certificate'
510
491
511 @unittest.skip('not yet')
492 @pytest.mark.skip('not yet')
512 def test_tls_certificates_remove_all(self):
513 self.load('empty')
514
515 self.certificate()
516
493 def test_tls_certificates_remove_all(self):
494 self.load('empty')
495
496 self.certificate()
497
517 self.assertIn(
518 'success',
519 self.conf_delete('/certificates'),
520 'remove all certificates',
521 )
498 assert 'success' in self.conf_delete(
499 '/certificates'
500 ), 'remove all certificates'
522
523 def test_tls_application_respawn(self):
524 self.load('mirror')
525
526 self.certificate()
527
528 self.conf('1', 'applications/mirror/processes')
529

--- 9 unchanged lines hidden (view full) ---

539 body='0123456789',
540 read_timeout=1,
541 )
542
543 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
544
545 subprocess.call(['kill', '-9', app_id])
546
501
502 def test_tls_application_respawn(self):
503 self.load('mirror')
504
505 self.certificate()
506
507 self.conf('1', 'applications/mirror/processes')
508

--- 9 unchanged lines hidden (view full) ---

518 body='0123456789',
519 read_timeout=1,
520 )
521
522 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
523
524 subprocess.call(['kill', '-9', app_id])
525
547 self.skip_alerts.append(r'process %s exited on signal 9' % app_id)
526 skip_alert(r'process %s exited on signal 9' % app_id)
548
549 self.wait_for_record(
550 re.compile(
551 ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
552 )
553 )
554
555 resp = self.post_ssl(
556 headers={
557 'Host': 'localhost',
558 'Connection': 'close',
559 'Content-Type': 'text/html',
560 },
561 sock=sock,
562 body='0123456789',
563 )
564
527
528 self.wait_for_record(
529 re.compile(
530 ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
531 )
532 )
533
534 resp = self.post_ssl(
535 headers={
536 'Host': 'localhost',
537 'Connection': 'close',
538 'Content-Type': 'text/html',
539 },
540 sock=sock,
541 body='0123456789',
542 )
543
565 self.assertEqual(resp['status'], 200, 'application respawn status')
566 self.assertEqual(
567 resp['body'], '0123456789', 'application respawn body'
568 )
544 assert resp['status'] == 200, 'application respawn status'
545 assert resp['body'] == '0123456789', 'application respawn body'
569
570 def test_tls_url_scheme(self):
571 self.load('variables')
572
546
547 def test_tls_url_scheme(self):
548 self.load('variables')
549
573 self.assertEqual(
550 assert (
574 self.post(
575 headers={
576 'Host': 'localhost',
577 'Content-Type': 'text/html',
578 'Custom-Header': '',
579 'Connection': 'close',
580 }
551 self.post(
552 headers={
553 'Host': 'localhost',
554 'Content-Type': 'text/html',
555 'Custom-Header': '',
556 'Connection': 'close',
557 }
581 )['headers']['Wsgi-Url-Scheme'],
582 'http',
583 'url scheme http',
584 )
558 )['headers']['Wsgi-Url-Scheme']
559 == 'http'
560 ), 'url scheme http'
585
586 self.certificate()
587
588 self.add_tls(application='variables')
589
561
562 self.certificate()
563
564 self.add_tls(application='variables')
565
590 self.assertEqual(
566 assert (
591 self.post_ssl(
592 headers={
593 'Host': 'localhost',
594 'Content-Type': 'text/html',
595 'Custom-Header': '',
596 'Connection': 'close',
597 }
567 self.post_ssl(
568 headers={
569 'Host': 'localhost',
570 'Content-Type': 'text/html',
571 'Custom-Header': '',
572 'Connection': 'close',
573 }
598 )['headers']['Wsgi-Url-Scheme'],
599 'https',
600 'url scheme https',
601 )
574 )['headers']['Wsgi-Url-Scheme']
575 == 'https'
576 ), 'url scheme https'
602
603 def test_tls_big_upload(self):
604 self.load('upload')
605
606 self.certificate()
607
608 self.add_tls(application='upload')
609
610 filename = 'test.txt'
611 data = '0123456789' * 9000
612
577
578 def test_tls_big_upload(self):
579 self.load('upload')
580
581 self.certificate()
582
583 self.add_tls(application='upload')
584
585 filename = 'test.txt'
586 data = '0123456789' * 9000
587
613 res = self.post_ssl(body={
614 'file': {
615 'filename': filename,
616 'type': 'text/plain',
617 'data': io.StringIO(data),
588 res = self.post_ssl(
589 body={
590 'file': {
591 'filename': filename,
592 'type': 'text/plain',
593 'data': io.StringIO(data),
594 }
618 }
595 }
619 })
620 self.assertEqual(res['status'], 200, 'status ok')
621 self.assertEqual(res['body'], filename + data)
622
623if __name__ == '__main__':
624 TestTLS.main()
596 )
597 assert res['status'] == 200, 'status ok'
598 assert res['body'] == filename + data