xref: /unit/test/test_tls.py (revision 2174:a7fb5d8a9590)
1import io
2import ssl
3import subprocess
4import time
5
6import pytest
7from unit.applications.tls import TestApplicationTLS
8from unit.option import option
9
10
class TestTLS(TestApplicationTLS):
    """TLS listener and certificate tests built on TestApplicationTLS."""

    # Modules this suite declares it needs; 'any' presumably means any
    # version is acceptable -- confirm against the test harness.
    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
13
14    def openssl_date_to_sec_epoch(self, date):
15        return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')
16
17    def add_tls(self, application='empty', cert='default', port=7080):
18        assert 'success' in self.conf(
19            {
20                "pass": "applications/" + application,
21                "tls": {"certificate": cert},
22            },
23            'listeners/*:' + str(port),
24        )
25
26    def remove_tls(self, application='empty', port=7080):
27        assert 'success' in self.conf(
28            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
29        )
30
31    def req(self, name='localhost', subject=None, x509=False):
32        subj = subject if subject is not None else '/CN=' + name + '/'
33
34        subprocess.check_output(
35            [
36                'openssl',
37                'req',
38                '-new',
39                '-subj',
40                subj,
41                '-config',
42                option.temp_dir + '/openssl.conf',
43                '-out',
44                option.temp_dir + '/' + name + '.csr',
45                '-keyout',
46                option.temp_dir + '/' + name + '.key',
47            ],
48            stderr=subprocess.STDOUT,
49        )
50
51    def generate_ca_conf(self):
52        with open(option.temp_dir + '/ca.conf', 'w') as f:
53            f.write(
54                """[ ca ]
55default_ca = myca
56
57[ myca ]
58new_certs_dir = %(dir)s
59database = %(database)s
60default_md = sha256
61policy = myca_policy
62serial = %(certserial)s
63default_days = 1
64x509_extensions = myca_extensions
65copy_extensions = copy
66
67[ myca_policy ]
68commonName = optional
69
70[ myca_extensions ]
71basicConstraints = critical,CA:TRUE"""
72                % {
73                    'dir': option.temp_dir,
74                    'database': option.temp_dir + '/certindex',
75                    'certserial': option.temp_dir + '/certserial',
76                }
77            )
78
79        with open(option.temp_dir + '/certserial', 'w') as f:
80            f.write('1000')
81
82        with open(option.temp_dir + '/certindex', 'w') as f:
83            f.write('')
84
85        with open(option.temp_dir + '/certindex.attr', 'w') as f:
86            f.write('')
87
88    def ca(self, cert='root', out='localhost'):
89        subprocess.check_output(
90            [
91                'openssl',
92                'ca',
93                '-batch',
94                '-config',
95                option.temp_dir + '/ca.conf',
96                '-keyfile',
97                option.temp_dir + '/' + cert + '.key',
98                '-cert',
99                option.temp_dir + '/' + cert + '.crt',
100                '-in',
101                option.temp_dir + '/' + out + '.csr',
102                '-out',
103                option.temp_dir + '/' + out + '.crt',
104            ],
105            stderr=subprocess.STDOUT,
106        )
107
108    def set_certificate_req_context(self, cert='root'):
109        self.context = ssl.create_default_context()
110        self.context.check_hostname = False
111        self.context.verify_mode = ssl.CERT_REQUIRED
112        self.context.load_verify_locations(
113            option.temp_dir + '/' + cert + '.crt'
114        )
115
116    def test_tls_listener_option_add(self):
117        self.load('empty')
118
119        self.certificate()
120
121        self.add_tls()
122
123        assert self.get_ssl()['status'] == 200, 'add listener option'
124
125    def test_tls_listener_option_remove(self):
126        self.load('empty')
127
128        self.certificate()
129
130        self.add_tls()
131
132        self.get_ssl()
133
134        self.remove_tls()
135
136        assert self.get()['status'] == 200, 'remove listener option'
137
138    def test_tls_certificate_remove(self):
139        self.load('empty')
140
141        self.certificate()
142
143        assert 'success' in self.conf_delete(
144            '/certificates/default'
145        ), 'remove certificate'
146
147    def test_tls_certificate_remove_used(self):
148        self.load('empty')
149
150        self.certificate()
151
152        self.add_tls()
153
154        assert 'error' in self.conf_delete(
155            '/certificates/default'
156        ), 'remove certificate'
157
158    def test_tls_certificate_remove_nonexisting(self):
159        self.load('empty')
160
161        self.certificate()
162
163        self.add_tls()
164
165        assert 'error' in self.conf_delete(
166            '/certificates/blah'
167        ), 'remove nonexistings certificate'
168
169    @pytest.mark.skip('not yet')
170    def test_tls_certificate_update(self):
171        self.load('empty')
172
173        self.certificate()
174
175        self.add_tls()
176
177        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
178
179        self.certificate()
180
181        assert cert_old != ssl.get_server_certificate(
182            ('127.0.0.1', 7080)
183        ), 'update certificate'
184
185    @pytest.mark.skip('not yet')
186    def test_tls_certificate_key_incorrect(self):
187        self.load('empty')
188
189        self.certificate('first', False)
190        self.certificate('second', False)
191
192        assert 'error' in self.certificate_load(
193            'first', 'second'
194        ), 'key incorrect'
195
196    def test_tls_certificate_change(self):
197        self.load('empty')
198
199        self.certificate()
200        self.certificate('new')
201
202        self.add_tls()
203
204        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
205
206        self.add_tls(cert='new')
207
208        assert cert_old != ssl.get_server_certificate(
209            ('127.0.0.1', 7080)
210        ), 'change certificate'
211
212    def test_tls_certificate_key_rsa(self):
213        self.load('empty')
214
215        self.certificate()
216
217        assert (
218            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
219        ), 'certificate key rsa'
220
221    def test_tls_certificate_key_ec(self, temp_dir):
222        self.load('empty')
223
224        self.openssl_conf()
225
226        subprocess.check_output(
227            [
228                'openssl',
229                'ecparam',
230                '-noout',
231                '-genkey',
232                '-out',
233                temp_dir + '/ec.key',
234                '-name',
235                'prime256v1',
236            ],
237            stderr=subprocess.STDOUT,
238        )
239
240        subprocess.check_output(
241            [
242                'openssl',
243                'req',
244                '-x509',
245                '-new',
246                '-subj',
247                '/CN=ec/',
248                '-config',
249                temp_dir + '/openssl.conf',
250                '-key',
251                temp_dir + '/ec.key',
252                '-out',
253                temp_dir + '/ec.crt',
254            ],
255            stderr=subprocess.STDOUT,
256        )
257
258        self.certificate_load('ec')
259
260        assert (
261            self.conf_get('/certificates/ec/key') == 'ECDH'
262        ), 'certificate key ec'
263
264    def test_tls_certificate_chain_options(self):
265        self.load('empty')
266
267        self.certificate()
268
269        chain = self.conf_get('/certificates/default/chain')
270
271        assert len(chain) == 1, 'certificate chain length'
272
273        cert = chain[0]
274
275        assert (
276            cert['subject']['common_name'] == 'default'
277        ), 'certificate subject common name'
278        assert (
279            cert['issuer']['common_name'] == 'default'
280        ), 'certificate issuer common name'
281
282        assert (
283            abs(
284                self.sec_epoch()
285                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
286            )
287            < 60
288        ), 'certificate validity since'
289        assert (
290            self.openssl_date_to_sec_epoch(cert['validity']['until'])
291            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
292            == 2592000
293        ), 'certificate validity until'
294
295    def test_tls_certificate_chain(self, temp_dir):
296        self.load('empty')
297
298        self.certificate('root', False)
299
300        self.req('int')
301        self.req('end')
302
303        self.generate_ca_conf()
304
305        self.ca(cert='root', out='int')
306        self.ca(cert='int', out='end')
307
308        crt_path = temp_dir + '/end-int.crt'
309        end_path = temp_dir + '/end.crt'
310        int_path = temp_dir + '/int.crt'
311
312        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
313            int_path, 'rb'
314        ) as int:
315            crt.write(end.read() + int.read())
316
317        self.set_certificate_req_context()
318
319        # incomplete chain
320
321        assert 'success' in self.certificate_load(
322            'end', 'end'
323        ), 'certificate chain end upload'
324
325        chain = self.conf_get('/certificates/end/chain')
326        assert len(chain) == 1, 'certificate chain end length'
327        assert (
328            chain[0]['subject']['common_name'] == 'end'
329        ), 'certificate chain end subject common name'
330        assert (
331            chain[0]['issuer']['common_name'] == 'int'
332        ), 'certificate chain end issuer common name'
333
334        self.add_tls(cert='end')
335
336        try:
337            resp = self.get_ssl()
338        except ssl.SSLError:
339            resp = None
340
341        assert resp == None, 'certificate chain incomplete chain'
342
343        # intermediate
344
345        assert 'success' in self.certificate_load(
346            'int', 'int'
347        ), 'certificate chain int upload'
348
349        chain = self.conf_get('/certificates/int/chain')
350        assert len(chain) == 1, 'certificate chain int length'
351        assert (
352            chain[0]['subject']['common_name'] == 'int'
353        ), 'certificate chain int subject common name'
354        assert (
355            chain[0]['issuer']['common_name'] == 'root'
356        ), 'certificate chain int issuer common name'
357
358        self.add_tls(cert='int')
359
360        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'
361
362        # intermediate server
363
364        assert 'success' in self.certificate_load(
365            'end-int', 'end'
366        ), 'certificate chain end-int upload'
367
368        chain = self.conf_get('/certificates/end-int/chain')
369        assert len(chain) == 2, 'certificate chain end-int length'
370        assert (
371            chain[0]['subject']['common_name'] == 'end'
372        ), 'certificate chain end-int int subject common name'
373        assert (
374            chain[0]['issuer']['common_name'] == 'int'
375        ), 'certificate chain end-int int issuer common name'
376        assert (
377            chain[1]['subject']['common_name'] == 'int'
378        ), 'certificate chain end-int end subject common name'
379        assert (
380            chain[1]['issuer']['common_name'] == 'root'
381        ), 'certificate chain end-int end issuer common name'
382
383        self.add_tls(cert='end-int')
384
385        assert (
386            self.get_ssl()['status'] == 200
387        ), 'certificate chain intermediate server'
388
389    def test_tls_certificate_chain_long(self, temp_dir):
390        self.load('empty')
391
392        self.generate_ca_conf()
393
394        # Minimum chain length is 3.
395        chain_length = 10
396
397        for i in range(chain_length):
398            if i == 0:
399                self.certificate('root', False)
400            elif i == chain_length - 1:
401                self.req('end')
402            else:
403                self.req('int{}'.format(i))
404
405        for i in range(chain_length - 1):
406            if i == 0:
407                self.ca(cert='root', out='int1')
408            elif i == chain_length - 2:
409                self.ca(cert='int{}'.format(chain_length - 2), out='end')
410            else:
411                self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))
412
413        for i in range(chain_length - 1, 0, -1):
414            path = temp_dir + (
415                '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i)
416            )
417
418            with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert:
419                chain.write(cert.read())
420
421        self.set_certificate_req_context()
422
423        assert 'success' in self.certificate_load(
424            'all', 'end'
425        ), 'certificate chain upload'
426
427        chain = self.conf_get('/certificates/all/chain')
428        assert len(chain) == chain_length - 1, 'certificate chain length'
429
430        self.add_tls(cert='all')
431
432        assert self.get_ssl()['status'] == 200, 'certificate chain long'
433
434    def test_tls_certificate_empty_cn(self, temp_dir):
435        self.certificate('root', False)
436
437        self.req(subject='/')
438
439        self.generate_ca_conf()
440        self.ca()
441
442        self.set_certificate_req_context()
443
444        assert 'success' in self.certificate_load('localhost', 'localhost')
445
446        cert = self.conf_get('/certificates/localhost')
447        assert cert['chain'][0]['subject'] == {}, 'empty subject'
448        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
449
450    def test_tls_certificate_empty_cn_san(self, temp_dir):
451        self.certificate('root', False)
452
453        self.openssl_conf(
454            rewrite=True, alt_names=["example.com", "www.example.net"]
455        )
456
457        self.req(subject='/')
458
459        self.generate_ca_conf()
460        self.ca()
461
462        self.set_certificate_req_context()
463
464        assert 'success' in self.certificate_load('localhost', 'localhost')
465
466        cert = self.conf_get('/certificates/localhost')
467        assert cert['chain'][0]['subject'] == {
468            'alt_names': ['example.com', 'www.example.net']
469        }, 'subject alt_names'
470        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
471
472    def test_tls_certificate_empty_cn_san_ip(self):
473        self.certificate('root', False)
474
475        self.openssl_conf(
476            rewrite=True,
477            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
478        )
479
480        self.req(subject='/')
481
482        self.generate_ca_conf()
483        self.ca()
484
485        self.set_certificate_req_context()
486
487        assert 'success' in self.certificate_load('localhost', 'localhost')
488
489        cert = self.conf_get('/certificates/localhost')
490        assert cert['chain'][0]['subject'] == {
491            'alt_names': ['example.com', 'www.example.net']
492        }, 'subject alt_names'
493        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
494
495    def test_tls_keepalive(self):
496        self.load('mirror')
497
498        assert self.get()['status'] == 200, 'init'
499
500        self.certificate()
501
502        self.add_tls(application='mirror')
503
504        (resp, sock) = self.post_ssl(
505            headers={
506                'Host': 'localhost',
507                'Connection': 'keep-alive',
508                'Content-Type': 'text/html',
509            },
510            start=True,
511            body='0123456789',
512            read_timeout=1,
513        )
514
515        assert resp['body'] == '0123456789', 'keepalive 1'
516
517        resp = self.post_ssl(
518            headers={
519                'Host': 'localhost',
520                'Connection': 'close',
521                'Content-Type': 'text/html',
522            },
523            sock=sock,
524            body='0123456789',
525        )
526
527        assert resp['body'] == '0123456789', 'keepalive 2'
528
529    def test_tls_no_close_notify(self):
530        self.certificate()
531
532        assert 'success' in self.conf(
533            {
534                "listeners": {
535                    "*:7080": {
536                        "pass": "routes",
537                        "tls": {"certificate": "default"},
538                    }
539                },
540                "routes": [{"action": {"return": 200}}],
541                "applications": {},
542            }
543        ), 'load application configuration'
544
545        (resp, sock) = self.get_ssl(start=True)
546
547        time.sleep(5)
548
549        sock.close()
550
551    @pytest.mark.skip('not yet')
552    def test_tls_keepalive_certificate_remove(self):
553        self.load('empty')
554
555        assert self.get()['status'] == 200, 'init'
556
557        self.certificate()
558
559        self.add_tls()
560
561        (resp, sock) = self.get_ssl(
562            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
563            start=True,
564            read_timeout=1,
565        )
566
567        assert 'success' in self.conf(
568            {"pass": "applications/empty"}, 'listeners/*:7080'
569        )
570        assert 'success' in self.conf_delete('/certificates/default')
571
572        try:
573            resp = self.get_ssl(
574                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
575            )
576
577        except KeyboardInterrupt:
578            raise
579
580        except:
581            resp = None
582
583        assert resp == None, 'keepalive remove certificate'
584
585    @pytest.mark.skip('not yet')
586    def test_tls_certificates_remove_all(self):
587        self.load('empty')
588
589        self.certificate()
590
591        assert 'success' in self.conf_delete(
592            '/certificates'
593        ), 'remove all certificates'
594
595    def test_tls_application_respawn(self, skip_alert):
596        self.load('mirror')
597
598        self.certificate()
599
600        assert 'success' in self.conf('1', 'applications/mirror/processes')
601
602        self.add_tls(application='mirror')
603
604        (_, sock) = self.post_ssl(
605            headers={
606                'Host': 'localhost',
607                'Connection': 'keep-alive',
608                'Content-Type': 'text/html',
609            },
610            start=True,
611            body='0123456789',
612            read_timeout=1,
613        )
614
615        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
616
617        subprocess.check_output(['kill', '-9', app_id])
618
619        skip_alert(r'process %s exited on signal 9' % app_id)
620
621        self.wait_for_record(
622            r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
623        )
624
625        resp = self.post_ssl(
626            headers={
627                'Host': 'localhost',
628                'Connection': 'close',
629                'Content-Type': 'text/html',
630            },
631            sock=sock,
632            body='0123456789',
633        )
634
635        assert resp['status'] == 200, 'application respawn status'
636        assert resp['body'] == '0123456789', 'application respawn body'
637
638    def test_tls_url_scheme(self):
639        self.load('variables')
640
641        assert (
642            self.post(
643                headers={
644                    'Host': 'localhost',
645                    'Content-Type': 'text/html',
646                    'Custom-Header': '',
647                    'Connection': 'close',
648                }
649            )['headers']['Wsgi-Url-Scheme']
650            == 'http'
651        ), 'url scheme http'
652
653        self.certificate()
654
655        self.add_tls(application='variables')
656
657        assert (
658            self.post_ssl(
659                headers={
660                    'Host': 'localhost',
661                    'Content-Type': 'text/html',
662                    'Custom-Header': '',
663                    'Connection': 'close',
664                }
665            )['headers']['Wsgi-Url-Scheme']
666            == 'https'
667        ), 'url scheme https'
668
669    def test_tls_big_upload(self):
670        self.load('upload')
671
672        self.certificate()
673
674        self.add_tls(application='upload')
675
676        filename = 'test.txt'
677        data = '0123456789' * 9000
678
679        res = self.post_ssl(
680            body={
681                'file': {
682                    'filename': filename,
683                    'type': 'text/plain',
684                    'data': io.StringIO(data),
685                }
686            }
687        )
688        assert res['status'] == 200, 'status ok'
689        assert res['body'] == filename + data
690
691    def test_tls_multi_listener(self):
692        self.load('empty')
693
694        self.certificate()
695
696        self.add_tls()
697        self.add_tls(port=7081)
698
699        assert self.get_ssl()['status'] == 200, 'listener #1'
700
701        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
702