xref: /unit/test/test_tls.py (revision 2144:b14caaedca5e)
1import io
2import re
3import ssl
4import subprocess
5import time
6
7import pytest
8from unit.applications.tls import TestApplicationTLS
9from unit.option import option
10
11
12class TestTLS(TestApplicationTLS):
13    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
14
15    def openssl_date_to_sec_epoch(self, date):
16        return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')
17
18    def add_tls(self, application='empty', cert='default', port=7080):
19        assert 'success' in self.conf(
20            {
21                "pass": "applications/" + application,
22                "tls": {"certificate": cert},
23            },
24            'listeners/*:' + str(port),
25        )
26
27    def remove_tls(self, application='empty', port=7080):
28        assert 'success' in self.conf(
29            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
30        )
31
32    def req(self, name='localhost', subject=None, x509=False):
33        subj = subject if subject is not None else '/CN=' + name + '/'
34
35        subprocess.check_output(
36            [
37                'openssl',
38                'req',
39                '-new',
40                '-subj',
41                subj,
42                '-config',
43                option.temp_dir + '/openssl.conf',
44                '-out',
45                option.temp_dir + '/' + name + '.csr',
46                '-keyout',
47                option.temp_dir + '/' + name + '.key',
48            ],
49            stderr=subprocess.STDOUT,
50        )
51
52    def generate_ca_conf(self):
53        with open(option.temp_dir + '/ca.conf', 'w') as f:
54            f.write(
55                """[ ca ]
56default_ca = myca
57
58[ myca ]
59new_certs_dir = %(dir)s
60database = %(database)s
61default_md = sha256
62policy = myca_policy
63serial = %(certserial)s
64default_days = 1
65x509_extensions = myca_extensions
66copy_extensions = copy
67
68[ myca_policy ]
69commonName = optional
70
71[ myca_extensions ]
72basicConstraints = critical,CA:TRUE"""
73                % {
74                    'dir': option.temp_dir,
75                    'database': option.temp_dir + '/certindex',
76                    'certserial': option.temp_dir + '/certserial',
77                }
78            )
79
80        with open(option.temp_dir + '/certserial', 'w') as f:
81            f.write('1000')
82
83        with open(option.temp_dir + '/certindex', 'w') as f:
84            f.write('')
85
86        with open(option.temp_dir + '/certindex.attr', 'w') as f:
87            f.write('')
88
89    def ca(self, cert='root', out='localhost'):
90        subprocess.check_output(
91            [
92                'openssl',
93                'ca',
94                '-batch',
95                '-config',
96                option.temp_dir + '/ca.conf',
97                '-keyfile',
98                option.temp_dir + '/' + cert + '.key',
99                '-cert',
100                option.temp_dir + '/' + cert + '.crt',
101                '-in',
102                option.temp_dir + '/' + out + '.csr',
103                '-out',
104                option.temp_dir + '/' + out + '.crt',
105            ],
106            stderr=subprocess.STDOUT,
107        )
108
109    def set_certificate_req_context(self, cert='root'):
110        self.context = ssl.create_default_context()
111        self.context.check_hostname = False
112        self.context.verify_mode = ssl.CERT_REQUIRED
113        self.context.load_verify_locations(
114            option.temp_dir + '/' + cert + '.crt'
115        )
116
117    def test_tls_listener_option_add(self):
118        self.load('empty')
119
120        self.certificate()
121
122        self.add_tls()
123
124        assert self.get_ssl()['status'] == 200, 'add listener option'
125
126    def test_tls_listener_option_remove(self):
127        self.load('empty')
128
129        self.certificate()
130
131        self.add_tls()
132
133        self.get_ssl()
134
135        self.remove_tls()
136
137        assert self.get()['status'] == 200, 'remove listener option'
138
139    def test_tls_certificate_remove(self):
140        self.load('empty')
141
142        self.certificate()
143
144        assert 'success' in self.conf_delete(
145            '/certificates/default'
146        ), 'remove certificate'
147
148    def test_tls_certificate_remove_used(self):
149        self.load('empty')
150
151        self.certificate()
152
153        self.add_tls()
154
155        assert 'error' in self.conf_delete(
156            '/certificates/default'
157        ), 'remove certificate'
158
159    def test_tls_certificate_remove_nonexisting(self):
160        self.load('empty')
161
162        self.certificate()
163
164        self.add_tls()
165
166        assert 'error' in self.conf_delete(
167            '/certificates/blah'
168        ), 'remove nonexistings certificate'
169
170    @pytest.mark.skip('not yet')
171    def test_tls_certificate_update(self):
172        self.load('empty')
173
174        self.certificate()
175
176        self.add_tls()
177
178        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
179
180        self.certificate()
181
182        assert cert_old != ssl.get_server_certificate(
183            ('127.0.0.1', 7080)
184        ), 'update certificate'
185
186    @pytest.mark.skip('not yet')
187    def test_tls_certificate_key_incorrect(self):
188        self.load('empty')
189
190        self.certificate('first', False)
191        self.certificate('second', False)
192
193        assert 'error' in self.certificate_load(
194            'first', 'second'
195        ), 'key incorrect'
196
197    def test_tls_certificate_change(self):
198        self.load('empty')
199
200        self.certificate()
201        self.certificate('new')
202
203        self.add_tls()
204
205        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
206
207        self.add_tls(cert='new')
208
209        assert cert_old != ssl.get_server_certificate(
210            ('127.0.0.1', 7080)
211        ), 'change certificate'
212
213    def test_tls_certificate_key_rsa(self):
214        self.load('empty')
215
216        self.certificate()
217
218        assert (
219            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
220        ), 'certificate key rsa'
221
222    def test_tls_certificate_key_ec(self, temp_dir):
223        self.load('empty')
224
225        self.openssl_conf()
226
227        subprocess.check_output(
228            [
229                'openssl',
230                'ecparam',
231                '-noout',
232                '-genkey',
233                '-out',
234                temp_dir + '/ec.key',
235                '-name',
236                'prime256v1',
237            ],
238            stderr=subprocess.STDOUT,
239        )
240
241        subprocess.check_output(
242            [
243                'openssl',
244                'req',
245                '-x509',
246                '-new',
247                '-subj',
248                '/CN=ec/',
249                '-config',
250                temp_dir + '/openssl.conf',
251                '-key',
252                temp_dir + '/ec.key',
253                '-out',
254                temp_dir + '/ec.crt',
255            ],
256            stderr=subprocess.STDOUT,
257        )
258
259        self.certificate_load('ec')
260
261        assert (
262            self.conf_get('/certificates/ec/key') == 'ECDH'
263        ), 'certificate key ec'
264
265    def test_tls_certificate_chain_options(self):
266        self.load('empty')
267
268        self.certificate()
269
270        chain = self.conf_get('/certificates/default/chain')
271
272        assert len(chain) == 1, 'certificate chain length'
273
274        cert = chain[0]
275
276        assert (
277            cert['subject']['common_name'] == 'default'
278        ), 'certificate subject common name'
279        assert (
280            cert['issuer']['common_name'] == 'default'
281        ), 'certificate issuer common name'
282
283        assert (
284            abs(
285                self.sec_epoch()
286                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
287            )
288            < 60
289        ), 'certificate validity since'
290        assert (
291            self.openssl_date_to_sec_epoch(cert['validity']['until'])
292            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
293            == 2592000
294        ), 'certificate validity until'
295
296    def test_tls_certificate_chain(self, temp_dir):
297        self.load('empty')
298
299        self.certificate('root', False)
300
301        self.req('int')
302        self.req('end')
303
304        self.generate_ca_conf()
305
306        self.ca(cert='root', out='int')
307        self.ca(cert='int', out='end')
308
309        crt_path = temp_dir + '/end-int.crt'
310        end_path = temp_dir + '/end.crt'
311        int_path = temp_dir + '/int.crt'
312
313        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
314            int_path, 'rb'
315        ) as int:
316            crt.write(end.read() + int.read())
317
318        self.set_certificate_req_context()
319
320        # incomplete chain
321
322        assert 'success' in self.certificate_load(
323            'end', 'end'
324        ), 'certificate chain end upload'
325
326        chain = self.conf_get('/certificates/end/chain')
327        assert len(chain) == 1, 'certificate chain end length'
328        assert (
329            chain[0]['subject']['common_name'] == 'end'
330        ), 'certificate chain end subject common name'
331        assert (
332            chain[0]['issuer']['common_name'] == 'int'
333        ), 'certificate chain end issuer common name'
334
335        self.add_tls(cert='end')
336
337        try:
338            resp = self.get_ssl()
339        except ssl.SSLError:
340            resp = None
341
342        assert resp == None, 'certificate chain incomplete chain'
343
344        # intermediate
345
346        assert 'success' in self.certificate_load(
347            'int', 'int'
348        ), 'certificate chain int upload'
349
350        chain = self.conf_get('/certificates/int/chain')
351        assert len(chain) == 1, 'certificate chain int length'
352        assert (
353            chain[0]['subject']['common_name'] == 'int'
354        ), 'certificate chain int subject common name'
355        assert (
356            chain[0]['issuer']['common_name'] == 'root'
357        ), 'certificate chain int issuer common name'
358
359        self.add_tls(cert='int')
360
361        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'
362
363        # intermediate server
364
365        assert 'success' in self.certificate_load(
366            'end-int', 'end'
367        ), 'certificate chain end-int upload'
368
369        chain = self.conf_get('/certificates/end-int/chain')
370        assert len(chain) == 2, 'certificate chain end-int length'
371        assert (
372            chain[0]['subject']['common_name'] == 'end'
373        ), 'certificate chain end-int int subject common name'
374        assert (
375            chain[0]['issuer']['common_name'] == 'int'
376        ), 'certificate chain end-int int issuer common name'
377        assert (
378            chain[1]['subject']['common_name'] == 'int'
379        ), 'certificate chain end-int end subject common name'
380        assert (
381            chain[1]['issuer']['common_name'] == 'root'
382        ), 'certificate chain end-int end issuer common name'
383
384        self.add_tls(cert='end-int')
385
386        assert (
387            self.get_ssl()['status'] == 200
388        ), 'certificate chain intermediate server'
389
390    def test_tls_certificate_chain_long(self, temp_dir):
391        self.load('empty')
392
393        self.generate_ca_conf()
394
395        # Minimum chain length is 3.
396        chain_length = 10
397
398        for i in range(chain_length):
399            if i == 0:
400                self.certificate('root', False)
401            elif i == chain_length - 1:
402                self.req('end')
403            else:
404                self.req('int{}'.format(i))
405
406        for i in range(chain_length - 1):
407            if i == 0:
408                self.ca(cert='root', out='int1')
409            elif i == chain_length - 2:
410                self.ca(cert='int{}'.format(chain_length - 2), out='end')
411            else:
412                self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))
413
414        for i in range(chain_length - 1, 0, -1):
415            path = temp_dir + (
416                '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i)
417            )
418
419            with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert:
420                chain.write(cert.read())
421
422        self.set_certificate_req_context()
423
424        assert 'success' in self.certificate_load(
425            'all', 'end'
426        ), 'certificate chain upload'
427
428        chain = self.conf_get('/certificates/all/chain')
429        assert len(chain) == chain_length - 1, 'certificate chain length'
430
431        self.add_tls(cert='all')
432
433        assert self.get_ssl()['status'] == 200, 'certificate chain long'
434
435    def test_tls_certificate_empty_cn(self, temp_dir):
436        self.certificate('root', False)
437
438        self.req(subject='/')
439
440        self.generate_ca_conf()
441        self.ca()
442
443        self.set_certificate_req_context()
444
445        assert 'success' in self.certificate_load('localhost', 'localhost')
446
447        cert = self.conf_get('/certificates/localhost')
448        assert cert['chain'][0]['subject'] == {}, 'empty subject'
449        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
450
451    def test_tls_certificate_empty_cn_san(self, temp_dir):
452        self.certificate('root', False)
453
454        self.openssl_conf(
455            rewrite=True, alt_names=["example.com", "www.example.net"]
456        )
457
458        self.req(subject='/')
459
460        self.generate_ca_conf()
461        self.ca()
462
463        self.set_certificate_req_context()
464
465        assert 'success' in self.certificate_load('localhost', 'localhost')
466
467        cert = self.conf_get('/certificates/localhost')
468        assert cert['chain'][0]['subject'] == {
469            'alt_names': ['example.com', 'www.example.net']
470        }, 'subject alt_names'
471        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
472
473    def test_tls_certificate_empty_cn_san_ip(self):
474        self.certificate('root', False)
475
476        self.openssl_conf(
477            rewrite=True,
478            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
479        )
480
481        self.req(subject='/')
482
483        self.generate_ca_conf()
484        self.ca()
485
486        self.set_certificate_req_context()
487
488        assert 'success' in self.certificate_load('localhost', 'localhost')
489
490        cert = self.conf_get('/certificates/localhost')
491        assert cert['chain'][0]['subject'] == {
492            'alt_names': ['example.com', 'www.example.net']
493        }, 'subject alt_names'
494        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
495
496    def test_tls_keepalive(self):
497        self.load('mirror')
498
499        assert self.get()['status'] == 200, 'init'
500
501        self.certificate()
502
503        self.add_tls(application='mirror')
504
505        (resp, sock) = self.post_ssl(
506            headers={
507                'Host': 'localhost',
508                'Connection': 'keep-alive',
509                'Content-Type': 'text/html',
510            },
511            start=True,
512            body='0123456789',
513            read_timeout=1,
514        )
515
516        assert resp['body'] == '0123456789', 'keepalive 1'
517
518        resp = self.post_ssl(
519            headers={
520                'Host': 'localhost',
521                'Connection': 'close',
522                'Content-Type': 'text/html',
523            },
524            sock=sock,
525            body='0123456789',
526        )
527
528        assert resp['body'] == '0123456789', 'keepalive 2'
529
530    def test_tls_no_close_notify(self):
531        self.certificate()
532
533        assert 'success' in self.conf(
534            {
535                "listeners": {
536                    "*:7080": {
537                        "pass": "routes",
538                        "tls": {"certificate": "default"},
539                    }
540                },
541                "routes": [{"action": {"return": 200}}],
542                "applications": {},
543            }
544        ), 'load application configuration'
545
546        (resp, sock) = self.get_ssl(start=True)
547
548        time.sleep(5)
549
550        sock.close()
551
552    @pytest.mark.skip('not yet')
553    def test_tls_keepalive_certificate_remove(self):
554        self.load('empty')
555
556        assert self.get()['status'] == 200, 'init'
557
558        self.certificate()
559
560        self.add_tls()
561
562        (resp, sock) = self.get_ssl(
563            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
564            start=True,
565            read_timeout=1,
566        )
567
568        assert 'success' in self.conf(
569            {"pass": "applications/empty"}, 'listeners/*:7080'
570        )
571        assert 'success' in self.conf_delete('/certificates/default')
572
573        try:
574            resp = self.get_ssl(
575                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
576            )
577
578        except KeyboardInterrupt:
579            raise
580
581        except:
582            resp = None
583
584        assert resp == None, 'keepalive remove certificate'
585
586    @pytest.mark.skip('not yet')
587    def test_tls_certificates_remove_all(self):
588        self.load('empty')
589
590        self.certificate()
591
592        assert 'success' in self.conf_delete(
593            '/certificates'
594        ), 'remove all certificates'
595
596    def test_tls_application_respawn(self, skip_alert):
597        self.load('mirror')
598
599        self.certificate()
600
601        assert 'success' in self.conf('1', 'applications/mirror/processes')
602
603        self.add_tls(application='mirror')
604
605        (_, sock) = self.post_ssl(
606            headers={
607                'Host': 'localhost',
608                'Connection': 'keep-alive',
609                'Content-Type': 'text/html',
610            },
611            start=True,
612            body='0123456789',
613            read_timeout=1,
614        )
615
616        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
617
618        subprocess.check_output(['kill', '-9', app_id])
619
620        skip_alert(r'process .* %s.* exited on signal 9' % app_id)
621
622        self.wait_for_record(
623            re.compile(
624                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
625            )
626        )
627
628        resp = self.post_ssl(
629            headers={
630                'Host': 'localhost',
631                'Connection': 'close',
632                'Content-Type': 'text/html',
633            },
634            sock=sock,
635            body='0123456789',
636        )
637
638        assert resp['status'] == 200, 'application respawn status'
639        assert resp['body'] == '0123456789', 'application respawn body'
640
641    def test_tls_url_scheme(self):
642        self.load('variables')
643
644        assert (
645            self.post(
646                headers={
647                    'Host': 'localhost',
648                    'Content-Type': 'text/html',
649                    'Custom-Header': '',
650                    'Connection': 'close',
651                }
652            )['headers']['Wsgi-Url-Scheme']
653            == 'http'
654        ), 'url scheme http'
655
656        self.certificate()
657
658        self.add_tls(application='variables')
659
660        assert (
661            self.post_ssl(
662                headers={
663                    'Host': 'localhost',
664                    'Content-Type': 'text/html',
665                    'Custom-Header': '',
666                    'Connection': 'close',
667                }
668            )['headers']['Wsgi-Url-Scheme']
669            == 'https'
670        ), 'url scheme https'
671
672    def test_tls_big_upload(self):
673        self.load('upload')
674
675        self.certificate()
676
677        self.add_tls(application='upload')
678
679        filename = 'test.txt'
680        data = '0123456789' * 9000
681
682        res = self.post_ssl(
683            body={
684                'file': {
685                    'filename': filename,
686                    'type': 'text/plain',
687                    'data': io.StringIO(data),
688                }
689            }
690        )
691        assert res['status'] == 200, 'status ok'
692        assert res['body'] == filename + data
693
694    def test_tls_multi_listener(self):
695        self.load('empty')
696
697        self.certificate()
698
699        self.add_tls()
700        self.add_tls(port=7081)
701
702        assert self.get_ssl()['status'] == 200, 'listener #1'
703
704        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
705