# xref: /unit/test/test_tls_sni.py (revision 1922:53a9f4983fe0)
1import ssl
2import subprocess
3
4import pytest
5
6from unit.applications.tls import TestApplicationTLS
7from unit.option import option
8
9
class TestTLSSNI(TestApplicationTLS):
    """Tests for selecting a certificate bundle on a TLS listener.

    Each test generates one or more certificate bundles signed by a
    throwaway root CA, attaches them to the ``*:7080`` listener and then
    checks which certificate is presented for requests carrying a given
    ``Host`` header.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    def setup_method(self):
        """Load a minimal configuration: one listener returning 200."""
        self._load_conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        )

    def openssl_date_to_sec_epoch(self, date):
        """Convert a date in the ``%b %d %H:%M:%S %Y %Z`` layout to seconds."""
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, cert='default'):
        """Attach ``cert`` (a bundle name or a list of names) to the listener."""
        assert 'success' in self.conf(
            {"pass": "routes", "tls": {"certificate": cert}},
            'listeners/*:7080',
        )

    def remove_tls(self):
        """Reconfigure the listener without any ``tls`` object."""
        assert 'success' in self.conf({"pass": "routes"}, 'listeners/*:7080')

    def generate_ca_conf(self):
        """Write the ``openssl ca`` configuration and its bookkeeping files.

        Creates ``ca.conf``, ``certserial`` (containing ``1000``) and an
        empty ``certindex`` inside ``option.temp_dir``.
        """
        temp_dir = option.temp_dir
        database = temp_dir + '/certindex'
        certserial = temp_dir + '/certserial'

        with open(temp_dir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': temp_dir,
                    'database': database,
                    'certserial': certserial,
                }
            )

        with open(certserial, 'w') as f:
            f.write('1000')

        with open(database, 'w') as f:
            f.write('')

    @staticmethod
    def _bundle_subj(bundle):
        """Return the ``-subj`` argument for ``bundle``.

        A bundle without a ``'subj'`` key gets the empty subject ``'/'``.
        """
        if 'subj' in bundle:
            return '/CN={}/'.format(bundle['subj'])

        return '/'

    def config_bundles(self, bundles):
        """Generate, sign and upload every bundle described by ``bundles``.

        ``bundles`` maps a bundle name to a dict with an ``'alt_names'``
        entry and an optional ``'subj'`` entry (used as the CN).  For each
        bundle a key and CSR are produced with ``openssl req`` and then
        signed with ``openssl ca`` using ``root.key`` / ``root.crt`` from
        ``option.temp_dir``.  Finally ``self.context`` is set to an SSL
        context that trusts ``root.crt`` and the bundles are uploaded.

        Raises:
            subprocess.CalledProcessError: if any ``openssl`` invocation
                exits with a non-zero status.
        """
        temp_dir = option.temp_dir

        # NOTE(review): presumably creates root.key / root.crt in temp_dir
        # without uploading them; the signing step below reads both files.
        self.certificate('root', False)

        for name, bundle in bundles.items():
            # openssl.conf is rewritten per bundle so that each CSR carries
            # its own alternative names.
            self.openssl_conf(rewrite=True, alt_names=bundle['alt_names'])

            # check_call rather than call: a failing openssl run must abort
            # the test here instead of surfacing later as an unrelated
            # certificate-upload or handshake failure.
            subprocess.check_call(
                [
                    'openssl',
                    'req',
                    '-new',
                    '-subj',
                    self._bundle_subj(bundle),
                    '-config',
                    temp_dir + '/openssl.conf',
                    '-out',
                    temp_dir + '/{}.csr'.format(name),
                    '-keyout',
                    temp_dir + '/{}.key'.format(name),
                ],
                stderr=subprocess.STDOUT,
            )

        self.generate_ca_conf()

        for name, bundle in bundles.items():
            subprocess.check_call(
                [
                    'openssl',
                    'ca',
                    '-batch',
                    '-subj',
                    self._bundle_subj(bundle),
                    '-config',
                    temp_dir + '/ca.conf',
                    '-keyfile',
                    temp_dir + '/root.key',
                    '-cert',
                    temp_dir + '/root.crt',
                    '-in',
                    temp_dir + '/{}.csr'.format(name),
                    '-out',
                    temp_dir + '/{}.crt'.format(name),
                ],
                stderr=subprocess.STDOUT,
            )

        # Verify the chain against the test root, but skip hostname checks:
        # the tests deliberately request names the certificate may not list.
        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(temp_dir + '/root.crt')

        self.load_certs(bundles)

    def load_certs(self, bundles):
        """Upload every bundle in ``bundles`` under its own name."""
        for bname, bvalue in bundles.items():
            # 'subj' is optional, so fall back to the bundle name; indexing
            # it directly would raise KeyError while building the failure
            # message and hide the real upload error.
            assert 'success' in self.certificate_load(
                bname, bname
            ), 'certificate {} upload'.format(bvalue.get('subj', bname))

    def _request_cert(self, headers):
        """Send one TLS request and return the peer certificate dict.

        Asserts that the response status is 200.  The socket handed back by
        ``get_ssl(..., start=True)`` is always closed before returning.
        """
        resp, sock = self.get_ssl(headers=headers, start=True)

        try:
            assert resp['status'] == 200
            return sock.getpeercert()
        finally:
            sock.close()

    def check_cert(self, host, expect):
        """Assert that a request with ``Host: host`` is served a certificate
        whose first subject attribute value equals ``expect``.
        """
        cert = self._request_cert(
            {
                'Host': host,
                'Content-Length': '0',
                'Connection': 'close',
            }
        )

        assert cert['subject'][0][0][1] == expect

    def test_tls_sni(self):
        """Alternative names select their bundle; an unknown name gets the
        first configured bundle.
        """
        bundles = {
            "default": {"subj": "default", "alt_names": ["default"]},
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com"],
            },
            "example.com": {
                "subj": "example.com",
                "alt_names": ["alt1.example.com", "alt2.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["default", "localhost.com", "example.com"])

        self.check_cert('alt1.localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.example.com', bundles['example.com']['subj'])
        self.check_cert('blah', bundles['default']['subj'])

    def test_tls_sni_no_hostname(self):
        """A request without a Host header gets the first bundle."""
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        cert = self._request_cert(
            {'Content-Length': '0', 'Connection': 'close'}
        )

        assert cert['subject'][0][0][1] == bundles['localhost.com']['subj']

    def test_tls_sni_upper_case(self):
        """Upper-case names in certificates and in requests still match."""
        bundles = {
            "localhost.com": {"subj": "LOCALHOST.COM", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["ALT1.EXAMPLE.COM", "*.ALT2.EXAMPLE.COM"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('LOCALHOST.COM', bundles['localhost.com']['subj'])
        self.check_cert('EXAMPLE.COM', bundles['localhost.com']['subj'])
        self.check_cert('ALT1.EXAMPLE.COM', bundles['example.com']['subj'])
        self.check_cert('WWW.ALT2.EXAMPLE.COM', bundles['example.com']['subj'])

    def test_tls_sni_only_bundle(self):
        """With a single bundle, non-matching names still receive it."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["alt1.localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com"])

        self.check_cert('domain.com', bundles['localhost.com']['subj'])
        self.check_cert('alt1.domain.com', bundles['localhost.com']['subj'])

    def test_tls_sni_wildcard(self):
        """Check which hosts are served the bundle with wildcard names and
        which ones get the first bundle instead.
        """
        bundles = {
            "localhost.com": {"subj": "localhost.com", "alt_names": []},
            "example.com": {
                "subj": "example.com",
                "alt_names": ["*.example.com", "*.alt.example.com"],
            },
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "example.com"])

        self.check_cert('example.com', bundles['localhost.com']['subj'])
        self.check_cert('www.example.com', bundles['example.com']['subj'])
        self.check_cert('alt.example.com', bundles['example.com']['subj'])
        self.check_cert('www.alt.example.com', bundles['example.com']['subj'])
        self.check_cert('www.alt.example.ru', bundles['localhost.com']['subj'])

    def test_tls_sni_duplicated_bundle(self):
        """Listing the same bundle twice on the listener is accepted."""
        bundles = {
            "localhost.com": {
                "subj": "localhost.com",
                "alt_names": ["localhost.com", "alt2.localhost.com"],
            }
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost.com", "localhost.com"])

        self.check_cert('localhost.com', bundles['localhost.com']['subj'])
        self.check_cert('alt2.localhost.com', bundles['localhost.com']['subj'])

    def test_tls_sni_same_alt(self):
        """Two bundles sharing one alternative name can be configured
        together; non-matching hosts get the first bundle.
        """
        # alt_names is a list in every other test of this class; a bare
        # string would be walked character by character by any consumer
        # that iterates it, so the shared name is wrapped in a list.
        bundles = {
            "localhost": {"subj": "subj1", "alt_names": ["same.altname.com"]},
            "example": {"subj": "subj2", "alt_names": ["same.altname.com"]},
        }
        self.config_bundles(bundles)
        self.add_tls(["localhost", "example"])

        self.check_cert('localhost', bundles['localhost']['subj'])
        self.check_cert('example', bundles['localhost']['subj'])

    def test_tls_sni_empty_cn(self):
        """A bundle without a CN is served and exposes its alternative name."""
        bundles = {"localhost": {"alt_names": ["alt.localhost.com"]}}
        self.config_bundles(bundles)
        self.add_tls(["localhost"])

        cert = self._request_cert(
            {
                'Host': 'domain.com',
                'Content-Length': '0',
                'Connection': 'close',
            }
        )

        assert cert['subjectAltName'][0][1] == 'alt.localhost.com'

    def test_tls_sni_invalid(self):
        """Malformed or unknown ``certificate`` values are rejected."""
        # An empty list (no alternative names) instead of '' keeps the
        # alt_names type consistent with the rest of the class.
        self.config_bundles({"localhost": {"subj": "subj1", "alt_names": []}})
        self.add_tls(["localhost"])

        def check_certificate(cert):
            assert 'error' in self.conf(
                {"pass": "routes", "tls": {"certificate": cert}},
                'listeners/*:7080',
            )

        check_certificate('')
        check_certificate('blah')
        check_certificate([])
        check_certificate(['blah'])
        check_certificate(['localhost', 'blah'])
        check_certificate(['localhost', []])
297