xref: /unit/test/test_tls.py (revision 2073:bc6ad31ce286)
1import io
2import re
3import ssl
4import subprocess
5import time
6
7import pytest
8from unit.applications.tls import TestApplicationTLS
9from unit.option import option
10
11
12class TestTLS(TestApplicationTLS):
13    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
14
15    def openssl_date_to_sec_epoch(self, date):
16        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
17
18    def add_tls(self, application='empty', cert='default', port=7080):
19        assert 'success' in self.conf(
20            {
21                "pass": "applications/" + application,
22                "tls": {"certificate": cert},
23            },
24            'listeners/*:' + str(port),
25        )
26
27    def remove_tls(self, application='empty', port=7080):
28        assert 'success' in self.conf(
29            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
30        )
31
32    def req(self, name='localhost', subject=None, x509=False):
33        subj = subject if subject is not None else '/CN=' + name + '/'
34
35        subprocess.check_output(
36            [
37                'openssl',
38                'req',
39                '-new',
40                '-subj',
41                subj,
42                '-config',
43                option.temp_dir + '/openssl.conf',
44                '-out',
45                option.temp_dir + '/' + name + '.csr',
46                '-keyout',
47                option.temp_dir + '/' + name + '.key',
48            ],
49            stderr=subprocess.STDOUT,
50        )
51
52    def generate_ca_conf(self):
53        with open(option.temp_dir + '/ca.conf', 'w') as f:
54            f.write(
55                """[ ca ]
56default_ca = myca
57
58[ myca ]
59new_certs_dir = %(dir)s
60database = %(database)s
61default_md = sha256
62policy = myca_policy
63serial = %(certserial)s
64default_days = 1
65x509_extensions = myca_extensions
66copy_extensions = copy
67
68[ myca_policy ]
69commonName = optional
70
71[ myca_extensions ]
72basicConstraints = critical,CA:TRUE"""
73                % {
74                    'dir': option.temp_dir,
75                    'database': option.temp_dir + '/certindex',
76                    'certserial': option.temp_dir + '/certserial',
77                }
78            )
79
80        with open(option.temp_dir + '/certserial', 'w') as f:
81            f.write('1000')
82
83        with open(option.temp_dir + '/certindex', 'w') as f:
84            f.write('')
85
86        with open(option.temp_dir + '/certindex.attr', 'w') as f:
87            f.write('')
88
89    def ca(self, cert='root', out='localhost'):
90        subprocess.check_output(
91            [
92                'openssl',
93                'ca',
94                '-batch',
95                '-config',
96                option.temp_dir + '/ca.conf',
97                '-keyfile',
98                option.temp_dir + '/' + cert + '.key',
99                '-cert',
100                option.temp_dir + '/' + cert + '.crt',
101                '-in',
102                option.temp_dir + '/' + out + '.csr',
103                '-out',
104                option.temp_dir + '/' + out + '.crt',
105            ],
106            stderr=subprocess.STDOUT,
107        )
108
109    def set_certificate_req_context(self, cert='root'):
110        self.context = ssl.create_default_context()
111        self.context.check_hostname = False
112        self.context.verify_mode = ssl.CERT_REQUIRED
113        self.context.load_verify_locations(
114            option.temp_dir + '/' + cert + '.crt'
115        )
116
117    def test_tls_listener_option_add(self):
118        self.load('empty')
119
120        self.certificate()
121
122        self.add_tls()
123
124        assert self.get_ssl()['status'] == 200, 'add listener option'
125
126    def test_tls_listener_option_remove(self):
127        self.load('empty')
128
129        self.certificate()
130
131        self.add_tls()
132
133        self.get_ssl()
134
135        self.remove_tls()
136
137        assert self.get()['status'] == 200, 'remove listener option'
138
139    def test_tls_certificate_remove(self):
140        self.load('empty')
141
142        self.certificate()
143
144        assert 'success' in self.conf_delete(
145            '/certificates/default'
146        ), 'remove certificate'
147
148    def test_tls_certificate_remove_used(self):
149        self.load('empty')
150
151        self.certificate()
152
153        self.add_tls()
154
155        assert 'error' in self.conf_delete(
156            '/certificates/default'
157        ), 'remove certificate'
158
159    def test_tls_certificate_remove_nonexisting(self):
160        self.load('empty')
161
162        self.certificate()
163
164        self.add_tls()
165
166        assert 'error' in self.conf_delete(
167            '/certificates/blah'
168        ), 'remove nonexistings certificate'
169
170    @pytest.mark.skip('not yet')
171    def test_tls_certificate_update(self):
172        self.load('empty')
173
174        self.certificate()
175
176        self.add_tls()
177
178        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
179
180        self.certificate()
181
182        assert cert_old != ssl.get_server_certificate(
183            ('127.0.0.1', 7080)
184        ), 'update certificate'
185
186    @pytest.mark.skip('not yet')
187    def test_tls_certificate_key_incorrect(self):
188        self.load('empty')
189
190        self.certificate('first', False)
191        self.certificate('second', False)
192
193        assert 'error' in self.certificate_load(
194            'first', 'second'
195        ), 'key incorrect'
196
197    def test_tls_certificate_change(self):
198        self.load('empty')
199
200        self.certificate()
201        self.certificate('new')
202
203        self.add_tls()
204
205        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
206
207        self.add_tls(cert='new')
208
209        assert cert_old != ssl.get_server_certificate(
210            ('127.0.0.1', 7080)
211        ), 'change certificate'
212
213    def test_tls_certificate_key_rsa(self):
214        self.load('empty')
215
216        self.certificate()
217
218        assert (
219            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
220        ), 'certificate key rsa'
221
222    def test_tls_certificate_key_ec(self, temp_dir):
223        self.load('empty')
224
225        self.openssl_conf()
226
227        subprocess.check_output(
228            [
229                'openssl',
230                'ecparam',
231                '-noout',
232                '-genkey',
233                '-out',
234                temp_dir + '/ec.key',
235                '-name',
236                'prime256v1',
237            ],
238            stderr=subprocess.STDOUT,
239        )
240
241        subprocess.check_output(
242            [
243                'openssl',
244                'req',
245                '-x509',
246                '-new',
247                '-subj',
248                '/CN=ec/',
249                '-config',
250                temp_dir + '/openssl.conf',
251                '-key',
252                temp_dir + '/ec.key',
253                '-out',
254                temp_dir + '/ec.crt',
255            ],
256            stderr=subprocess.STDOUT,
257        )
258
259        self.certificate_load('ec')
260
261        assert (
262            self.conf_get('/certificates/ec/key') == 'ECDH'
263        ), 'certificate key ec'
264
265    def test_tls_certificate_chain_options(self):
266        self.load('empty')
267
268        self.certificate()
269
270        chain = self.conf_get('/certificates/default/chain')
271
272        assert len(chain) == 1, 'certificate chain length'
273
274        cert = chain[0]
275
276        assert (
277            cert['subject']['common_name'] == 'default'
278        ), 'certificate subject common name'
279        assert (
280            cert['issuer']['common_name'] == 'default'
281        ), 'certificate issuer common name'
282
283        assert (
284            abs(
285                self.sec_epoch()
286                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
287            )
288            < 60
289        ), 'certificate validity since'
290        assert (
291            self.openssl_date_to_sec_epoch(cert['validity']['until'])
292            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
293            == 2592000
294        ), 'certificate validity until'
295
296    def test_tls_certificate_chain(self, temp_dir):
297        self.load('empty')
298
299        self.certificate('root', False)
300
301        self.req('int')
302        self.req('end')
303
304        self.generate_ca_conf()
305
306        self.ca(cert='root', out='int')
307        self.ca(cert='int', out='end')
308
309        crt_path = temp_dir + '/end-int.crt'
310        end_path = temp_dir + '/end.crt'
311        int_path = temp_dir + '/int.crt'
312
313        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
314            int_path, 'rb'
315        ) as int:
316            crt.write(end.read() + int.read())
317
318        self.set_certificate_req_context()
319
320        # incomplete chain
321
322        assert 'success' in self.certificate_load(
323            'end', 'end'
324        ), 'certificate chain end upload'
325
326        chain = self.conf_get('/certificates/end/chain')
327        assert len(chain) == 1, 'certificate chain end length'
328        assert (
329            chain[0]['subject']['common_name'] == 'end'
330        ), 'certificate chain end subject common name'
331        assert (
332            chain[0]['issuer']['common_name'] == 'int'
333        ), 'certificate chain end issuer common name'
334
335        self.add_tls(cert='end')
336
337        try:
338            resp = self.get_ssl()
339        except ssl.SSLError:
340            resp = None
341
342        assert resp == None, 'certificate chain incomplete chain'
343
344        # intermediate
345
346        assert 'success' in self.certificate_load(
347            'int', 'int'
348        ), 'certificate chain int upload'
349
350        chain = self.conf_get('/certificates/int/chain')
351        assert len(chain) == 1, 'certificate chain int length'
352        assert (
353            chain[0]['subject']['common_name'] == 'int'
354        ), 'certificate chain int subject common name'
355        assert (
356            chain[0]['issuer']['common_name'] == 'root'
357        ), 'certificate chain int issuer common name'
358
359        self.add_tls(cert='int')
360
361        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'
362
363        # intermediate server
364
365        assert 'success' in self.certificate_load(
366            'end-int', 'end'
367        ), 'certificate chain end-int upload'
368
369        chain = self.conf_get('/certificates/end-int/chain')
370        assert len(chain) == 2, 'certificate chain end-int length'
371        assert (
372            chain[0]['subject']['common_name'] == 'end'
373        ), 'certificate chain end-int int subject common name'
374        assert (
375            chain[0]['issuer']['common_name'] == 'int'
376        ), 'certificate chain end-int int issuer common name'
377        assert (
378            chain[1]['subject']['common_name'] == 'int'
379        ), 'certificate chain end-int end subject common name'
380        assert (
381            chain[1]['issuer']['common_name'] == 'root'
382        ), 'certificate chain end-int end issuer common name'
383
384        self.add_tls(cert='end-int')
385
386        assert (
387            self.get_ssl()['status'] == 200
388        ), 'certificate chain intermediate server'
389
390    def test_tls_certificate_chain_long(self, temp_dir):
391        self.load('empty')
392
393        self.generate_ca_conf()
394
395        # Minimum chain length is 3.
396        chain_length = 10
397
398        for i in range(chain_length):
399            if i == 0:
400                self.certificate('root', False)
401            elif i == chain_length - 1:
402                self.req('end')
403            else:
404                self.req('int{}'.format(i))
405
406        for i in range(chain_length - 1):
407            if i == 0:
408                self.ca(cert='root', out='int1')
409            elif i == chain_length - 2:
410                self.ca(cert='int{}'.format(chain_length - 2), out='end')
411            else:
412                self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))
413
414        for i in range(chain_length - 1, 0, -1):
415            path = temp_dir + (
416                '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i)
417            )
418
419            with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert:
420                chain.write(cert.read())
421
422        self.set_certificate_req_context()
423
424        assert 'success' in self.certificate_load(
425            'all', 'end'
426        ), 'certificate chain upload'
427
428        chain = self.conf_get('/certificates/all/chain')
429        assert len(chain) == chain_length - 1, 'certificate chain length'
430
431        self.add_tls(cert='all')
432
433        assert self.get_ssl()['status'] == 200, 'certificate chain long'
434
435    def test_tls_certificate_empty_cn(self, temp_dir):
436        self.certificate('root', False)
437
438        self.req(subject='/')
439
440        self.generate_ca_conf()
441        self.ca()
442
443        self.set_certificate_req_context()
444
445        assert 'success' in self.certificate_load('localhost', 'localhost')
446
447        cert = self.conf_get('/certificates/localhost')
448        assert cert['chain'][0]['subject'] == {}, 'empty subject'
449        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
450
451    def test_tls_certificate_empty_cn_san(self, temp_dir):
452        self.certificate('root', False)
453
454        self.openssl_conf(
455            rewrite=True, alt_names=["example.com", "www.example.net"]
456        )
457
458        self.req(subject='/')
459
460        self.generate_ca_conf()
461        self.ca()
462
463        self.set_certificate_req_context()
464
465        assert 'success' in self.certificate_load('localhost', 'localhost')
466
467        cert = self.conf_get('/certificates/localhost')
468        assert cert['chain'][0]['subject'] == {
469            'alt_names': ['example.com', 'www.example.net']
470        }, 'subject alt_names'
471        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
472
473    def test_tls_certificate_empty_cn_san_ip(self):
474        self.certificate('root', False)
475
476        self.openssl_conf(
477            rewrite=True,
478            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
479        )
480
481        self.req(subject='/')
482
483        self.generate_ca_conf()
484        self.ca()
485
486        self.set_certificate_req_context()
487
488        assert 'success' in self.certificate_load('localhost', 'localhost')
489
490        cert = self.conf_get('/certificates/localhost')
491        assert cert['chain'][0]['subject'] == {
492            'alt_names': ['example.com', 'www.example.net']
493        }, 'subject alt_names'
494        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
495
496    @pytest.mark.skip('not yet')
497    def test_tls_reconfigure(self):
498        self.load('empty')
499
500        assert self.get()['status'] == 200, 'init'
501
502        self.certificate()
503
504        (resp, sock) = self.get(
505            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
506            start=True,
507            read_timeout=1,
508        )
509
510        assert resp['status'] == 200, 'initial status'
511
512        self.add_tls()
513
514        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
515        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
516
517    def test_tls_keepalive(self):
518        self.load('mirror')
519
520        assert self.get()['status'] == 200, 'init'
521
522        self.certificate()
523
524        self.add_tls(application='mirror')
525
526        (resp, sock) = self.post_ssl(
527            headers={
528                'Host': 'localhost',
529                'Connection': 'keep-alive',
530                'Content-Type': 'text/html',
531            },
532            start=True,
533            body='0123456789',
534            read_timeout=1,
535        )
536
537        assert resp['body'] == '0123456789', 'keepalive 1'
538
539        resp = self.post_ssl(
540            headers={
541                'Host': 'localhost',
542                'Connection': 'close',
543                'Content-Type': 'text/html',
544            },
545            sock=sock,
546            body='0123456789',
547        )
548
549        assert resp['body'] == '0123456789', 'keepalive 2'
550
551    def test_tls_no_close_notify(self):
552        self.certificate()
553
554        assert 'success' in self.conf(
555            {
556                "listeners": {
557                    "*:7080": {
558                        "pass": "routes",
559                        "tls": {"certificate": "default"},
560                    }
561                },
562                "routes": [{"action": {"return": 200}}],
563                "applications": {},
564            }
565        ), 'load application configuration'
566
567        (resp, sock) = self.get_ssl(start=True)
568
569        time.sleep(5)
570
571        sock.close()
572
573    @pytest.mark.skip('not yet')
574    def test_tls_keepalive_certificate_remove(self):
575        self.load('empty')
576
577        assert self.get()['status'] == 200, 'init'
578
579        self.certificate()
580
581        self.add_tls()
582
583        (resp, sock) = self.get_ssl(
584            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
585            start=True,
586            read_timeout=1,
587        )
588
589        assert 'success' in self.conf(
590            {"pass": "applications/empty"}, 'listeners/*:7080'
591        )
592        assert 'success' in self.conf_delete('/certificates/default')
593
594        try:
595            resp = self.get_ssl(
596                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
597            )
598
599        except KeyboardInterrupt:
600            raise
601
602        except:
603            resp = None
604
605        assert resp == None, 'keepalive remove certificate'
606
607    @pytest.mark.skip('not yet')
608    def test_tls_certificates_remove_all(self):
609        self.load('empty')
610
611        self.certificate()
612
613        assert 'success' in self.conf_delete(
614            '/certificates'
615        ), 'remove all certificates'
616
617    def test_tls_application_respawn(self, skip_alert):
618        self.load('mirror')
619
620        self.certificate()
621
622        assert 'success' in self.conf('1', 'applications/mirror/processes')
623
624        self.add_tls(application='mirror')
625
626        (_, sock) = self.post_ssl(
627            headers={
628                'Host': 'localhost',
629                'Connection': 'keep-alive',
630                'Content-Type': 'text/html',
631            },
632            start=True,
633            body='0123456789',
634            read_timeout=1,
635        )
636
637        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
638
639        subprocess.check_output(['kill', '-9', app_id])
640
641        skip_alert(r'process .* %s.* exited on signal 9' % app_id)
642
643        self.wait_for_record(
644            re.compile(
645                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
646            )
647        )
648
649        resp = self.post_ssl(
650            headers={
651                'Host': 'localhost',
652                'Connection': 'close',
653                'Content-Type': 'text/html',
654            },
655            sock=sock,
656            body='0123456789',
657        )
658
659        assert resp['status'] == 200, 'application respawn status'
660        assert resp['body'] == '0123456789', 'application respawn body'
661
662    def test_tls_url_scheme(self):
663        self.load('variables')
664
665        assert (
666            self.post(
667                headers={
668                    'Host': 'localhost',
669                    'Content-Type': 'text/html',
670                    'Custom-Header': '',
671                    'Connection': 'close',
672                }
673            )['headers']['Wsgi-Url-Scheme']
674            == 'http'
675        ), 'url scheme http'
676
677        self.certificate()
678
679        self.add_tls(application='variables')
680
681        assert (
682            self.post_ssl(
683                headers={
684                    'Host': 'localhost',
685                    'Content-Type': 'text/html',
686                    'Custom-Header': '',
687                    'Connection': 'close',
688                }
689            )['headers']['Wsgi-Url-Scheme']
690            == 'https'
691        ), 'url scheme https'
692
693    def test_tls_big_upload(self):
694        self.load('upload')
695
696        self.certificate()
697
698        self.add_tls(application='upload')
699
700        filename = 'test.txt'
701        data = '0123456789' * 9000
702
703        res = self.post_ssl(
704            body={
705                'file': {
706                    'filename': filename,
707                    'type': 'text/plain',
708                    'data': io.StringIO(data),
709                }
710            }
711        )
712        assert res['status'] == 200, 'status ok'
713        assert res['body'] == filename + data
714
715    def test_tls_multi_listener(self):
716        self.load('empty')
717
718        self.certificate()
719
720        self.add_tls()
721        self.add_tls(port=7081)
722
723        assert self.get_ssl()['status'] == 200, 'listener #1'
724
725        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
726