xref: /unit/test/test_tls_sni.py (revision 1971:3410f9d2a662)
1import ssl
2import subprocess
3
4import pytest
5from unit.applications.tls import TestApplicationTLS
6from unit.option import option
7
8
9class TestTLSSNI(TestApplicationTLS):
10    prerequisites = {'modules': {'openssl': 'any'}}
11
12    def setup_method(self):
13        self._load_conf(
14            {
15                "listeners": {"*:7080": {"pass": "routes"}},
16                "routes": [{"action": {"return": 200}}],
17                "applications": {},
18            }
19        )
20
21    def openssl_date_to_sec_epoch(self, date):
22        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
23
24    def add_tls(self, cert='default'):
25        assert 'success' in self.conf(
26            {"pass": "routes", "tls": {"certificate": cert}},
27            'listeners/*:7080',
28        )
29
30    def remove_tls(self):
31        assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080')
32
33    def generate_ca_conf(self):
34        with open(option.temp_dir + '/ca.conf', 'w') as f:
35            f.write(
36                """[ ca ]
37default_ca = myca
38
39[ myca ]
40new_certs_dir = %(dir)s
41database = %(database)s
42default_md = sha256
43policy = myca_policy
44serial = %(certserial)s
45default_days = 1
46x509_extensions = myca_extensions
47copy_extensions = copy
48
49[ myca_policy ]
50commonName = optional
51
52[ myca_extensions ]
53basicConstraints = critical,CA:TRUE"""
54                % {
55                    'dir': option.temp_dir,
56                    'database': option.temp_dir + '/certindex',
57                    'certserial': option.temp_dir + '/certserial',
58                }
59            )
60
61        with open(option.temp_dir + '/certserial', 'w') as f:
62            f.write('1000')
63
64        with open(option.temp_dir + '/certindex', 'w') as f:
65            f.write('')
66
67    def config_bundles(self, bundles):
68        self.certificate('root', False)
69
70        for b in bundles:
71            self.openssl_conf(rewrite=True, alt_names=bundles[b]['alt_names'])
72            subj = (
73                '/CN={}/'.format(bundles[b]['subj'])
74                if 'subj' in bundles[b]
75                else '/'
76            )
77
78            subprocess.call(
79                [
80                    'openssl',
81                    'req',
82                    '-new',
83                    '-subj',
84                    subj,
85                    '-config',
86                    option.temp_dir + '/openssl.conf',
87                    '-out',
88                    option.temp_dir + '/{}.csr'.format(b),
89                    '-keyout',
90                    option.temp_dir + '/{}.key'.format(b),
91                ],
92                stderr=subprocess.STDOUT,
93            )
94
95        self.generate_ca_conf()
96
97        for b in bundles:
98            subj = (
99                '/CN={}/'.format(bundles[b]['subj'])
100                if 'subj' in bundles[b]
101                else '/'
102            )
103
104            subprocess.call(
105                [
106                    'openssl',
107                    'ca',
108                    '-batch',
109                    '-subj',
110                    subj,
111                    '-config',
112                    option.temp_dir + '/ca.conf',
113                    '-keyfile',
114                    option.temp_dir + '/root.key',
115                    '-cert',
116                    option.temp_dir + '/root.crt',
117                    '-in',
118                    option.temp_dir + '/{}.csr'.format(b),
119                    '-out',
120                    option.temp_dir + '/{}.crt'.format(b),
121                ],
122                stderr=subprocess.STDOUT,
123            )
124
125        self.context = ssl.create_default_context()
126        self.context.check_hostname = False
127        self.context.verify_mode = ssl.CERT_REQUIRED
128        self.context.load_verify_locations(option.temp_dir + '/root.crt')
129
130        self.load_certs(bundles)
131
132    def load_certs(self, bundles):
133        for bname, bvalue in bundles.items():
134            assert 'success' in self.certificate_load(
135                bname, bname
136            ), 'certificate {} upload'.format(bvalue['subj'])
137
138    def check_cert(self, host, expect):
139        resp, sock = self.get_ssl(
140            headers={
141                'Host': host,
142                'Content-Length': '0',
143                'Connection': 'close',
144            },
145            start=True,
146        )
147
148        assert resp['status'] == 200
149        assert sock.getpeercert()['subject'][0][0][1] == expect
150
151    def test_tls_sni(self):
152        bundles = {
153            "default": {"subj": "default", "alt_names": ["default"]},
154            "localhost.com": {
155                "subj": "localhost.com",
156                "alt_names": ["alt1.localhost.com"],
157            },
158            "example.com": {
159                "subj": "example.com",
160                "alt_names": ["alt1.example.com", "alt2.example.com"],
161            },
162        }
163        self.config_bundles(bundles)
164        self.add_tls(["default", "localhost.com", "example.com"])
165
166        self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj'])
167        self.check_cert('alt2.example.com', bundles['example.com']['subj'])
168        self.check_cert('blah', bundles['default']['subj'])
169
170    def test_tls_sni_no_hostname(self):
171        bundles = {
172            "localhost.com": {"subj": "localhost.com", "alt_names": []},
173            "example.com": {
174                "subj": "example.com",
175                "alt_names": ["example.com"],
176            },
177        }
178        self.config_bundles(bundles)
179        self.add_tls(["localhost.com", "example.com"])
180
181        resp, sock = self.get_ssl(
182            headers={'Content-Length': '0', 'Connection': 'close'}, start=True,
183        )
184        assert resp['status'] == 200
185        assert (
186            sock.getpeercert()['subject'][0][0][1]
187            == bundles['localhost.com']['subj']
188        )
189
190    def test_tls_sni_upper_case(self):
191        bundles = {
192            "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
193            "example.com": {
194                "subj": "example.com",
195                "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
196            },
197        }
198        self.config_bundles(bundles)
199        self.add_tls(["localhost.com", "example.com"])
200
201        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
202        self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'])
203        self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'])
204        self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'])
205        self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'])
206
207    def test_tls_sni_only_bundle(self):
208        bundles = {
209            "localhost.com": {
210                "subj": "localhost.com",
211                "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
212            }
213        }
214        self.config_bundles(bundles)
215        self.add_tls(["localhost.com"])
216
217        self.check_cert('domain.com', bundles['localhost.com']['subj'])
218        self.check_cert('alt1.domain.com', bundles['localhost.com']['subj'])
219
220    def test_tls_sni_wildcard(self):
221        bundles = {
222            "localhost.com": {"subj": "localhost.com", "alt_names": []},
223            "example.com": {
224                "subj": "example.com",
225                "alt_names": ["*.example.com", "*.alt.example.com"],
226            },
227        }
228        self.config_bundles(bundles)
229        self.add_tls(["localhost.com", "example.com"])
230
231        self.check_cert('example.com', bundles['localhost.com']['subj'])
232        self.check_cert('www.example.com', bundles['example.com']['subj'])
233        self.check_cert('alt.example.com', bundles['example.com']['subj'])
234        self.check_cert('www.alt.example.com', bundles['example.com']['subj'])
235        self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj'])
236
237    def test_tls_sni_duplicated_bundle(self):
238        bundles = {
239            "localhost.com": {
240                "subj": "localhost.com",
241                "alt_names": ["localhost.com", "alt2.localhost.com"],
242            }
243        }
244        self.config_bundles(bundles)
245        self.add_tls(["localhost.com", "localhost.com"])
246
247        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
248        self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj'])
249
250    def test_tls_sni_same_alt(self):
251        bundles = {
252            "localhost": {"subj": "subj1", "alt_names": "same.altname.com"},
253            "example": {"subj": "subj2", "alt_names": "same.altname.com"},
254        }
255        self.config_bundles(bundles)
256        self.add_tls(["localhost", "example"])
257
258        self.check_cert('localhost', bundles['localhost']['subj'])
259        self.check_cert('example', bundles['localhost']['subj'])
260
261    def test_tls_sni_empty_cn(self):
262        bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}}
263        self.config_bundles(bundles)
264        self.add_tls(["localhost"])
265
266        resp, sock = self.get_ssl(
267            headers={
268                'Host': 'domain.com',
269                'Content-Length': '0',
270                'Connection': 'close',
271            },
272            start=True,
273        )
274
275        assert resp['status'] == 200
276        assert (
277            sock.getpeercert()['subjectAltName'][0][1] == 'alt.localhost.com'
278        )
279
280    def test_tls_sni_invalid(self):
281        self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}})
282        self.add_tls(["localhost"])
283
284        def check_certificate(cert):
285            assert 'error' in self.conf(
286                {"pass": "routes", "tls": {"certificate": cert}},
287                'listeners/*:7080',
288            )
289
290        check_certificate('')
291        check_certificate('blah')
292        check_certificate([])
293        check_certificate(['blah'])
294        check_certificate(['localhost', 'blah'])
295        check_certificate(['localhost', []])
296