1import io
2import re
3import ssl
4import subprocess
5
6import pytest
7
8from unit.applications.tls import TestApplicationTLS
9from unit.option import option
10
11
12class TestTLS(TestApplicationTLS):
13 prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
14
15 def openssl_date_to_sec_epoch(self, date):
16 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
17

--- 6 unchanged lines hidden (view full) ---

24 'listeners/*:' + str(port),
25 )
26
27 def remove_tls(self, application='empty', port=7080):
28 assert 'success' in self.conf(
29 {"pass": "applications/" + application}, 'listeners/*:' + str(port)
30 )
31
32 def req(self, name='localhost', subject=None, x509=False):
33 subj = subject if subject is not None else '/CN=' + name + '/'
34
35 subprocess.call(
36 [
37 'openssl',
38 'req',
39 '-new',
40 '-subj',
41 subj,
42 '-config',
43 option.temp_dir + '/openssl.conf',
44 '-out',
45 option.temp_dir + '/' + name + '.csr',
46 '-keyout',
47 option.temp_dir + '/' + name + '.key',
48 ],
49 stderr=subprocess.STDOUT,
50 )
51
52 def generate_ca_conf(self):
53 with open(option.temp_dir + '/ca.conf', 'w') as f:
54 f.write(
55 """[ ca ]
56default_ca = myca
57
58[ myca ]
59new_certs_dir = %(dir)s
60database = %(database)s
61default_md = sha256
62policy = myca_policy
63serial = %(certserial)s
64default_days = 1
65x509_extensions = myca_extensions
66copy_extensions = copy
67
68[ myca_policy ]
69commonName = optional
70
71[ myca_extensions ]
72basicConstraints = critical,CA:TRUE"""
73 % {
74 'dir': option.temp_dir,
75 'database': option.temp_dir + '/certindex',
76 'certserial': option.temp_dir + '/certserial',
77 }
78 )
79
80 with open(option.temp_dir + '/certserial', 'w') as f:
81 f.write('1000')
82
83 with open(option.temp_dir + '/certindex', 'w') as f:
84 f.write('')
85
86 with open(option.temp_dir + '/certindex.attr', 'w') as f:
87 f.write('')
88
89 def ca(self, cert='root', out='localhost'):
90 subprocess.call(
91 [
92 'openssl',
93 'ca',
94 '-batch',
95 '-config',
96 option.temp_dir + '/ca.conf',
97 '-keyfile',
98 option.temp_dir + '/' + cert + '.key',
99 '-cert',
100 option.temp_dir + '/' + cert + '.crt',
101 '-in',
102 option.temp_dir + '/' + out + '.csr',
103 '-out',
104 option.temp_dir + '/' + out + '.crt',
105 ],
106 stderr=subprocess.STDOUT,
107 )
108
109 def set_certificate_req_context(self, cert='root'):
110 self.context = ssl.create_default_context()
111 self.context.check_hostname = False
112 self.context.verify_mode = ssl.CERT_REQUIRED
113 self.context.load_verify_locations(
114 option.temp_dir + '/' + cert + '.crt'
115 )
116
117 def test_tls_listener_option_add(self):
118 self.load('empty')
119
120 self.certificate()
121
122 self.add_tls()
123
124 assert self.get_ssl()['status'] == 200, 'add listener option'

--- 164 unchanged lines hidden (view full) ---

289 == 2592000
290 ), 'certificate validity until'
291
292 def test_tls_certificate_chain(self, temp_dir):
293 self.load('empty')
294
295 self.certificate('root', False)
296
211 subprocess.call(
212 [
213 'openssl',
214 'req',
215 '-new',
216 '-subj',
217 '/CN=int/',
218 '-config',
219 temp_dir + '/openssl.conf',
220 '-out',
221 temp_dir + '/int.csr',
222 '-keyout',
223 temp_dir + '/int.key',
224 ],
225 stderr=subprocess.STDOUT,
226 )
297 self.req('int')
298 self.req('end')
299
228 subprocess.call(
229 [
230 'openssl',
231 'req',
232 '-new',
233 '-subj',
234 '/CN=end/',
235 '-config',
236 temp_dir + '/openssl.conf',
237 '-out',
238 temp_dir + '/end.csr',
239 '-keyout',
240 temp_dir + '/end.key',
241 ],
242 stderr=subprocess.STDOUT,
243 )
300 self.generate_ca_conf()
301
245 with open(temp_dir + '/ca.conf', 'w') as f:
246 f.write(
247 """[ ca ]
248default_ca = myca
302 self.ca(cert='root', out='int')
303 self.ca(cert='int', out='end')
304
250[ myca ]
251new_certs_dir = %(dir)s
252database = %(database)s
253default_md = sha256
254policy = myca_policy
255serial = %(certserial)s
256default_days = 1
257x509_extensions = myca_extensions
258
259[ myca_policy ]
260commonName = supplied
261
262[ myca_extensions ]
263basicConstraints = critical,CA:TRUE"""
264 % {
265 'dir': temp_dir,
266 'database': temp_dir + '/certindex',
267 'certserial': temp_dir + '/certserial',
268 }
269 )
270
271 with open(temp_dir + '/certserial', 'w') as f:
272 f.write('1000')
273
274 with open(temp_dir + '/certindex', 'w') as f:
275 f.write('')
276
277 subprocess.call(
278 [
279 'openssl',
280 'ca',
281 '-batch',
282 '-subj',
283 '/CN=int/',
284 '-config',
285 temp_dir + '/ca.conf',
286 '-keyfile',
287 temp_dir + '/root.key',
288 '-cert',
289 temp_dir + '/root.crt',
290 '-in',
291 temp_dir + '/int.csr',
292 '-out',
293 temp_dir + '/int.crt',
294 ],
295 stderr=subprocess.STDOUT,
296 )
297
298 subprocess.call(
299 [
300 'openssl',
301 'ca',
302 '-batch',
303 '-subj',
304 '/CN=end/',
305 '-config',
306 temp_dir + '/ca.conf',
307 '-keyfile',
308 temp_dir + '/int.key',
309 '-cert',
310 temp_dir + '/int.crt',
311 '-in',
312 temp_dir + '/end.csr',
313 '-out',
314 temp_dir + '/end.crt',
315 ],
316 stderr=subprocess.STDOUT,
317 )
318
305 crt_path = temp_dir + '/end-int.crt'
306 end_path = temp_dir + '/end.crt'
307 int_path = temp_dir + '/int.crt'
308
309 with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
310 int_path, 'rb'
311 ) as int:
312 crt.write(end.read() + int.read())
313
328 self.context = ssl.create_default_context()
329 self.context.check_hostname = False
330 self.context.verify_mode = ssl.CERT_REQUIRED
331 self.context.load_verify_locations(temp_dir + '/root.crt')
314 self.set_certificate_req_context()
315
316 # incomplete chain
317
318 assert 'success' in self.certificate_load(
319 'end', 'end'
320 ), 'certificate chain end upload'
321
322 chain = self.conf_get('/certificates/end/chain')

--- 57 unchanged lines hidden (view full) ---

380 ), 'certificate chain end-int end issuer common name'
381
382 self.add_tls(cert='end-int')
383
384 assert (
385 self.get_ssl()['status'] == 200
386 ), 'certificate chain intermediate server'
387
388 def test_tls_certificate_empty_cn(self, temp_dir):
389 self.certificate('root', False)
390
391 self.req(subject='/')
392
393 self.generate_ca_conf()
394 self.ca()
395
396 self.set_certificate_req_context()
397
398 assert 'success' in self.certificate_load('localhost', 'localhost')
399
400 cert = self.conf_get('/certificates/localhost')
401 assert cert['chain'][0]['subject'] == {}, 'empty subject'
402 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
403
404 def test_tls_certificate_empty_cn_san(self, temp_dir):
405 self.certificate('root', False)
406
407 self.openssl_conf(
408 rewrite=True, alt_names=["example.com", "www.example.net"]
409 )
410
411 self.req(subject='/')
412
413 self.generate_ca_conf()
414 self.ca()
415
416 self.set_certificate_req_context()
417
418 assert 'success' in self.certificate_load('localhost', 'localhost')
419
420 cert = self.conf_get('/certificates/localhost')
421 assert cert['chain'][0]['subject'] == {
422 'alt_names': ['example.com', 'www.example.net']
423 }, 'subject alt_names'
424 assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
425
426 @pytest.mark.skip('not yet')
427 def test_tls_reconfigure(self):
428 self.load('empty')
429
430 assert self.get()['status'] == 200, 'init'
431
432 self.certificate()
433

--- 188 unchanged lines hidden ---