xref: /unit/test/test_tls_sni.py (revision 1848:4bd548074e2c)
1import ssl
2import subprocess
3
4import pytest
5
6from unit.applications.tls import TestApplicationTLS
7from unit.option import option
8
9
class TestTLSSNI(TestApplicationTLS):
    """Tests of certificate selection by SNI on a TLS-enabled listener.

    Each test generates one or more certificate bundles signed by a local
    root CA, attaches them to the ``*:7080`` listener and checks which
    certificate is presented for a given host name.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    def setup_method(self):
        """Load a minimal configuration: one listener and a 200 route."""
        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        )

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date in the ``%b %d %H:%M:%S %Y %Z`` layout to seconds.

        The conversion itself is delegated to the inherited
        ``date_to_sec_epoch`` helper.
        """
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, cert='default'):
        """Enable TLS on the ``*:7080`` listener with certificate(s) *cert*.

        *cert* is passed through unchanged as the ``tls.certificate`` value;
        the tests in this class pass either a bundle name or a list of
        bundle names.
        """
        assert 'success' in self.conf(
            {"pass": "routes", "tls": {"certificate": cert}},
            'listeners/*:7080',
        )

    def remove_tls(self):
        """Reconfigure the ``*:7080`` listener without a ``tls`` section."""
        assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080')

    @staticmethod
    def _sni_tmp_file(name):
        """Return the path of file *name* inside the test temp directory."""
        return option.temp_dir + '/' + name

    @staticmethod
    def _sni_bundle_subj(bundle):
        """Return the openssl ``-subj`` value for *bundle*.

        A bundle without a ``subj`` key gets the empty subject ``/``.
        """
        if 'subj' in bundle:
            return '/CN={}/'.format(bundle['subj'])

        return '/'

    def generate_ca_conf(self):
        """Write the files ``openssl ca`` needs into the temp directory.

        Creates ``ca.conf`` (the CA configuration), ``certserial`` (initial
        serial ``1000``) and an empty ``certindex`` database.
        """
        ca_conf_lines = (
            '[ ca ]',
            'default_ca = myca',
            '',
            '[ myca ]',
            'new_certs_dir = {}'.format(option.temp_dir),
            'database = {}'.format(self._sni_tmp_file('certindex')),
            'default_md = sha256',
            'policy = myca_policy',
            'serial = {}'.format(self._sni_tmp_file('certserial')),
            'default_days = 1',
            'x509_extensions = myca_extensions',
            'copy_extensions = copy',
            '',
            '[ myca_policy ]',
            'commonName = optional',
            '',
            '[ myca_extensions ]',
            'basicConstraints = critical,CA:TRUE',
        )

        # No trailing newline, exactly as the configuration was written
        # before.
        with open(self._sni_tmp_file('ca.conf'), 'w') as f:
            f.write('\n'.join(ca_conf_lines))

        with open(self._sni_tmp_file('certserial'), 'w') as f:
            f.write('1000')

        # The index database only has to exist; it starts out empty.
        with open(self._sni_tmp_file('certindex'), 'w') as f:
            f.write('')

    def config_bundles(self, bundles):
        """Generate, sign and upload a certificate for every bundle.

        *bundles* maps a bundle name to a dict with a required
        ``alt_names`` entry and an optional ``subj`` (used as the CN).
        For each bundle a key and CSR are created, the CSR is signed with
        ``root.key``/``root.crt`` from the temp directory, and the result is
        uploaded via ``load_certs``.  ``self.context`` is set to an SSL
        context that verifies peers against ``root.crt``.

        Raises ``subprocess.CalledProcessError`` if an openssl command
        fails, so that a generation problem is reported where it happens
        rather than as a later upload failure.
        """
        # Presumably creates root.key/root.crt without uploading them;
        # both files are read from the temp directory below.
        self.certificate('root', False)

        for name, bundle in bundles.items():
            # The openssl config is rewritten per bundle so that each CSR
            # gets its own alternative names.
            self.openssl_conf(rewrite=True, alt_names=bundle['alt_names'])

            subprocess.check_call(
                [
                    'openssl',
                    'req',
                    '-new',
                    '-subj',
                    self._sni_bundle_subj(bundle),
                    '-config',
                    self._sni_tmp_file('openssl.conf'),
                    '-out',
                    self._sni_tmp_file('{}.csr'.format(name)),
                    '-keyout',
                    self._sni_tmp_file('{}.key'.format(name)),
                ],
                stderr=subprocess.STDOUT,
            )

        self.generate_ca_conf()

        for name, bundle in bundles.items():
            subprocess.check_call(
                [
                    'openssl',
                    'ca',
                    '-batch',
                    '-subj',
                    self._sni_bundle_subj(bundle),
                    '-config',
                    self._sni_tmp_file('ca.conf'),
                    '-keyfile',
                    self._sni_tmp_file('root.key'),
                    '-cert',
                    self._sni_tmp_file('root.crt'),
                    '-in',
                    self._sni_tmp_file('{}.csr'.format(name)),
                    '-out',
                    self._sni_tmp_file('{}.crt'.format(name)),
                ],
                stderr=subprocess.STDOUT,
            )

        # Host name checking is off: the tests deliberately request names
        # the presented certificate does not cover.  The chain itself must
        # still verify against the local root.
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(self._sni_tmp_file('root.crt'))

        self.load_certs(bundles)

    def load_certs(self, bundles):
        """Upload every bundle in *bundles* under its own name."""
        for bname, bvalue in bundles.items():
            # 'subj' is optional, so fall back to the bundle name instead
            # of masking an upload failure with a KeyError.
            assert 'success' in self.certificate_load(
                bname, bname
            ), 'certificate {} upload'.format(bvalue.get('subj', bname))

    def _sni_fetch_peer_cert(self, host):
        """Request *host* over TLS, expect a 200 and return the peer cert.

        The socket handed back by ``get_ssl(start=True)`` is always closed
        before returning.
        """
        resp, sock = self.get_ssl(
            headers={
                'Host': host,
                'Content-Length': '0',
                'Connection': 'close',
            },
            start=True,
        )

        try:
            assert resp['status'] == 200
            return sock.getpeercert()
        finally:
            sock.close()

    def check_cert(self, host, expect):
        """Assert that a request for *host* is served with subject *expect*.

        Only the value of the first attribute of the certificate subject is
        compared; bundles generated here carry just a CN.
        """
        cert = self._sni_fetch_peer_cert(host)

        assert cert['subject'][0][0][1] == expect

    def test_tls_sni(self):
        """Alt names select their bundle; unknown hosts get the first one."""
        bundles = {
            "default": {"subj": "default", "alt_names": ["default"]},
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com"],
            },
            "example.com": {
                "subj": "example.com",
                "alt_names": ["alt1.example.com", "alt2.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["default", "localhost.com", "example.com"])

        self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.example.com', bundles['example.com']['subj'])
        self.check_cert('blah', bundles['default']['subj'])

    def test_tls_sni_upper_case(self):
        """Upper-case names in certificates and requests still match."""
        bundles = {
            "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        localhost_subj = bundles['localhost.com']['subj']
        example_subj = bundles['example.com']['subj']

        self.check_cert('localhost.com', localhost_subj)
        self.check_cert('LOCALHOST.COM', localhost_subj)
        self.check_cert('EXAMPLE.COM', localhost_subj)
        self.check_cert('ALT1.EXAMPLE.COM', example_subj)
        self.check_cert('WWW.ALT2.EXAMPLE.COM', example_subj)

    def test_tls_sni_only_bundle(self):
        """A single configured bundle is served for non-matching hosts."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com"])

        self.check_cert('domain.com', bundles['localhost.com']['subj'])
        self.check_cert('alt1.domain.com', bundles['localhost.com']['subj'])

    def test_tls_sni_wildcard(self):
        """Wildcard alt names match subdomains; other hosts fall back."""
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["*.example.com", "*.alt.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        localhost_subj = bundles['localhost.com']['subj']
        example_subj = bundles['example.com']['subj']

        self.check_cert('example.com', localhost_subj)
        self.check_cert('www.example.com', example_subj)
        self.check_cert('alt.example.com', example_subj)
        self.check_cert('www.alt.example.com', example_subj)
        self.check_cert('www.alt.example.ru', localhost_subj)

    def test_tls_sni_duplicated_bundle(self):
        """Listing the same bundle twice is accepted and still serves it."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "localhost.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj'])

    def test_tls_sni_same_alt(self):
        """Two bundles given the same alt name; the first one is expected."""
        # NOTE(review): alt_names is a plain string here, not a list as in
        # the other tests -- confirm openssl_conf() handles it as intended.
        bundles = {
            "localhost": {"subj": "subj1", "alt_names": "same.altname.com"},
            "example": {"subj": "subj2", "alt_names": "same.altname.com"},
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost", "example"])

        self.check_cert('localhost', bundles['localhost']['subj'])
        self.check_cert('example', bundles['localhost']['subj'])

    def test_tls_sni_empty_cn(self):
        """A certificate without a CN is still served and carries its SAN."""
        bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}}
        self.config_bundles(bundles)
        self.add_tls(["localhost"])

        cert = self._sni_fetch_peer_cert('domain.com')

        assert cert['subjectAltName'][0][1] == 'alt.localhost.com'

    def test_tls_sni_invalid(self):
        """Malformed or unknown ``tls.certificate`` values are rejected."""
        self.config_bundles({"localhost": {"subj": "subj1", "alt_names": ''}})
        self.add_tls(["localhost"])

        invalid_certs = (
            '',
            'blah',
            [],
            ['blah'],
            ['localhost', 'blah'],
            ['localhost', []],
        )

        for cert in invalid_certs:
            assert 'error' in self.conf(
                {"pass": "routes", "tls": {"certificate": cert}},
                'listeners/*:7080',
            ), 'invalid certificate value {!r} accepted'.format(cert)
277