tls.py (1621:275b26bbd3ae) tls.py (1635:97afbb6c5a15)
1import os
1import os
2import re
3import ssl
4import subprocess
5
2import ssl
3import subprocess
4
6from unit.applications.proto import TestApplicationProto
7from conftest import option
5from conftest import option
6from unit.applications.proto import TestApplicationProto
8
9
10class TestApplicationTLS(TestApplicationProto):
11 def setup_method(self):
12 super().setup_method()
13
14 self.context = ssl.create_default_context()
15 self.context.check_hostname = False
16 self.context.verify_mode = ssl.CERT_NONE
17
18 def certificate(self, name='default', load=True):
19 self.openssl_conf()
20
21 subprocess.call(
22 [
23 'openssl',
24 'req',
25 '-x509',
26 '-new',
27 '-subj', '/CN=' + name + '/',
28 '-config', self.temp_dir + '/openssl.conf',
29 '-out', self.temp_dir + '/' + name + '.crt',
30 '-keyout', self.temp_dir + '/' + name + '.key',
31 ],
32 stderr=subprocess.STDOUT,
33 )
34
35 if load:
36 self.certificate_load(name)
37
38 def certificate_load(self, crt, key=None):
39 if key is None:
40 key = crt
41
42 key_path = self.temp_dir + '/' + key + '.key'
43 crt_path = self.temp_dir + '/' + crt + '.crt'
44
45 with open(key_path, 'rb') as k, open(crt_path, 'rb') as c:
46 return self.conf(k.read() + c.read(), '/certificates/' + crt)
47
48 def get_ssl(self, **kwargs):
49 return self.get(wrapper=self.context.wrap_socket, **kwargs)
50
51 def post_ssl(self, **kwargs):
52 return self.post(wrapper=self.context.wrap_socket, **kwargs)
53
54 def get_server_certificate(self, addr=('127.0.0.1', 7080)):
55
56 ssl_list = dir(ssl)
57
58 if 'PROTOCOL_TLS' in ssl_list:
59 ssl_version = ssl.PROTOCOL_TLS
60
61 elif 'PROTOCOL_TLSv1_2' in ssl_list:
62 ssl_version = ssl.PROTOCOL_TLSv1_2
63
64 else:
65 ssl_version = ssl.PROTOCOL_TLSv1_1
66
67 return ssl.get_server_certificate(addr, ssl_version=ssl_version)
68
69 def openssl_conf(self):
70 conf_path = self.temp_dir + '/openssl.conf'
71
72 if os.path.exists(conf_path):
73 return
74
75 with open(conf_path, 'w') as f:
76 f.write(
77 """[ req ]
78default_bits = 2048
79encrypt_key = no
80distinguished_name = req_distinguished_name
81[ req_distinguished_name ]"""
82 )
83
84 def load(self, script, name=None):
85 if name is None:
86 name = script
87
88 script_path = option.test_dir + '/python/' + script
89
90 self._load_conf(
91 {
92 "listeners": {"*:7080": {"pass": "applications/" + name}},
93 "applications": {
94 name: {
95 "type": "python",
96 "processes": {"spare": 0},
97 "path": script_path,
98 "working_directory": script_path,
99 "module": "wsgi",
100 }
101 },
102 }
103 )
7
8
9class TestApplicationTLS(TestApplicationProto):
10 def setup_method(self):
11 super().setup_method()
12
13 self.context = ssl.create_default_context()
14 self.context.check_hostname = False
15 self.context.verify_mode = ssl.CERT_NONE
16
17 def certificate(self, name='default', load=True):
18 self.openssl_conf()
19
20 subprocess.call(
21 [
22 'openssl',
23 'req',
24 '-x509',
25 '-new',
26 '-subj', '/CN=' + name + '/',
27 '-config', self.temp_dir + '/openssl.conf',
28 '-out', self.temp_dir + '/' + name + '.crt',
29 '-keyout', self.temp_dir + '/' + name + '.key',
30 ],
31 stderr=subprocess.STDOUT,
32 )
33
34 if load:
35 self.certificate_load(name)
36
37 def certificate_load(self, crt, key=None):
38 if key is None:
39 key = crt
40
41 key_path = self.temp_dir + '/' + key + '.key'
42 crt_path = self.temp_dir + '/' + crt + '.crt'
43
44 with open(key_path, 'rb') as k, open(crt_path, 'rb') as c:
45 return self.conf(k.read() + c.read(), '/certificates/' + crt)
46
47 def get_ssl(self, **kwargs):
48 return self.get(wrapper=self.context.wrap_socket, **kwargs)
49
50 def post_ssl(self, **kwargs):
51 return self.post(wrapper=self.context.wrap_socket, **kwargs)
52
53 def get_server_certificate(self, addr=('127.0.0.1', 7080)):
54
55 ssl_list = dir(ssl)
56
57 if 'PROTOCOL_TLS' in ssl_list:
58 ssl_version = ssl.PROTOCOL_TLS
59
60 elif 'PROTOCOL_TLSv1_2' in ssl_list:
61 ssl_version = ssl.PROTOCOL_TLSv1_2
62
63 else:
64 ssl_version = ssl.PROTOCOL_TLSv1_1
65
66 return ssl.get_server_certificate(addr, ssl_version=ssl_version)
67
68 def openssl_conf(self):
69 conf_path = self.temp_dir + '/openssl.conf'
70
71 if os.path.exists(conf_path):
72 return
73
74 with open(conf_path, 'w') as f:
75 f.write(
76 """[ req ]
77default_bits = 2048
78encrypt_key = no
79distinguished_name = req_distinguished_name
80[ req_distinguished_name ]"""
81 )
82
83 def load(self, script, name=None):
84 if name is None:
85 name = script
86
87 script_path = option.test_dir + '/python/' + script
88
89 self._load_conf(
90 {
91 "listeners": {"*:7080": {"pass": "applications/" + name}},
92 "applications": {
93 name: {
94 "type": "python",
95 "processes": {"spare": 0},
96 "path": script_path,
97 "working_directory": script_path,
98 "module": "wsgi",
99 }
100 },
101 }
102 )
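Review bot: `certificate()` on the new side discards the exit status of `openssl req`, so a failed key or certificate generation is noticed only later, when `certificate_load()` tries to open the `.key` and `.crt` files, or not at all when `load=False`. Changing `subprocess.call(` to `subprocess.check_call(` would make the helper fail at the point of the error. Separately, `setup_method()` sets `check_hostname = False` and `verify_mode = ssl.CERT_NONE` without saying why; a comment such as `# verification is disabled on purpose: the certificates generated by certificate() are self-signed` would keep the pattern from being copied into non-test code. Neither spot is touched by this change, which only edits the import block.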