test_tls.py (1706:a1da56837554) test_tls.py (1730:44912af5b3e6)
1import io
2import re
3import ssl
4import subprocess
5
6import pytest
7
1import io
2import re
3import ssl
4import subprocess
5
6import pytest
7
8from conftest import option
9from conftest import skip_alert
10from unit.applications.tls import TestApplicationTLS
8from conftest import skip_alert
9from unit.applications.tls import TestApplicationTLS
10from unit.option import option
11
12
13class TestTLS(TestApplicationTLS):
14 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
15
16 def findall(self, pattern):
17 with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
18 return re.findall(pattern, f.read())
19
20 def openssl_date_to_sec_epoch(self, date):
21 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
22
23 def add_tls(self, application='empty', cert='default', port=7080):
24 self.conf(
25 {
26 "pass": "applications/" + application,
27 "tls": {"certificate": cert}
28 },
29 'listeners/*:' + str(port),
30 )
31
32 def remove_tls(self, application='empty', port=7080):
33 self.conf(
34 {"pass": "applications/" + application}, 'listeners/*:' + str(port)
35 )
36
37 def test_tls_listener_option_add(self):
38 self.load('empty')
39
40 self.certificate()
41
42 self.add_tls()
43
44 assert self.get_ssl()['status'] == 200, 'add listener option'
45
46 def test_tls_listener_option_remove(self):
47 self.load('empty')
48
49 self.certificate()
50
51 self.add_tls()
52
53 self.get_ssl()
54
55 self.remove_tls()
56
57 assert self.get()['status'] == 200, 'remove listener option'
58
59 def test_tls_certificate_remove(self):
60 self.load('empty')
61
62 self.certificate()
63
64 assert 'success' in self.conf_delete(
65 '/certificates/default'
66 ), 'remove certificate'
67
68 def test_tls_certificate_remove_used(self):
69 self.load('empty')
70
71 self.certificate()
72
73 self.add_tls()
74
75 assert 'error' in self.conf_delete(
76 '/certificates/default'
77 ), 'remove certificate'
78
79 def test_tls_certificate_remove_nonexisting(self):
80 self.load('empty')
81
82 self.certificate()
83
84 self.add_tls()
85
86 assert 'error' in self.conf_delete(
87 '/certificates/blah'
88 ), 'remove nonexistings certificate'
89
90 @pytest.mark.skip('not yet')
91 def test_tls_certificate_update(self):
92 self.load('empty')
93
94 self.certificate()
95
96 self.add_tls()
97
98 cert_old = self.get_server_certificate()
99
100 self.certificate()
101
102 assert cert_old != self.get_server_certificate(), 'update certificate'
103
104 @pytest.mark.skip('not yet')
105 def test_tls_certificate_key_incorrect(self):
106 self.load('empty')
107
108 self.certificate('first', False)
109 self.certificate('second', False)
110
111 assert 'error' in self.certificate_load(
112 'first', 'second'
113 ), 'key incorrect'
114
115 def test_tls_certificate_change(self):
116 self.load('empty')
117
118 self.certificate()
119 self.certificate('new')
120
121 self.add_tls()
122
123 cert_old = self.get_server_certificate()
124
125 self.add_tls(cert='new')
126
127 assert cert_old != self.get_server_certificate(), 'change certificate'
128
129 def test_tls_certificate_key_rsa(self):
130 self.load('empty')
131
132 self.certificate()
133
134 assert (
135 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
136 ), 'certificate key rsa'
137
138 def test_tls_certificate_key_ec(self, temp_dir):
139 self.load('empty')
140
141 self.openssl_conf()
142
143 subprocess.call(
144 [
145 'openssl',
146 'ecparam',
147 '-noout',
148 '-genkey',
149 '-out',
150 temp_dir + '/ec.key',
151 '-name',
152 'prime256v1',
153 ],
154 stderr=subprocess.STDOUT,
155 )
156
157 subprocess.call(
158 [
159 'openssl',
160 'req',
161 '-x509',
162 '-new',
163 '-subj',
164 '/CN=ec/',
165 '-config',
166 temp_dir + '/openssl.conf',
167 '-key',
168 temp_dir + '/ec.key',
169 '-out',
170 temp_dir + '/ec.crt',
171 ],
172 stderr=subprocess.STDOUT,
173 )
174
175 self.certificate_load('ec')
176
177 assert (
178 self.conf_get('/certificates/ec/key') == 'ECDH'
179 ), 'certificate key ec'
180
181 def test_tls_certificate_chain_options(self):
182 self.load('empty')
183
184 self.certificate()
185
186 chain = self.conf_get('/certificates/default/chain')
187
188 assert len(chain) == 1, 'certificate chain length'
189
190 cert = chain[0]
191
192 assert (
193 cert['subject']['common_name'] == 'default'
194 ), 'certificate subject common name'
195 assert (
196 cert['issuer']['common_name'] == 'default'
197 ), 'certificate issuer common name'
198
199 assert (
200 abs(
201 self.sec_epoch()
202 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
203 )
204 < 5
205 ), 'certificate validity since'
206 assert (
207 self.openssl_date_to_sec_epoch(cert['validity']['until'])
208 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
209 == 2592000
210 ), 'certificate validity until'
211
212 def test_tls_certificate_chain(self, temp_dir):
213 self.load('empty')
214
215 self.certificate('root', False)
216
217 subprocess.call(
218 [
219 'openssl',
220 'req',
221 '-new',
222 '-subj',
223 '/CN=int/',
224 '-config',
225 temp_dir + '/openssl.conf',
226 '-out',
227 temp_dir + '/int.csr',
228 '-keyout',
229 temp_dir + '/int.key',
230 ],
231 stderr=subprocess.STDOUT,
232 )
233
234 subprocess.call(
235 [
236 'openssl',
237 'req',
238 '-new',
239 '-subj',
240 '/CN=end/',
241 '-config',
242 temp_dir + '/openssl.conf',
243 '-out',
244 temp_dir + '/end.csr',
245 '-keyout',
246 temp_dir + '/end.key',
247 ],
248 stderr=subprocess.STDOUT,
249 )
250
251 with open(temp_dir + '/ca.conf', 'w') as f:
252 f.write(
253 """[ ca ]
254default_ca = myca
255
256[ myca ]
257new_certs_dir = %(dir)s
258database = %(database)s
259default_md = sha256
260policy = myca_policy
261serial = %(certserial)s
262default_days = 1
263x509_extensions = myca_extensions
264
265[ myca_policy ]
266commonName = supplied
267
268[ myca_extensions ]
269basicConstraints = critical,CA:TRUE"""
270 % {
271 'dir': temp_dir,
272 'database': temp_dir + '/certindex',
273 'certserial': temp_dir + '/certserial',
274 }
275 )
276
277 with open(temp_dir + '/certserial', 'w') as f:
278 f.write('1000')
279
280 with open(temp_dir + '/certindex', 'w') as f:
281 f.write('')
282
283 subprocess.call(
284 [
285 'openssl',
286 'ca',
287 '-batch',
288 '-subj',
289 '/CN=int/',
290 '-config',
291 temp_dir + '/ca.conf',
292 '-keyfile',
293 temp_dir + '/root.key',
294 '-cert',
295 temp_dir + '/root.crt',
296 '-in',
297 temp_dir + '/int.csr',
298 '-out',
299 temp_dir + '/int.crt',
300 ],
301 stderr=subprocess.STDOUT,
302 )
303
304 subprocess.call(
305 [
306 'openssl',
307 'ca',
308 '-batch',
309 '-subj',
310 '/CN=end/',
311 '-config',
312 temp_dir + '/ca.conf',
313 '-keyfile',
314 temp_dir + '/int.key',
315 '-cert',
316 temp_dir + '/int.crt',
317 '-in',
318 temp_dir + '/end.csr',
319 '-out',
320 temp_dir + '/end.crt',
321 ],
322 stderr=subprocess.STDOUT,
323 )
324
325 crt_path = temp_dir + '/end-int.crt'
326 end_path = temp_dir + '/end.crt'
327 int_path = temp_dir + '/int.crt'
328
329 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
330 int_path, 'rb'
331 ) as int:
332 crt.write(end.read() + int.read())
333
334 self.context = ssl.create_default_context()
335 self.context.check_hostname = False
336 self.context.verify_mode = ssl.CERT_REQUIRED
337 self.context.load_verify_locations(temp_dir + '/root.crt')
338
339 # incomplete chain
340
341 assert 'success' in self.certificate_load(
342 'end', 'end'
343 ), 'certificate chain end upload'
344
345 chain = self.conf_get('/certificates/end/chain')
346 assert len(chain) == 1, 'certificate chain end length'
347 assert (
348 chain[0]['subject']['common_name'] == 'end'
349 ), 'certificate chain end subject common name'
350 assert (
351 chain[0]['issuer']['common_name'] == 'int'
352 ), 'certificate chain end issuer common name'
353
354 self.add_tls(cert='end')
355
356 try:
357 resp = self.get_ssl()
358 except ssl.SSLError:
359 resp = None
360
361 assert resp == None, 'certificate chain incomplete chain'
362
363 # intermediate
364
365 assert 'success' in self.certificate_load(
366 'int', 'int'
367 ), 'certificate chain int upload'
368
369 chain = self.conf_get('/certificates/int/chain')
370 assert len(chain) == 1, 'certificate chain int length'
371 assert (
372 chain[0]['subject']['common_name'] == 'int'
373 ), 'certificate chain int subject common name'
374 assert (
375 chain[0]['issuer']['common_name'] == 'root'
376 ), 'certificate chain int issuer common name'
377
378 self.add_tls(cert='int')
379
380 assert (
381 self.get_ssl()['status'] == 200
382 ), 'certificate chain intermediate'
383
384 # intermediate server
385
386 assert 'success' in self.certificate_load(
387 'end-int', 'end'
388 ), 'certificate chain end-int upload'
389
390 chain = self.conf_get('/certificates/end-int/chain')
391 assert len(chain) == 2, 'certificate chain end-int length'
392 assert (
393 chain[0]['subject']['common_name'] == 'end'
394 ), 'certificate chain end-int int subject common name'
395 assert (
396 chain[0]['issuer']['common_name'] == 'int'
397 ), 'certificate chain end-int int issuer common name'
398 assert (
399 chain[1]['subject']['common_name'] == 'int'
400 ), 'certificate chain end-int end subject common name'
401 assert (
402 chain[1]['issuer']['common_name'] == 'root'
403 ), 'certificate chain end-int end issuer common name'
404
405 self.add_tls(cert='end-int')
406
407 assert (
408 self.get_ssl()['status'] == 200
409 ), 'certificate chain intermediate server'
410
411 @pytest.mark.skip('not yet')
412 def test_tls_reconfigure(self):
413 self.load('empty')
414
415 assert self.get()['status'] == 200, 'init'
416
417 self.certificate()
418
419 (resp, sock) = self.get(
420 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
421 start=True,
422 read_timeout=1,
423 )
424
425 assert resp['status'] == 200, 'initial status'
426
427 self.add_tls()
428
429 assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
430 assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
431
432 def test_tls_keepalive(self):
433 self.load('mirror')
434
435 assert self.get()['status'] == 200, 'init'
436
437 self.certificate()
438
439 self.add_tls(application='mirror')
440
441 (resp, sock) = self.post_ssl(
442 headers={
443 'Host': 'localhost',
444 'Connection': 'keep-alive',
445 'Content-Type': 'text/html',
446 },
447 start=True,
448 body='0123456789',
449 read_timeout=1,
450 )
451
452 assert resp['body'] == '0123456789', 'keepalive 1'
453
454 resp = self.post_ssl(
455 headers={
456 'Host': 'localhost',
457 'Connection': 'close',
458 'Content-Type': 'text/html',
459 },
460 sock=sock,
461 body='0123456789',
462 )
463
464 assert resp['body'] == '0123456789', 'keepalive 2'
465
466 @pytest.mark.skip('not yet')
467 def test_tls_keepalive_certificate_remove(self):
468 self.load('empty')
469
470 assert self.get()['status'] == 200, 'init'
471
472 self.certificate()
473
474 self.add_tls()
475
476 (resp, sock) = self.get_ssl(
477 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
478 start=True,
479 read_timeout=1,
480 )
481
482 self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
483 self.conf_delete('/certificates/default')
484
485 try:
486 resp = self.get_ssl(
487 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
488 )
489
490 except KeyboardInterrupt:
491 raise
492
493 except:
494 resp = None
495
496 assert resp == None, 'keepalive remove certificate'
497
498 @pytest.mark.skip('not yet')
499 def test_tls_certificates_remove_all(self):
500 self.load('empty')
501
502 self.certificate()
503
504 assert 'success' in self.conf_delete(
505 '/certificates'
506 ), 'remove all certificates'
507
508 def test_tls_application_respawn(self):
509 self.load('mirror')
510
511 self.certificate()
512
513 self.conf('1', 'applications/mirror/processes')
514
515 self.add_tls(application='mirror')
516
517 (_, sock) = self.post_ssl(
518 headers={
519 'Host': 'localhost',
520 'Connection': 'keep-alive',
521 'Content-Type': 'text/html',
522 },
523 start=True,
524 body='0123456789',
525 read_timeout=1,
526 )
527
528 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
529
530 subprocess.call(['kill', '-9', app_id])
531
532 skip_alert(r'process %s exited on signal 9' % app_id)
533
534 self.wait_for_record(
535 re.compile(
536 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
537 )
538 )
539
540 resp = self.post_ssl(
541 headers={
542 'Host': 'localhost',
543 'Connection': 'close',
544 'Content-Type': 'text/html',
545 },
546 sock=sock,
547 body='0123456789',
548 )
549
550 assert resp['status'] == 200, 'application respawn status'
551 assert resp['body'] == '0123456789', 'application respawn body'
552
553 def test_tls_url_scheme(self):
554 self.load('variables')
555
556 assert (
557 self.post(
558 headers={
559 'Host': 'localhost',
560 'Content-Type': 'text/html',
561 'Custom-Header': '',
562 'Connection': 'close',
563 }
564 )['headers']['Wsgi-Url-Scheme']
565 == 'http'
566 ), 'url scheme http'
567
568 self.certificate()
569
570 self.add_tls(application='variables')
571
572 assert (
573 self.post_ssl(
574 headers={
575 'Host': 'localhost',
576 'Content-Type': 'text/html',
577 'Custom-Header': '',
578 'Connection': 'close',
579 }
580 )['headers']['Wsgi-Url-Scheme']
581 == 'https'
582 ), 'url scheme https'
583
584 def test_tls_big_upload(self):
585 self.load('upload')
586
587 self.certificate()
588
589 self.add_tls(application='upload')
590
591 filename = 'test.txt'
592 data = '0123456789' * 9000
593
594 res = self.post_ssl(
595 body={
596 'file': {
597 'filename': filename,
598 'type': 'text/plain',
599 'data': io.StringIO(data),
600 }
601 }
602 )
603 assert res['status'] == 200, 'status ok'
604 assert res['body'] == filename + data
11
12
13class TestTLS(TestApplicationTLS):
14 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
15
16 def findall(self, pattern):
17 with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
18 return re.findall(pattern, f.read())
19
20 def openssl_date_to_sec_epoch(self, date):
21 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
22
23 def add_tls(self, application='empty', cert='default', port=7080):
24 self.conf(
25 {
26 "pass": "applications/" + application,
27 "tls": {"certificate": cert}
28 },
29 'listeners/*:' + str(port),
30 )
31
32 def remove_tls(self, application='empty', port=7080):
33 self.conf(
34 {"pass": "applications/" + application}, 'listeners/*:' + str(port)
35 )
36
37 def test_tls_listener_option_add(self):
38 self.load('empty')
39
40 self.certificate()
41
42 self.add_tls()
43
44 assert self.get_ssl()['status'] == 200, 'add listener option'
45
46 def test_tls_listener_option_remove(self):
47 self.load('empty')
48
49 self.certificate()
50
51 self.add_tls()
52
53 self.get_ssl()
54
55 self.remove_tls()
56
57 assert self.get()['status'] == 200, 'remove listener option'
58
59 def test_tls_certificate_remove(self):
60 self.load('empty')
61
62 self.certificate()
63
64 assert 'success' in self.conf_delete(
65 '/certificates/default'
66 ), 'remove certificate'
67
68 def test_tls_certificate_remove_used(self):
69 self.load('empty')
70
71 self.certificate()
72
73 self.add_tls()
74
75 assert 'error' in self.conf_delete(
76 '/certificates/default'
77 ), 'remove certificate'
78
79 def test_tls_certificate_remove_nonexisting(self):
80 self.load('empty')
81
82 self.certificate()
83
84 self.add_tls()
85
86 assert 'error' in self.conf_delete(
87 '/certificates/blah'
88 ), 'remove nonexistings certificate'
89
90 @pytest.mark.skip('not yet')
91 def test_tls_certificate_update(self):
92 self.load('empty')
93
94 self.certificate()
95
96 self.add_tls()
97
98 cert_old = self.get_server_certificate()
99
100 self.certificate()
101
102 assert cert_old != self.get_server_certificate(), 'update certificate'
103
104 @pytest.mark.skip('not yet')
105 def test_tls_certificate_key_incorrect(self):
106 self.load('empty')
107
108 self.certificate('first', False)
109 self.certificate('second', False)
110
111 assert 'error' in self.certificate_load(
112 'first', 'second'
113 ), 'key incorrect'
114
115 def test_tls_certificate_change(self):
116 self.load('empty')
117
118 self.certificate()
119 self.certificate('new')
120
121 self.add_tls()
122
123 cert_old = self.get_server_certificate()
124
125 self.add_tls(cert='new')
126
127 assert cert_old != self.get_server_certificate(), 'change certificate'
128
129 def test_tls_certificate_key_rsa(self):
130 self.load('empty')
131
132 self.certificate()
133
134 assert (
135 self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
136 ), 'certificate key rsa'
137
138 def test_tls_certificate_key_ec(self, temp_dir):
139 self.load('empty')
140
141 self.openssl_conf()
142
143 subprocess.call(
144 [
145 'openssl',
146 'ecparam',
147 '-noout',
148 '-genkey',
149 '-out',
150 temp_dir + '/ec.key',
151 '-name',
152 'prime256v1',
153 ],
154 stderr=subprocess.STDOUT,
155 )
156
157 subprocess.call(
158 [
159 'openssl',
160 'req',
161 '-x509',
162 '-new',
163 '-subj',
164 '/CN=ec/',
165 '-config',
166 temp_dir + '/openssl.conf',
167 '-key',
168 temp_dir + '/ec.key',
169 '-out',
170 temp_dir + '/ec.crt',
171 ],
172 stderr=subprocess.STDOUT,
173 )
174
175 self.certificate_load('ec')
176
177 assert (
178 self.conf_get('/certificates/ec/key') == 'ECDH'
179 ), 'certificate key ec'
180
181 def test_tls_certificate_chain_options(self):
182 self.load('empty')
183
184 self.certificate()
185
186 chain = self.conf_get('/certificates/default/chain')
187
188 assert len(chain) == 1, 'certificate chain length'
189
190 cert = chain[0]
191
192 assert (
193 cert['subject']['common_name'] == 'default'
194 ), 'certificate subject common name'
195 assert (
196 cert['issuer']['common_name'] == 'default'
197 ), 'certificate issuer common name'
198
199 assert (
200 abs(
201 self.sec_epoch()
202 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
203 )
204 < 5
205 ), 'certificate validity since'
206 assert (
207 self.openssl_date_to_sec_epoch(cert['validity']['until'])
208 - self.openssl_date_to_sec_epoch(cert['validity']['since'])
209 == 2592000
210 ), 'certificate validity until'
211
212 def test_tls_certificate_chain(self, temp_dir):
213 self.load('empty')
214
215 self.certificate('root', False)
216
217 subprocess.call(
218 [
219 'openssl',
220 'req',
221 '-new',
222 '-subj',
223 '/CN=int/',
224 '-config',
225 temp_dir + '/openssl.conf',
226 '-out',
227 temp_dir + '/int.csr',
228 '-keyout',
229 temp_dir + '/int.key',
230 ],
231 stderr=subprocess.STDOUT,
232 )
233
234 subprocess.call(
235 [
236 'openssl',
237 'req',
238 '-new',
239 '-subj',
240 '/CN=end/',
241 '-config',
242 temp_dir + '/openssl.conf',
243 '-out',
244 temp_dir + '/end.csr',
245 '-keyout',
246 temp_dir + '/end.key',
247 ],
248 stderr=subprocess.STDOUT,
249 )
250
251 with open(temp_dir + '/ca.conf', 'w') as f:
252 f.write(
253 """[ ca ]
254default_ca = myca
255
256[ myca ]
257new_certs_dir = %(dir)s
258database = %(database)s
259default_md = sha256
260policy = myca_policy
261serial = %(certserial)s
262default_days = 1
263x509_extensions = myca_extensions
264
265[ myca_policy ]
266commonName = supplied
267
268[ myca_extensions ]
269basicConstraints = critical,CA:TRUE"""
270 % {
271 'dir': temp_dir,
272 'database': temp_dir + '/certindex',
273 'certserial': temp_dir + '/certserial',
274 }
275 )
276
277 with open(temp_dir + '/certserial', 'w') as f:
278 f.write('1000')
279
280 with open(temp_dir + '/certindex', 'w') as f:
281 f.write('')
282
283 subprocess.call(
284 [
285 'openssl',
286 'ca',
287 '-batch',
288 '-subj',
289 '/CN=int/',
290 '-config',
291 temp_dir + '/ca.conf',
292 '-keyfile',
293 temp_dir + '/root.key',
294 '-cert',
295 temp_dir + '/root.crt',
296 '-in',
297 temp_dir + '/int.csr',
298 '-out',
299 temp_dir + '/int.crt',
300 ],
301 stderr=subprocess.STDOUT,
302 )
303
304 subprocess.call(
305 [
306 'openssl',
307 'ca',
308 '-batch',
309 '-subj',
310 '/CN=end/',
311 '-config',
312 temp_dir + '/ca.conf',
313 '-keyfile',
314 temp_dir + '/int.key',
315 '-cert',
316 temp_dir + '/int.crt',
317 '-in',
318 temp_dir + '/end.csr',
319 '-out',
320 temp_dir + '/end.crt',
321 ],
322 stderr=subprocess.STDOUT,
323 )
324
325 crt_path = temp_dir + '/end-int.crt'
326 end_path = temp_dir + '/end.crt'
327 int_path = temp_dir + '/int.crt'
328
329 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
330 int_path, 'rb'
331 ) as int:
332 crt.write(end.read() + int.read())
333
334 self.context = ssl.create_default_context()
335 self.context.check_hostname = False
336 self.context.verify_mode = ssl.CERT_REQUIRED
337 self.context.load_verify_locations(temp_dir + '/root.crt')
338
339 # incomplete chain
340
341 assert 'success' in self.certificate_load(
342 'end', 'end'
343 ), 'certificate chain end upload'
344
345 chain = self.conf_get('/certificates/end/chain')
346 assert len(chain) == 1, 'certificate chain end length'
347 assert (
348 chain[0]['subject']['common_name'] == 'end'
349 ), 'certificate chain end subject common name'
350 assert (
351 chain[0]['issuer']['common_name'] == 'int'
352 ), 'certificate chain end issuer common name'
353
354 self.add_tls(cert='end')
355
356 try:
357 resp = self.get_ssl()
358 except ssl.SSLError:
359 resp = None
360
361 assert resp == None, 'certificate chain incomplete chain'
362
363 # intermediate
364
365 assert 'success' in self.certificate_load(
366 'int', 'int'
367 ), 'certificate chain int upload'
368
369 chain = self.conf_get('/certificates/int/chain')
370 assert len(chain) == 1, 'certificate chain int length'
371 assert (
372 chain[0]['subject']['common_name'] == 'int'
373 ), 'certificate chain int subject common name'
374 assert (
375 chain[0]['issuer']['common_name'] == 'root'
376 ), 'certificate chain int issuer common name'
377
378 self.add_tls(cert='int')
379
380 assert (
381 self.get_ssl()['status'] == 200
382 ), 'certificate chain intermediate'
383
384 # intermediate server
385
386 assert 'success' in self.certificate_load(
387 'end-int', 'end'
388 ), 'certificate chain end-int upload'
389
390 chain = self.conf_get('/certificates/end-int/chain')
391 assert len(chain) == 2, 'certificate chain end-int length'
392 assert (
393 chain[0]['subject']['common_name'] == 'end'
394 ), 'certificate chain end-int int subject common name'
395 assert (
396 chain[0]['issuer']['common_name'] == 'int'
397 ), 'certificate chain end-int int issuer common name'
398 assert (
399 chain[1]['subject']['common_name'] == 'int'
400 ), 'certificate chain end-int end subject common name'
401 assert (
402 chain[1]['issuer']['common_name'] == 'root'
403 ), 'certificate chain end-int end issuer common name'
404
405 self.add_tls(cert='end-int')
406
407 assert (
408 self.get_ssl()['status'] == 200
409 ), 'certificate chain intermediate server'
410
411 @pytest.mark.skip('not yet')
412 def test_tls_reconfigure(self):
413 self.load('empty')
414
415 assert self.get()['status'] == 200, 'init'
416
417 self.certificate()
418
419 (resp, sock) = self.get(
420 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
421 start=True,
422 read_timeout=1,
423 )
424
425 assert resp['status'] == 200, 'initial status'
426
427 self.add_tls()
428
429 assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
430 assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
431
432 def test_tls_keepalive(self):
433 self.load('mirror')
434
435 assert self.get()['status'] == 200, 'init'
436
437 self.certificate()
438
439 self.add_tls(application='mirror')
440
441 (resp, sock) = self.post_ssl(
442 headers={
443 'Host': 'localhost',
444 'Connection': 'keep-alive',
445 'Content-Type': 'text/html',
446 },
447 start=True,
448 body='0123456789',
449 read_timeout=1,
450 )
451
452 assert resp['body'] == '0123456789', 'keepalive 1'
453
454 resp = self.post_ssl(
455 headers={
456 'Host': 'localhost',
457 'Connection': 'close',
458 'Content-Type': 'text/html',
459 },
460 sock=sock,
461 body='0123456789',
462 )
463
464 assert resp['body'] == '0123456789', 'keepalive 2'
465
466 @pytest.mark.skip('not yet')
467 def test_tls_keepalive_certificate_remove(self):
468 self.load('empty')
469
470 assert self.get()['status'] == 200, 'init'
471
472 self.certificate()
473
474 self.add_tls()
475
476 (resp, sock) = self.get_ssl(
477 headers={'Host': 'localhost', 'Connection': 'keep-alive'},
478 start=True,
479 read_timeout=1,
480 )
481
482 self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
483 self.conf_delete('/certificates/default')
484
485 try:
486 resp = self.get_ssl(
487 headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
488 )
489
490 except KeyboardInterrupt:
491 raise
492
493 except:
494 resp = None
495
496 assert resp == None, 'keepalive remove certificate'
497
498 @pytest.mark.skip('not yet')
499 def test_tls_certificates_remove_all(self):
500 self.load('empty')
501
502 self.certificate()
503
504 assert 'success' in self.conf_delete(
505 '/certificates'
506 ), 'remove all certificates'
507
508 def test_tls_application_respawn(self):
509 self.load('mirror')
510
511 self.certificate()
512
513 self.conf('1', 'applications/mirror/processes')
514
515 self.add_tls(application='mirror')
516
517 (_, sock) = self.post_ssl(
518 headers={
519 'Host': 'localhost',
520 'Connection': 'keep-alive',
521 'Content-Type': 'text/html',
522 },
523 start=True,
524 body='0123456789',
525 read_timeout=1,
526 )
527
528 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
529
530 subprocess.call(['kill', '-9', app_id])
531
532 skip_alert(r'process %s exited on signal 9' % app_id)
533
534 self.wait_for_record(
535 re.compile(
536 r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
537 )
538 )
539
540 resp = self.post_ssl(
541 headers={
542 'Host': 'localhost',
543 'Connection': 'close',
544 'Content-Type': 'text/html',
545 },
546 sock=sock,
547 body='0123456789',
548 )
549
550 assert resp['status'] == 200, 'application respawn status'
551 assert resp['body'] == '0123456789', 'application respawn body'
552
553 def test_tls_url_scheme(self):
554 self.load('variables')
555
556 assert (
557 self.post(
558 headers={
559 'Host': 'localhost',
560 'Content-Type': 'text/html',
561 'Custom-Header': '',
562 'Connection': 'close',
563 }
564 )['headers']['Wsgi-Url-Scheme']
565 == 'http'
566 ), 'url scheme http'
567
568 self.certificate()
569
570 self.add_tls(application='variables')
571
572 assert (
573 self.post_ssl(
574 headers={
575 'Host': 'localhost',
576 'Content-Type': 'text/html',
577 'Custom-Header': '',
578 'Connection': 'close',
579 }
580 )['headers']['Wsgi-Url-Scheme']
581 == 'https'
582 ), 'url scheme https'
583
584 def test_tls_big_upload(self):
585 self.load('upload')
586
587 self.certificate()
588
589 self.add_tls(application='upload')
590
591 filename = 'test.txt'
592 data = '0123456789' * 9000
593
594 res = self.post_ssl(
595 body={
596 'file': {
597 'filename': filename,
598 'type': 'text/plain',
599 'data': io.StringIO(data),
600 }
601 }
602 )
603 assert res['status'] == 200, 'status ok'
604 assert res['body'] == filename + data