xref: /unit/test/test_tls.py (revision 2190:fbfec2aaf4c3)
1import io
2import ssl
3import subprocess
4import time
5
6import pytest
7from unit.applications.tls import TestApplicationTLS
8from unit.option import option
9
10
11class TestTLS(TestApplicationTLS):
12    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
13
14    def openssl_date_to_sec_epoch(self, date):
15        return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')
16
17    def add_tls(self, application='empty', cert='default', port=7080):
18        assert 'success' in self.conf(
19            {
20                "pass": "applications/" + application,
21                "tls": {"certificate": cert},
22            },
23            'listeners/*:' + str(port),
24        )
25
26    def remove_tls(self, application='empty', port=7080):
27        assert 'success' in self.conf(
28            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
29        )
30
31    def req(self, name='localhost', subject=None, x509=False):
32        subj = subject if subject is not None else '/CN=' + name + '/'
33
34        subprocess.check_output(
35            [
36                'openssl',
37                'req',
38                '-new',
39                '-subj',
40                subj,
41                '-config',
42                option.temp_dir + '/openssl.conf',
43                '-out',
44                option.temp_dir + '/' + name + '.csr',
45                '-keyout',
46                option.temp_dir + '/' + name + '.key',
47            ],
48            stderr=subprocess.STDOUT,
49        )
50
51    def generate_ca_conf(self):
52        with open(option.temp_dir + '/ca.conf', 'w') as f:
53            f.write(
54                """[ ca ]
55default_ca = myca
56
57[ myca ]
58new_certs_dir = %(dir)s
59database = %(database)s
60default_md = sha256
61policy = myca_policy
62serial = %(certserial)s
63default_days = 1
64x509_extensions = myca_extensions
65copy_extensions = copy
66
67[ myca_policy ]
68commonName = optional
69
70[ myca_extensions ]
71basicConstraints = critical,CA:TRUE"""
72                % {
73                    'dir': option.temp_dir,
74                    'database': option.temp_dir + '/certindex',
75                    'certserial': option.temp_dir + '/certserial',
76                }
77            )
78
79        with open(option.temp_dir + '/certserial', 'w') as f:
80            f.write('1000')
81
82        with open(option.temp_dir + '/certindex', 'w') as f:
83            f.write('')
84
85        with open(option.temp_dir + '/certindex.attr', 'w') as f:
86            f.write('')
87
88    def ca(self, cert='root', out='localhost'):
89        subprocess.check_output(
90            [
91                'openssl',
92                'ca',
93                '-batch',
94                '-config',
95                option.temp_dir + '/ca.conf',
96                '-keyfile',
97                option.temp_dir + '/' + cert + '.key',
98                '-cert',
99                option.temp_dir + '/' + cert + '.crt',
100                '-in',
101                option.temp_dir + '/' + out + '.csr',
102                '-out',
103                option.temp_dir + '/' + out + '.crt',
104            ],
105            stderr=subprocess.STDOUT,
106        )
107
108    def set_certificate_req_context(self, cert='root'):
109        self.context = ssl.create_default_context()
110        self.context.check_hostname = False
111        self.context.verify_mode = ssl.CERT_REQUIRED
112        self.context.load_verify_locations(
113            option.temp_dir + '/' + cert + '.crt'
114        )
115
116    def test_tls_listener_option_add(self):
117        self.load('empty')
118
119        self.certificate()
120
121        self.add_tls()
122
123        assert self.get_ssl()['status'] == 200, 'add listener option'
124
125    def test_tls_listener_option_remove(self):
126        self.load('empty')
127
128        self.certificate()
129
130        self.add_tls()
131
132        self.get_ssl()
133
134        self.remove_tls()
135
136        assert self.get()['status'] == 200, 'remove listener option'
137
138    def test_tls_certificate_remove(self):
139        self.load('empty')
140
141        self.certificate()
142
143        assert 'success' in self.conf_delete(
144            '/certificates/default'
145        ), 'remove certificate'
146
147    def test_tls_certificate_remove_used(self):
148        self.load('empty')
149
150        self.certificate()
151
152        self.add_tls()
153
154        assert 'error' in self.conf_delete(
155            '/certificates/default'
156        ), 'remove certificate'
157
158    def test_tls_certificate_remove_nonexisting(self):
159        self.load('empty')
160
161        self.certificate()
162
163        self.add_tls()
164
165        assert 'error' in self.conf_delete(
166            '/certificates/blah'
167        ), 'remove nonexistings certificate'
168
169    @pytest.mark.skip('not yet')
170    def test_tls_certificate_update(self):
171        self.load('empty')
172
173        self.certificate()
174
175        self.add_tls()
176
177        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
178
179        self.certificate()
180
181        assert cert_old != ssl.get_server_certificate(
182            ('127.0.0.1', 7080)
183        ), 'update certificate'
184
185    @pytest.mark.skip('not yet')
186    def test_tls_certificate_key_incorrect(self):
187        self.load('empty')
188
189        self.certificate('first', False)
190        self.certificate('second', False)
191
192        assert 'error' in self.certificate_load(
193            'first', 'second'
194        ), 'key incorrect'
195
196    def test_tls_certificate_change(self):
197        self.load('empty')
198
199        self.certificate()
200        self.certificate('new')
201
202        self.add_tls()
203
204        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
205
206        self.add_tls(cert='new')
207
208        assert cert_old != ssl.get_server_certificate(
209            ('127.0.0.1', 7080)
210        ), 'change certificate'
211
212    def test_tls_certificate_key_rsa(self):
213        self.load('empty')
214
215        self.certificate()
216
217        assert (
218            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
219        ), 'certificate key rsa'
220
221    def test_tls_certificate_key_ec(self, temp_dir):
222        self.load('empty')
223
224        self.openssl_conf()
225
226        subprocess.check_output(
227            [
228                'openssl',
229                'ecparam',
230                '-noout',
231                '-genkey',
232                '-out',
233                temp_dir + '/ec.key',
234                '-name',
235                'prime256v1',
236            ],
237            stderr=subprocess.STDOUT,
238        )
239
240        subprocess.check_output(
241            [
242                'openssl',
243                'req',
244                '-x509',
245                '-new',
246                '-subj',
247                '/CN=ec/',
248                '-config',
249                temp_dir + '/openssl.conf',
250                '-key',
251                temp_dir + '/ec.key',
252                '-out',
253                temp_dir + '/ec.crt',
254            ],
255            stderr=subprocess.STDOUT,
256        )
257
258        self.certificate_load('ec')
259
260        assert (
261            self.conf_get('/certificates/ec/key') == 'ECDH'
262        ), 'certificate key ec'
263
264    def test_tls_certificate_chain_options(self):
265        self.load('empty')
266
267        self.certificate()
268
269        chain = self.conf_get('/certificates/default/chain')
270
271        assert len(chain) == 1, 'certificate chain length'
272
273        cert = chain[0]
274
275        assert (
276            cert['subject']['common_name'] == 'default'
277        ), 'certificate subject common name'
278        assert (
279            cert['issuer']['common_name'] == 'default'
280        ), 'certificate issuer common name'
281
282        assert (
283            abs(
284                self.sec_epoch()
285                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
286            )
287            < 60
288        ), 'certificate validity since'
289        assert (
290            self.openssl_date_to_sec_epoch(cert['validity']['until'])
291            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
292            == 2592000
293        ), 'certificate validity until'
294
295    def test_tls_certificate_chain(self, temp_dir):
296        self.load('empty')
297
298        self.certificate('root', False)
299
300        self.req('int')
301        self.req('end')
302
303        self.generate_ca_conf()
304
305        self.ca(cert='root', out='int')
306        self.ca(cert='int', out='end')
307
308        crt_path = temp_dir + '/end-int.crt'
309        end_path = temp_dir + '/end.crt'
310        int_path = temp_dir + '/int.crt'
311
312        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
313            int_path, 'rb'
314        ) as int:
315            crt.write(end.read() + int.read())
316
317        self.set_certificate_req_context()
318
319        # incomplete chain
320
321        assert 'success' in self.certificate_load(
322            'end', 'end'
323        ), 'certificate chain end upload'
324
325        chain = self.conf_get('/certificates/end/chain')
326        assert len(chain) == 1, 'certificate chain end length'
327        assert (
328            chain[0]['subject']['common_name'] == 'end'
329        ), 'certificate chain end subject common name'
330        assert (
331            chain[0]['issuer']['common_name'] == 'int'
332        ), 'certificate chain end issuer common name'
333
334        self.add_tls(cert='end')
335
336        try:
337            resp = self.get_ssl()
338        except ssl.SSLError:
339            resp = None
340
341        assert resp == None, 'certificate chain incomplete chain'
342
343        # intermediate
344
345        assert 'success' in self.certificate_load(
346            'int', 'int'
347        ), 'certificate chain int upload'
348
349        chain = self.conf_get('/certificates/int/chain')
350        assert len(chain) == 1, 'certificate chain int length'
351        assert (
352            chain[0]['subject']['common_name'] == 'int'
353        ), 'certificate chain int subject common name'
354        assert (
355            chain[0]['issuer']['common_name'] == 'root'
356        ), 'certificate chain int issuer common name'
357
358        self.add_tls(cert='int')
359
360        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'
361
362        # intermediate server
363
364        assert 'success' in self.certificate_load(
365            'end-int', 'end'
366        ), 'certificate chain end-int upload'
367
368        chain = self.conf_get('/certificates/end-int/chain')
369        assert len(chain) == 2, 'certificate chain end-int length'
370        assert (
371            chain[0]['subject']['common_name'] == 'end'
372        ), 'certificate chain end-int int subject common name'
373        assert (
374            chain[0]['issuer']['common_name'] == 'int'
375        ), 'certificate chain end-int int issuer common name'
376        assert (
377            chain[1]['subject']['common_name'] == 'int'
378        ), 'certificate chain end-int end subject common name'
379        assert (
380            chain[1]['issuer']['common_name'] == 'root'
381        ), 'certificate chain end-int end issuer common name'
382
383        self.add_tls(cert='end-int')
384
385        assert (
386            self.get_ssl()['status'] == 200
387        ), 'certificate chain intermediate server'
388
389    def test_tls_certificate_chain_long(self, temp_dir):
390        self.load('empty')
391
392        self.generate_ca_conf()
393
394        # Minimum chain length is 3.
395        chain_length = 10
396
397        for i in range(chain_length):
398            if i == 0:
399                self.certificate('root', False)
400            elif i == chain_length - 1:
401                self.req('end')
402            else:
403                self.req('int{}'.format(i))
404
405        for i in range(chain_length - 1):
406            if i == 0:
407                self.ca(cert='root', out='int1')
408            elif i == chain_length - 2:
409                self.ca(cert='int{}'.format(chain_length - 2), out='end')
410            else:
411                self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))
412
413        for i in range(chain_length - 1, 0, -1):
414            path = temp_dir + (
415                '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i)
416            )
417
418            with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert:
419                chain.write(cert.read())
420
421        self.set_certificate_req_context()
422
423        assert 'success' in self.certificate_load(
424            'all', 'end'
425        ), 'certificate chain upload'
426
427        chain = self.conf_get('/certificates/all/chain')
428        assert len(chain) == chain_length - 1, 'certificate chain length'
429
430        self.add_tls(cert='all')
431
432        assert self.get_ssl()['status'] == 200, 'certificate chain long'
433
434    def test_tls_certificate_empty_cn(self, temp_dir):
435        self.certificate('root', False)
436
437        self.req(subject='/')
438
439        self.generate_ca_conf()
440        self.ca()
441
442        self.set_certificate_req_context()
443
444        assert 'success' in self.certificate_load('localhost', 'localhost')
445
446        cert = self.conf_get('/certificates/localhost')
447        assert cert['chain'][0]['subject'] == {}, 'empty subject'
448        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
449
450    def test_tls_certificate_empty_cn_san(self, temp_dir):
451        self.certificate('root', False)
452
453        self.openssl_conf(
454            rewrite=True, alt_names=["example.com", "www.example.net"]
455        )
456
457        self.req(subject='/')
458
459        self.generate_ca_conf()
460        self.ca()
461
462        self.set_certificate_req_context()
463
464        assert 'success' in self.certificate_load('localhost', 'localhost')
465
466        cert = self.conf_get('/certificates/localhost')
467        assert cert['chain'][0]['subject'] == {
468            'alt_names': ['example.com', 'www.example.net']
469        }, 'subject alt_names'
470        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
471
472    def test_tls_certificate_empty_cn_san_ip(self):
473        self.certificate('root', False)
474
475        self.openssl_conf(
476            rewrite=True,
477            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
478        )
479
480        self.req(subject='/')
481
482        self.generate_ca_conf()
483        self.ca()
484
485        self.set_certificate_req_context()
486
487        assert 'success' in self.certificate_load('localhost', 'localhost')
488
489        cert = self.conf_get('/certificates/localhost')
490        assert cert['chain'][0]['subject'] == {
491            'alt_names': ['example.com', 'www.example.net']
492        }, 'subject alt_names'
493        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
494
495    def test_tls_keepalive(self):
496        self.load('mirror')
497
498        assert self.get()['status'] == 200, 'init'
499
500        self.certificate()
501
502        self.add_tls(application='mirror')
503
504        (resp, sock) = self.post_ssl(
505            headers={
506                'Host': 'localhost',
507                'Connection': 'keep-alive',
508            },
509            start=True,
510            body='0123456789',
511            read_timeout=1,
512        )
513
514        assert resp['body'] == '0123456789', 'keepalive 1'
515
516        resp = self.post_ssl(
517            headers={
518                'Host': 'localhost',
519                'Connection': 'close',
520            },
521            sock=sock,
522            body='0123456789',
523        )
524
525        assert resp['body'] == '0123456789', 'keepalive 2'
526
527    def test_tls_no_close_notify(self):
528        self.certificate()
529
530        assert 'success' in self.conf(
531            {
532                "listeners": {
533                    "*:7080": {
534                        "pass": "routes",
535                        "tls": {"certificate": "default"},
536                    }
537                },
538                "routes": [{"action": {"return": 200}}],
539                "applications": {},
540            }
541        ), 'load application configuration'
542
543        (resp, sock) = self.get_ssl(start=True)
544
545        time.sleep(5)
546
547        sock.close()
548
549    @pytest.mark.skip('not yet')
550    def test_tls_keepalive_certificate_remove(self):
551        self.load('empty')
552
553        assert self.get()['status'] == 200, 'init'
554
555        self.certificate()
556
557        self.add_tls()
558
559        (resp, sock) = self.get_ssl(
560            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
561            start=True,
562            read_timeout=1,
563        )
564
565        assert 'success' in self.conf(
566            {"pass": "applications/empty"}, 'listeners/*:7080'
567        )
568        assert 'success' in self.conf_delete('/certificates/default')
569
570        try:
571            resp = self.get_ssl(sock=sock)
572
573        except KeyboardInterrupt:
574            raise
575
576        except:
577            resp = None
578
579        assert resp == None, 'keepalive remove certificate'
580
581    @pytest.mark.skip('not yet')
582    def test_tls_certificates_remove_all(self):
583        self.load('empty')
584
585        self.certificate()
586
587        assert 'success' in self.conf_delete(
588            '/certificates'
589        ), 'remove all certificates'
590
591    def test_tls_application_respawn(self, skip_alert):
592        self.load('mirror')
593
594        self.certificate()
595
596        assert 'success' in self.conf('1', 'applications/mirror/processes')
597
598        self.add_tls(application='mirror')
599
600        (_, sock) = self.post_ssl(
601            headers={
602                'Host': 'localhost',
603                'Connection': 'keep-alive',
604            },
605            start=True,
606            body='0123456789',
607            read_timeout=1,
608        )
609
610        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
611
612        subprocess.check_output(['kill', '-9', app_id])
613
614        skip_alert(r'process %s exited on signal 9' % app_id)
615
616        self.wait_for_record(
617            r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
618        )
619
620        resp = self.post_ssl(sock=sock, body='0123456789')
621
622        assert resp['status'] == 200, 'application respawn status'
623        assert resp['body'] == '0123456789', 'application respawn body'
624
625    def test_tls_url_scheme(self):
626        self.load('variables')
627
628        assert (
629            self.post(
630                headers={
631                    'Host': 'localhost',
632                    'Content-Type': 'text/html',
633                    'Custom-Header': '',
634                    'Connection': 'close',
635                }
636            )['headers']['Wsgi-Url-Scheme']
637            == 'http'
638        ), 'url scheme http'
639
640        self.certificate()
641
642        self.add_tls(application='variables')
643
644        assert (
645            self.post_ssl(
646                headers={
647                    'Host': 'localhost',
648                    'Content-Type': 'text/html',
649                    'Custom-Header': '',
650                    'Connection': 'close',
651                }
652            )['headers']['Wsgi-Url-Scheme']
653            == 'https'
654        ), 'url scheme https'
655
656    def test_tls_big_upload(self):
657        self.load('upload')
658
659        self.certificate()
660
661        self.add_tls(application='upload')
662
663        filename = 'test.txt'
664        data = '0123456789' * 9000
665
666        res = self.post_ssl(
667            body={
668                'file': {
669                    'filename': filename,
670                    'type': 'text/plain',
671                    'data': io.StringIO(data),
672                }
673            }
674        )
675        assert res['status'] == 200, 'status ok'
676        assert res['body'] == filename + data
677
678    def test_tls_multi_listener(self):
679        self.load('empty')
680
681        self.certificate()
682
683        self.add_tls()
684        self.add_tls(port=7081)
685
686        assert self.get_ssl()['status'] == 200, 'listener #1'
687
688        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
689