xref: /unit/test/unit/applications/tls.py (revision 1866:483157530489)
1import os
2import ssl
3import subprocess
4
5from unit.applications.proto import TestApplicationProto
6from unit.option import option
7
8
class TestApplicationTLS(TestApplicationProto):
    """Test helpers for TLS: certificate generation, upload and HTTPS requests.

    Certificates and keys are generated with the ``openssl`` command line
    tool into ``option.temp_dir``.  Uploading and request helpers delegate
    to ``self.conf``, ``self.get``, ``self.post`` and ``self._load_conf``,
    which are not defined in this class (presumably inherited from
    ``TestApplicationProto``).
    """

    def setup_method(self):
        """Create the client SSL context used by ``get_ssl``/``post_ssl``.

        Certificate verification is disabled on this context, so the
        self-signed certificates produced by ``certificate`` are accepted.
        """
        self.context = ssl.create_default_context()
        # check_hostname has to be switched off before verify_mode can be
        # set to CERT_NONE; the ssl module rejects the reverse order.
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_NONE

    def certificate(self, name='default', load=True):
        """Generate a self-signed certificate and key named ``name``.

        Runs ``openssl req -x509 -new`` with the config written by
        ``openssl_conf`` and stores the result as ``<name>.crt`` and
        ``<name>.key`` in ``option.temp_dir``.

        Args:
            name: Common name of the certificate and base name of the
                generated files.
            load: When true, upload the generated pair through
                ``certificate_load`` afterwards.

        Raises:
            subprocess.CalledProcessError: if ``openssl`` exits with a
                non-zero status.
        """
        self.openssl_conf()

        # check_call instead of call: a failed generation is reported here
        # rather than as a missing-file error when the pair is read later.
        subprocess.check_call(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=' + name + '/',
                '-config',
                option.temp_dir + '/openssl.conf',
                '-out',
                option.temp_dir + '/' + name + '.crt',
                '-keyout',
                option.temp_dir + '/' + name + '.key',
            ],
            stderr=subprocess.STDOUT,
        )

        if load:
            self.certificate_load(name)

    def certificate_load(self, crt, key=None):
        """Upload a key/certificate bundle to ``/certificates/<crt>``.

        Args:
            crt: Base name of the ``.crt`` file in ``option.temp_dir``; also
                used as the name under ``/certificates/``.
            key: Base name of the ``.key`` file; defaults to ``crt``.

        Returns:
            Whatever ``self.conf`` returns for the upload.
        """
        if key is None:
            key = crt

        key_path = option.temp_dir + '/' + key + '.key'
        crt_path = option.temp_dir + '/' + crt + '.crt'

        # The bundle is the raw key bytes followed by the certificate bytes.
        with open(key_path, 'rb') as key_f, open(crt_path, 'rb') as crt_f:
            bundle = key_f.read() + crt_f.read()

        return self.conf(bundle, '/certificates/' + crt)

    def get_ssl(self, **kwargs):
        """Call ``self.get`` with the socket wrapped by the SSL context."""
        return self.get(wrapper=self.context.wrap_socket, **kwargs)

    def post_ssl(self, **kwargs):
        """Call ``self.post`` with the socket wrapped by the SSL context."""
        return self.post(wrapper=self.context.wrap_socket, **kwargs)

    def get_server_certificate(self, addr=('127.0.0.1', 7080)):
        """Return the certificate served at ``addr``.

        The result is whatever ``ssl.get_server_certificate`` returns.  The
        protocol constant is chosen by availability in the running ``ssl``
        module: ``PROTOCOL_TLS`` first, then ``PROTOCOL_TLSv1_2``, and
        ``PROTOCOL_TLSv1_1`` as the last resort.
        """
        for attr in ('PROTOCOL_TLS', 'PROTOCOL_TLSv1_2'):
            ssl_version = getattr(ssl, attr, None)
            if ssl_version is not None:
                break
        else:
            ssl_version = ssl.PROTOCOL_TLSv1_1

        return ssl.get_server_certificate(addr, ssl_version=ssl_version)

    def openssl_conf(self, rewrite=False, alt_names=None):
        """Write ``openssl.conf`` into ``option.temp_dir``.

        Args:
            rewrite: When false and the file already exists, it is left
                untouched; when true it is always regenerated.
            alt_names: Optional iterable of subject alternative names.  An
                entry of the form ``'IP|<address>'`` becomes an ``IP.N``
                line; any other entry becomes a ``DNS.N`` line using the
                text before the first ``'|'``.  ``None`` or an empty
                sequence omits the extension section entirely.
        """
        conf_path = option.temp_dir + '/openssl.conf'

        if not rewrite and os.path.exists(conf_path):
            return

        if alt_names is None:
            alt_names = ()

        # Generates alt_names section with dns names
        entries = ["[alt_names]\n"]
        for index, entry in enumerate(alt_names, 1):
            fields = entry.split('|')

            if fields[0] == 'IP':
                entries.append("IP.%d = %s\n" % (index, fields[1]))
            else:
                entries.append("DNS.%d = %s\n" % (index, fields[0]))

        # Generates section for sign request extension
        a_sec = ""
        if alt_names:
            a_sec = "".join(
                [
                    "req_extensions = myca_req_extensions\n",
                    "\n",
                    "[ myca_req_extensions ]\n",
                    "subjectAltName = @alt_names\n",
                    "\n",
                ]
                + entries
            )

        conf_text = "".join(
            [
                "[ req ]\n",
                "default_bits = 2048\n",
                "encrypt_key = no\n",
                "distinguished_name = req_distinguished_name\n",
                "\n",
                a_sec,
                "\n",
                "[ req_distinguished_name ]",
            ]
        )

        with open(conf_path, 'w') as f:
            f.write(conf_text)

    def load(self, script, name=None):
        """Configure a Python WSGI application behind the ``*:7080`` listener.

        Args:
            script: Directory name under ``<option.test_dir>/python/``; the
                resulting path is used as both the application ``path`` and
                its ``working_directory``.
            name: Application name in the configuration; defaults to
                ``script``.
        """
        if name is None:
            name = script

        script_path = option.test_dir + '/python/' + script

        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "applications/" + name}},
                "applications": {
                    name: {
                        "type": "python",
                        "processes": {"spare": 0},
                        "path": script_path,
                        "working_directory": script_path,
                        "module": "wsgi",
                    }
                },
            }
        )
128        )
129