xref: /unit/test/unit/applications/tls.py (revision 1848:4bd548074e2c)
1import os
2import ssl
3import subprocess
4
5from unit.applications.proto import TestApplicationProto
6from unit.option import option
7
8
9class TestApplicationTLS(TestApplicationProto):
10    def setup_method(self):
11        self.context = ssl.create_default_context()
12        self.context.check_hostname = False
13        self.context.verify_mode = ssl.CERT_NONE
14
15    def certificate(self, name='default', load=True):
16        self.openssl_conf()
17
18        subprocess.call(
19            [
20                'openssl',
21                'req',
22                '-x509',
23                '-new',
24                '-subj',
25                '/CN=' + name + '/',
26                '-config',
27                option.temp_dir + '/openssl.conf',
28                '-out',
29                option.temp_dir + '/' + name + '.crt',
30                '-keyout',
31                option.temp_dir + '/' + name + '.key',
32            ],
33            stderr=subprocess.STDOUT,
34        )
35
36        if load:
37            self.certificate_load(name)
38
39    def certificate_load(self, crt, key=None):
40        if key is None:
41            key = crt
42
43        key_path = option.temp_dir + '/' + key + '.key'
44        crt_path = option.temp_dir + '/' + crt + '.crt'
45
46        with open(key_path, 'rb') as k, open(crt_path, 'rb') as c:
47            return self.conf(k.read() + c.read(), '/certificates/' + crt)
48
49    def get_ssl(self, **kwargs):
50        return self.get(wrapper=self.context.wrap_socket, **kwargs)
51
52    def post_ssl(self, **kwargs):
53        return self.post(wrapper=self.context.wrap_socket, **kwargs)
54
55    def get_server_certificate(self, addr=('127.0.0.1', 7080)):
56
57        ssl_list = dir(ssl)
58
59        if 'PROTOCOL_TLS' in ssl_list:
60            ssl_version = ssl.PROTOCOL_TLS
61
62        elif 'PROTOCOL_TLSv1_2' in ssl_list:
63            ssl_version = ssl.PROTOCOL_TLSv1_2
64
65        else:
66            ssl_version = ssl.PROTOCOL_TLSv1_1
67
68        return ssl.get_server_certificate(addr, ssl_version=ssl_version)
69
70    def openssl_conf(self, rewrite=False, alt_names=[]):
71        conf_path = option.temp_dir + '/openssl.conf'
72
73        if not rewrite and os.path.exists(conf_path):
74            return
75
76        # Generates alt_names section with dns names
77        a_names = "[alt_names]\n"
78        for i, k in enumerate(alt_names, 1):
79            a_names += "DNS.%d = %s\n" % (i, k)
80
81            # Generates section for sign request extension
82        a_sec = """req_extensions = myca_req_extensions
83
84[ myca_req_extensions ]
85subjectAltName = @alt_names
86
87{a_names}""".format(
88            a_names=a_names
89        )
90
91        with open(conf_path, 'w') as f:
92            f.write(
93                """[ req ]
94default_bits = 2048
95encrypt_key = no
96distinguished_name = req_distinguished_name
97
98{a_sec}
99[ req_distinguished_name ]""".format(
100                    a_sec=a_sec if alt_names else ""
101                )
102            )
103
104    def load(self, script, name=None):
105        if name is None:
106            name = script
107
108        script_path = option.test_dir + '/python/' + script
109
110        self._load_conf(
111            {
112                "listeners": {"*:7080": {"pass": "applications/" + name}},
113                "applications": {
114                    name: {
115                        "type": "python",
116                        "processes": {"spare": 0},
117                        "path": script_path,
118                        "working_directory": script_path,
119                        "module": "wsgi",
120                    }
121                },
122            }
123        )
124