xref: /unit/test/test_tls.py (revision 2482:88df458ead6d)
1import io
2import ssl
3import subprocess
4import time
5
6import pytest
7from unit.applications.tls import TestApplicationTLS
8from unit.option import option
9
10
11class TestTLS(TestApplicationTLS):
12    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
13
14    def add_tls(self, application='empty', cert='default', port=7080):
15        assert 'success' in self.conf(
16            {
17                "pass": f"applications/{application}",
18                "tls": {"certificate": cert},
19            },
20            f'listeners/*:{port}',
21        )
22
23    def remove_tls(self, application='empty', port=7080):
24        assert 'success' in self.conf(
25            {"pass": f"applications/{application}"}, f'listeners/*:{port}'
26        )
27
28    def req(self, name='localhost', subject=None):
29        subj = subject if subject is not None else f'/CN={name}/'
30
31        subprocess.check_output(
32            [
33                'openssl',
34                'req',
35                '-new',
36                '-subj',
37                subj,
38                '-config',
39                f'{option.temp_dir}/openssl.conf',
40                '-out',
41                f'{option.temp_dir}/{name}.csr',
42                '-keyout',
43                f'{option.temp_dir}/{name}.key',
44            ],
45            stderr=subprocess.STDOUT,
46        )
47
48    def generate_ca_conf(self):
49        with open(f'{option.temp_dir}/ca.conf', 'w') as f:
50            f.write(
51                f"""[ ca ]
52default_ca = myca
53
54[ myca ]
55new_certs_dir = {option.temp_dir}
56database = {option.temp_dir}/certindex
57default_md = sha256
58policy = myca_policy
59serial = {option.temp_dir}/certserial
60default_days = 1
61x509_extensions = myca_extensions
62copy_extensions = copy
63
64[ myca_policy ]
65commonName = optional
66
67[ myca_extensions ]
68basicConstraints = critical,CA:TRUE"""
69            )
70
71        with open(f'{option.temp_dir}/certserial', 'w') as f:
72            f.write('1000')
73
74        with open(f'{option.temp_dir}/certindex', 'w') as f:
75            f.write('')
76
77        with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
78            f.write('')
79
80    def ca(self, cert='root', out='localhost'):
81        subprocess.check_output(
82            [
83                'openssl',
84                'ca',
85                '-batch',
86                '-config',
87                f'{option.temp_dir}/ca.conf',
88                '-keyfile',
89                f'{option.temp_dir}/{cert}.key',
90                '-cert',
91                f'{option.temp_dir}/{cert}.crt',
92                '-in',
93                f'{option.temp_dir}/{out}.csr',
94                '-out',
95                f'{option.temp_dir}/{out}.crt',
96            ],
97            stderr=subprocess.STDOUT,
98        )
99
100    def set_certificate_req_context(self, cert='root'):
101        self.context = ssl.create_default_context()
102        self.context.check_hostname = False
103        self.context.verify_mode = ssl.CERT_REQUIRED
104        self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')
105
106    def test_tls_listener_option_add(self):
107        self.load('empty')
108
109        self.certificate()
110
111        self.add_tls()
112
113        assert self.get_ssl()['status'] == 200, 'add listener option'
114
115    def test_tls_listener_option_remove(self):
116        self.load('empty')
117
118        self.certificate()
119
120        self.add_tls()
121
122        self.get_ssl()
123
124        self.remove_tls()
125
126        assert self.get()['status'] == 200, 'remove listener option'
127
128    def test_tls_certificate_remove(self):
129        self.load('empty')
130
131        self.certificate()
132
133        assert 'success' in self.conf_delete(
134            '/certificates/default'
135        ), 'remove certificate'
136
137    def test_tls_certificate_remove_used(self):
138        self.load('empty')
139
140        self.certificate()
141
142        self.add_tls()
143
144        assert 'error' in self.conf_delete(
145            '/certificates/default'
146        ), 'remove certificate'
147
148    def test_tls_certificate_remove_nonexisting(self):
149        self.load('empty')
150
151        self.certificate()
152
153        self.add_tls()
154
155        assert 'error' in self.conf_delete(
156            '/certificates/blah'
157        ), 'remove nonexistings certificate'
158
159    @pytest.mark.skip('not yet')
160    def test_tls_certificate_update(self):
161        self.load('empty')
162
163        self.certificate()
164
165        self.add_tls()
166
167        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
168
169        self.certificate()
170
171        assert cert_old != ssl.get_server_certificate(
172            ('127.0.0.1', 7080)
173        ), 'update certificate'
174
175    @pytest.mark.skip('not yet')
176    def test_tls_certificate_key_incorrect(self):
177        self.load('empty')
178
179        self.certificate('first', False)
180        self.certificate('second', False)
181
182        assert 'error' in self.certificate_load(
183            'first', 'second'
184        ), 'key incorrect'
185
186    def test_tls_certificate_change(self):
187        self.load('empty')
188
189        self.certificate()
190        self.certificate('new')
191
192        self.add_tls()
193
194        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
195
196        self.add_tls(cert='new')
197
198        assert cert_old != ssl.get_server_certificate(
199            ('127.0.0.1', 7080)
200        ), 'change certificate'
201
202    def test_tls_certificate_key_rsa(self):
203        self.load('empty')
204
205        self.certificate()
206
207        assert (
208            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
209        ), 'certificate key rsa'
210
211    def test_tls_certificate_key_ec(self, temp_dir):
212        self.load('empty')
213
214        self.openssl_conf()
215
216        subprocess.check_output(
217            [
218                'openssl',
219                'ecparam',
220                '-noout',
221                '-genkey',
222                '-out',
223                f'{temp_dir}/ec.key',
224                '-name',
225                'prime256v1',
226            ],
227            stderr=subprocess.STDOUT,
228        )
229
230        subprocess.check_output(
231            [
232                'openssl',
233                'req',
234                '-x509',
235                '-new',
236                '-subj',
237                '/CN=ec/',
238                '-config',
239                f'{temp_dir}/openssl.conf',
240                '-key',
241                f'{temp_dir}/ec.key',
242                '-out',
243                f'{temp_dir}/ec.crt',
244            ],
245            stderr=subprocess.STDOUT,
246        )
247
248        self.certificate_load('ec')
249
250        assert (
251            self.conf_get('/certificates/ec/key') == 'ECDH'
252        ), 'certificate key ec'
253
254    def test_tls_certificate_chain_options(self, date_to_sec_epoch, sec_epoch):
255        self.load('empty')
256        date_format = '%b %d %X %Y %Z'
257
258        self.certificate()
259
260        chain = self.conf_get('/certificates/default/chain')
261
262        assert len(chain) == 1, 'certificate chain length'
263
264        cert = chain[0]
265
266        assert (
267            cert['subject']['common_name'] == 'default'
268        ), 'certificate subject common name'
269        assert (
270            cert['issuer']['common_name'] == 'default'
271        ), 'certificate issuer common name'
272
273        assert (
274            abs(
275                sec_epoch
276                - date_to_sec_epoch(cert['validity']['since'], date_format)
277            )
278            < 60
279        ), 'certificate validity since'
280        assert (
281            date_to_sec_epoch(cert['validity']['until'], date_format)
282            - date_to_sec_epoch(cert['validity']['since'], date_format)
283            == 2592000
284        ), 'certificate validity until'
285
286    def test_tls_certificate_chain(self, temp_dir):
287        self.load('empty')
288
289        self.certificate('root', False)
290
291        self.req('int')
292        self.req('end')
293
294        self.generate_ca_conf()
295
296        self.ca(cert='root', out='int')
297        self.ca(cert='int', out='end')
298
299        crt_path = f'{temp_dir}/end-int.crt'
300        end_path = f'{temp_dir}/end.crt'
301        int_path = f'{temp_dir}/int.crt'
302
303        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
304            int_path, 'rb'
305        ) as int:
306            crt.write(end.read() + int.read())
307
308        self.set_certificate_req_context()
309
310        # incomplete chain
311
312        assert 'success' in self.certificate_load(
313            'end', 'end'
314        ), 'certificate chain end upload'
315
316        chain = self.conf_get('/certificates/end/chain')
317        assert len(chain) == 1, 'certificate chain end length'
318        assert (
319            chain[0]['subject']['common_name'] == 'end'
320        ), 'certificate chain end subject common name'
321        assert (
322            chain[0]['issuer']['common_name'] == 'int'
323        ), 'certificate chain end issuer common name'
324
325        self.add_tls(cert='end')
326
327        try:
328            resp = self.get_ssl()
329        except ssl.SSLError:
330            resp = None
331
332        assert resp is None, 'certificate chain incomplete chain'
333
334        # intermediate
335
336        assert 'success' in self.certificate_load(
337            'int', 'int'
338        ), 'certificate chain int upload'
339
340        chain = self.conf_get('/certificates/int/chain')
341        assert len(chain) == 1, 'certificate chain int length'
342        assert (
343            chain[0]['subject']['common_name'] == 'int'
344        ), 'certificate chain int subject common name'
345        assert (
346            chain[0]['issuer']['common_name'] == 'root'
347        ), 'certificate chain int issuer common name'
348
349        self.add_tls(cert='int')
350
351        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'
352
353        # intermediate server
354
355        assert 'success' in self.certificate_load(
356            'end-int', 'end'
357        ), 'certificate chain end-int upload'
358
359        chain = self.conf_get('/certificates/end-int/chain')
360        assert len(chain) == 2, 'certificate chain end-int length'
361        assert (
362            chain[0]['subject']['common_name'] == 'end'
363        ), 'certificate chain end-int int subject common name'
364        assert (
365            chain[0]['issuer']['common_name'] == 'int'
366        ), 'certificate chain end-int int issuer common name'
367        assert (
368            chain[1]['subject']['common_name'] == 'int'
369        ), 'certificate chain end-int end subject common name'
370        assert (
371            chain[1]['issuer']['common_name'] == 'root'
372        ), 'certificate chain end-int end issuer common name'
373
374        self.add_tls(cert='end-int')
375
376        assert (
377            self.get_ssl()['status'] == 200
378        ), 'certificate chain intermediate server'
379
380    def test_tls_certificate_chain_long(self, temp_dir):
381        self.load('empty')
382
383        self.generate_ca_conf()
384
385        # Minimum chain length is 3.
386        chain_length = 10
387
388        for i in range(chain_length):
389            if i == 0:
390                self.certificate('root', False)
391            elif i == chain_length - 1:
392                self.req('end')
393            else:
394                self.req(f'int{i}')
395
396        for i in range(chain_length - 1):
397            if i == 0:
398                self.ca(cert='root', out='int1')
399            elif i == chain_length - 2:
400                self.ca(cert=f'int{(chain_length - 2)}', out='end')
401            else:
402                self.ca(cert=f'int{i}', out=f'int{(i + 1)}')
403
404        for i in range(chain_length - 1, 0, -1):
405            path = (
406                f'{temp_dir}/end.crt'
407                if i == chain_length - 1
408                else f'{temp_dir}/int{i}.crt'
409            )
410
411            with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert:
412                chain.write(cert.read())
413
414        self.set_certificate_req_context()
415
416        assert 'success' in self.certificate_load(
417            'all', 'end'
418        ), 'certificate chain upload'
419
420        chain = self.conf_get('/certificates/all/chain')
421        assert len(chain) == chain_length - 1, 'certificate chain length'
422
423        self.add_tls(cert='all')
424
425        assert self.get_ssl()['status'] == 200, 'certificate chain long'
426
427    def test_tls_certificate_empty_cn(self):
428        self.certificate('root', False)
429
430        self.req(subject='/')
431
432        self.generate_ca_conf()
433        self.ca()
434
435        self.set_certificate_req_context()
436
437        assert 'success' in self.certificate_load('localhost', 'localhost')
438
439        cert = self.conf_get('/certificates/localhost')
440        assert cert['chain'][0]['subject'] == {}, 'empty subject'
441        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
442
443    def test_tls_certificate_empty_cn_san(self):
444        self.certificate('root', False)
445
446        self.openssl_conf(
447            rewrite=True, alt_names=["example.com", "www.example.net"]
448        )
449
450        self.req(subject='/')
451
452        self.generate_ca_conf()
453        self.ca()
454
455        self.set_certificate_req_context()
456
457        assert 'success' in self.certificate_load('localhost', 'localhost')
458
459        cert = self.conf_get('/certificates/localhost')
460        assert cert['chain'][0]['subject'] == {
461            'alt_names': ['example.com', 'www.example.net']
462        }, 'subject alt_names'
463        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
464
465    def test_tls_certificate_empty_cn_san_ip(self):
466        self.certificate('root', False)
467
468        self.openssl_conf(
469            rewrite=True,
470            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
471        )
472
473        self.req(subject='/')
474
475        self.generate_ca_conf()
476        self.ca()
477
478        self.set_certificate_req_context()
479
480        assert 'success' in self.certificate_load('localhost', 'localhost')
481
482        cert = self.conf_get('/certificates/localhost')
483        assert cert['chain'][0]['subject'] == {
484            'alt_names': ['example.com', 'www.example.net']
485        }, 'subject alt_names'
486        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
487
488    def test_tls_keepalive(self):
489        self.load('mirror')
490
491        assert self.get()['status'] == 200, 'init'
492
493        self.certificate()
494
495        self.add_tls(application='mirror')
496
497        (resp, sock) = self.post_ssl(
498            headers={
499                'Host': 'localhost',
500                'Connection': 'keep-alive',
501            },
502            start=True,
503            body='0123456789',
504            read_timeout=1,
505        )
506
507        assert resp['body'] == '0123456789', 'keepalive 1'
508
509        resp = self.post_ssl(
510            headers={
511                'Host': 'localhost',
512                'Connection': 'close',
513            },
514            sock=sock,
515            body='0123456789',
516        )
517
518        assert resp['body'] == '0123456789', 'keepalive 2'
519
520    def test_tls_no_close_notify(self):
521        self.certificate()
522
523        assert 'success' in self.conf(
524            {
525                "listeners": {
526                    "*:7080": {
527                        "pass": "routes",
528                        "tls": {"certificate": "default"},
529                    }
530                },
531                "routes": [{"action": {"return": 200}}],
532                "applications": {},
533            }
534        ), 'load application configuration'
535
536        (_, sock) = self.get_ssl(start=True)
537
538        time.sleep(5)
539
540        sock.close()
541
542    @pytest.mark.skip('not yet')
543    def test_tls_keepalive_certificate_remove(self):
544        self.load('empty')
545
546        assert self.get()['status'] == 200, 'init'
547
548        self.certificate()
549
550        self.add_tls()
551
552        (resp, sock) = self.get_ssl(
553            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
554            start=True,
555            read_timeout=1,
556        )
557
558        assert 'success' in self.conf(
559            {"pass": "applications/empty"}, 'listeners/*:7080'
560        )
561        assert 'success' in self.conf_delete('/certificates/default')
562
563        try:
564            resp = self.get_ssl(sock=sock)
565
566        except KeyboardInterrupt:
567            raise
568
569        except:
570            resp = None
571
572        assert resp is None, 'keepalive remove certificate'
573
574    @pytest.mark.skip('not yet')
575    def test_tls_certificates_remove_all(self):
576        self.load('empty')
577
578        self.certificate()
579
580        assert 'success' in self.conf_delete(
581            '/certificates'
582        ), 'remove all certificates'
583
584    def test_tls_application_respawn(
585        self, findall, skip_alert, wait_for_record
586    ):
587        self.load('mirror')
588
589        self.certificate()
590
591        assert 'success' in self.conf('1', 'applications/mirror/processes')
592
593        self.add_tls(application='mirror')
594
595        (_, sock) = self.post_ssl(
596            headers={
597                'Host': 'localhost',
598                'Connection': 'keep-alive',
599            },
600            start=True,
601            body='0123456789',
602            read_timeout=1,
603        )
604
605        app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]
606
607        subprocess.check_output(['kill', '-9', app_id])
608
609        skip_alert(fr'process {app_id} exited on signal 9')
610
611        wait_for_record(
612            fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
613        )
614
615        resp = self.post_ssl(sock=sock, body='0123456789')
616
617        assert resp['status'] == 200, 'application respawn status'
618        assert resp['body'] == '0123456789', 'application respawn body'
619
620    def test_tls_url_scheme(self):
621        self.load('variables')
622
623        assert (
624            self.post(
625                headers={
626                    'Host': 'localhost',
627                    'Content-Type': 'text/html',
628                    'Custom-Header': '',
629                    'Connection': 'close',
630                }
631            )['headers']['Wsgi-Url-Scheme']
632            == 'http'
633        ), 'url scheme http'
634
635        self.certificate()
636
637        self.add_tls(application='variables')
638
639        assert (
640            self.post_ssl(
641                headers={
642                    'Host': 'localhost',
643                    'Content-Type': 'text/html',
644                    'Custom-Header': '',
645                    'Connection': 'close',
646                }
647            )['headers']['Wsgi-Url-Scheme']
648            == 'https'
649        ), 'url scheme https'
650
651    def test_tls_big_upload(self):
652        self.load('upload')
653
654        self.certificate()
655
656        self.add_tls(application='upload')
657
658        filename = 'test.txt'
659        data = '0123456789' * 9000
660
661        res = self.post_ssl(
662            body={
663                'file': {
664                    'filename': filename,
665                    'type': 'text/plain',
666                    'data': io.StringIO(data),
667                }
668            }
669        )
670        assert res['status'] == 200, 'status ok'
671        assert res['body'] == f'{filename}{data}'
672
673    def test_tls_multi_listener(self):
674        self.load('empty')
675
676        self.certificate()
677
678        self.add_tls()
679        self.add_tls(port=7081)
680
681        assert self.get_ssl()['status'] == 200, 'listener #1'
682
683        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
684