xref: /unit/test/unit/applications/tls.py (revision 2330:4b1f175f9c88)
1import os
2import ssl
3import subprocess
4
5from unit.applications.proto import TestApplicationProto
6from unit.option import option
7
8
9class TestApplicationTLS(TestApplicationProto):
10    def setup_method(self):
11        self.context = ssl.create_default_context()
12        self.context.check_hostname = False
13        self.context.verify_mode = ssl.CERT_NONE
14
15    def certificate(self, name='default', load=True):
16        self.openssl_conf()
17
18        subprocess.check_output(
19            [
20                'openssl',
21                'req',
22                '-x509',
23                '-new',
24                '-subj',
25                f'/CN={name}/',
26                '-config',
27                f'{option.temp_dir}/openssl.conf',
28                '-out',
29                f'{option.temp_dir}/{name}.crt',
30                '-keyout',
31                f'{option.temp_dir}/{name}.key',
32            ],
33            stderr=subprocess.STDOUT,
34        )
35
36        if load:
37            self.certificate_load(name)
38
39    def certificate_load(self, crt, key=None):
40        if key is None:
41            key = crt
42
43        key_path = f'{option.temp_dir}/{key}.key'
44        crt_path = f'{option.temp_dir}/{crt}.crt'
45
46        with open(key_path, 'rb') as k, open(crt_path, 'rb') as c:
47            return self.conf(k.read() + c.read(), f'/certificates/{crt}')
48
49    def get_ssl(self, **kwargs):
50        return self.get(wrapper=self.context.wrap_socket, **kwargs)
51
52    def post_ssl(self, **kwargs):
53        return self.post(wrapper=self.context.wrap_socket, **kwargs)
54
55    def openssl_conf(self, rewrite=False, alt_names=None):
56        alt_names = alt_names or []
57        conf_path = f'{option.temp_dir}/openssl.conf'
58
59        if not rewrite and os.path.exists(conf_path):
60            return
61
62        # Generates alt_names section with dns names
63        a_names = '[alt_names]\n'
64        for i, k in enumerate(alt_names, 1):
65            k = k.split('|')
66
67            if k[0] == 'IP':
68                a_names += f'IP.{i} = {k[1]}\n'
69            else:
70                a_names += f'DNS.{i} = {k[0]}\n'
71
72        # Generates section for sign request extension
73        a_sec = f'''req_extensions = myca_req_extensions
74
75[ myca_req_extensions ]
76subjectAltName = @alt_names
77
78{a_names}'''
79
80        with open(conf_path, 'w') as f:
81            f.write(
82                f'''[ req ]
83default_bits = 2048
84encrypt_key = no
85distinguished_name = req_distinguished_name
86
87{a_sec if alt_names else ""}
88[ req_distinguished_name ]'''
89            )
90
91    def load(self, script, name=None):
92        if name is None:
93            name = script
94
95        script_path = f'{option.test_dir}/python/{script}'
96        self._load_conf(
97            {
98                "listeners": {"*:7080": {"pass": f"applications/{name}"}},
99                "applications": {
100                    name: {
101                        "type": "python",
102                        "processes": {"spare": 0},
103                        "path": script_path,
104                        "working_directory": script_path,
105                        "module": "wsgi",
106                    }
107                },
108            }
109        )
110