xref: /unit/test/unit/applications/tls.py (revision 2066:242192963d93)
1import os
2import ssl
3import subprocess
4
5from unit.applications.proto import TestApplicationProto
6from unit.option import option
7
8
class TestApplicationTLS(TestApplicationProto):
    """Test helper for TLS scenarios.

    Generates certificates with the ``openssl`` command-line tool inside
    ``option.temp_dir``, uploads them through ``self.conf()`` and issues
    requests whose sockets are wrapped by a non-verifying SSL context.
    """

    def setup_method(self):
        """Create the client SSL context used by ``get_ssl``/``post_ssl``."""
        self.context = ssl.create_default_context()
        # Certificates produced by certificate() are self-issued, so both
        # hostname checking and chain verification are switched off.
        # check_hostname has to be cleared first: the ssl module refuses
        # CERT_NONE while hostname checking is still enabled.
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_NONE

    def certificate(self, name='default', load=True):
        """Generate ``<name>.crt`` and ``<name>.key`` in ``option.temp_dir``.

        Runs ``openssl req -x509 -new`` with subject ``/CN=<name>/`` and the
        config file maintained by ``openssl_conf()`` (created on demand).

        :param name: base name of the generated files; also used as the CN.
        :param load: when true, pass the new pair to ``certificate_load()``.
        :raises subprocess.CalledProcessError: if ``openssl`` exits non-zero;
            stderr is merged into the captured output.
        """
        # Make sure a config exists; an already written one is kept as is.
        self.openssl_conf()

        conf_path = option.temp_dir + '/openssl.conf'
        crt_path = option.temp_dir + '/' + name + '.crt'
        key_path = option.temp_dir + '/' + name + '.key'

        subprocess.check_output(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',
                '/CN=' + name + '/',
                '-config',
                conf_path,
                '-out',
                crt_path,
                '-keyout',
                key_path,
            ],
            stderr=subprocess.STDOUT,
        )

        if load:
            self.certificate_load(name)

    def certificate_load(self, crt, key=None):
        """Upload a certificate bundle under ``/certificates/<crt>``.

        The bundle is the raw content of ``<key>.key`` followed by the raw
        content of ``<crt>.crt``, both read from ``option.temp_dir``.

        :param crt: base name of the certificate file and of the target
            configuration entry.
        :param key: base name of the key file; defaults to ``crt``.
        :return: whatever ``self.conf()`` returns for the upload.
        """
        if key is None:
            key = crt

        key_path = option.temp_dir + '/' + key + '.key'
        crt_path = option.temp_dir + '/' + crt + '.crt'

        with open(key_path, 'rb') as key_file, open(crt_path, 'rb') as crt_file:
            bundle = key_file.read() + crt_file.read()

        return self.conf(bundle, '/certificates/' + crt)

    def get_ssl(self, **kwargs):
        """Call ``self.get()`` with the socket wrapped by ``self.context``."""
        return self.get(wrapper=self.context.wrap_socket, **kwargs)

    def post_ssl(self, **kwargs):
        """Call ``self.post()`` with the socket wrapped by ``self.context``."""
        return self.post(wrapper=self.context.wrap_socket, **kwargs)

    def openssl_conf(self, rewrite=False, alt_names=None):
        """Write ``openssl.conf`` into ``option.temp_dir``.

        An existing file is left untouched unless ``rewrite`` is true.

        :param rewrite: overwrite the config even if it already exists.
        :param alt_names: optional iterable of subjectAltName entries.
            ``'IP|<address>'`` produces an ``IP.<n>`` line, any other value
            produces a ``DNS.<n>`` line (only the text before the first
            ``'|'`` is used).  ``<n>`` is the 1-based position of the entry
            in ``alt_names``, shared between both kinds.  When empty or
            omitted, no request-extension section is written.
        :raises IndexError: for an ``'IP'`` entry that has no ``'|<address>'``
            part.
        """
        conf_path = option.temp_dir + '/openssl.conf'

        if not rewrite and os.path.exists(conf_path):
            return

        lines = [
            "[ req ]",
            "default_bits = 2048",
            "encrypt_key = no",
            "distinguished_name = req_distinguished_name",
            "",
        ]

        if alt_names:
            # Section for the sign request extension, followed by the
            # alt_names section it refers to.
            lines += [
                "req_extensions = myca_req_extensions",
                "",
                "[ myca_req_extensions ]",
                "subjectAltName = @alt_names",
                "",
                "[alt_names]",
            ]

            for index, entry in enumerate(alt_names, 1):
                fields = entry.split('|')

                if fields[0] == 'IP':
                    lines.append("IP.%d = %s" % (index, fields[1]))
                else:
                    lines.append("DNS.%d = %s" % (index, fields[0]))

        lines += ["", "[ req_distinguished_name ]"]

        with open(conf_path, 'w') as f:
            # No trailing newline: the file content is kept exactly as it
            # has always been written.
            f.write("\n".join(lines))

    def load(self, script, name=None):
        """Configure a Python application listening on ``*:7080``.

        :param script: directory name under ``option.test_dir + '/python/'``;
            used as both ``path`` and ``working_directory`` of the
            application, whose module is ``wsgi``.
        :param name: application name in the configuration; defaults to
            ``script``.
        """
        if name is None:
            name = script

        script_path = option.test_dir + '/python/' + script

        application = {
            "type": "python",
            "processes": {"spare": 0},
            "path": script_path,
            "working_directory": script_path,
            "module": "wsgi",
        }

        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "applications/" + name}},
                "applications": {name: application},
            }
        )
114