xref: /unit/test/test_tls.py (revision 2066:242192963d93)
1import io
2import re
3import ssl
4import subprocess
5import time
6
7import pytest
8from unit.applications.tls import TestApplicationTLS
9from unit.option import option
10
11
12class TestTLS(TestApplicationTLS):
13    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
14
15    def openssl_date_to_sec_epoch(self, date):
16        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
17
18    def add_tls(self, application='empty', cert='default', port=7080):
19        assert 'success' in self.conf(
20            {
21                "pass": "applications/" + application,
22                "tls": {"certificate": cert},
23            },
24            'listeners/*:' + str(port),
25        )
26
27    def remove_tls(self, application='empty', port=7080):
28        assert 'success' in self.conf(
29            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
30        )
31
32    def req(self, name='localhost', subject=None, x509=False):
33        subj = subject if subject is not None else '/CN=' + name + '/'
34
35        subprocess.check_output(
36            [
37                'openssl',
38                'req',
39                '-new',
40                '-subj',
41                subj,
42                '-config',
43                option.temp_dir + '/openssl.conf',
44                '-out',
45                option.temp_dir + '/' + name + '.csr',
46                '-keyout',
47                option.temp_dir + '/' + name + '.key',
48            ],
49            stderr=subprocess.STDOUT,
50        )
51
52    def generate_ca_conf(self):
53        with open(option.temp_dir + '/ca.conf', 'w') as f:
54            f.write(
55                """[ ca ]
56default_ca = myca
57
58[ myca ]
59new_certs_dir = %(dir)s
60database = %(database)s
61default_md = sha256
62policy = myca_policy
63serial = %(certserial)s
64default_days = 1
65x509_extensions = myca_extensions
66copy_extensions = copy
67
68[ myca_policy ]
69commonName = optional
70
71[ myca_extensions ]
72basicConstraints = critical,CA:TRUE"""
73                % {
74                    'dir': option.temp_dir,
75                    'database': option.temp_dir + '/certindex',
76                    'certserial': option.temp_dir + '/certserial',
77                }
78            )
79
80        with open(option.temp_dir + '/certserial', 'w') as f:
81            f.write('1000')
82
83        with open(option.temp_dir + '/certindex', 'w') as f:
84            f.write('')
85
86        with open(option.temp_dir + '/certindex.attr', 'w') as f:
87            f.write('')
88
89    def ca(self, cert='root', out='localhost'):
90        subprocess.check_output(
91            [
92                'openssl',
93                'ca',
94                '-batch',
95                '-config',
96                option.temp_dir + '/ca.conf',
97                '-keyfile',
98                option.temp_dir + '/' + cert + '.key',
99                '-cert',
100                option.temp_dir + '/' + cert + '.crt',
101                '-in',
102                option.temp_dir + '/' + out + '.csr',
103                '-out',
104                option.temp_dir + '/' + out + '.crt',
105            ],
106            stderr=subprocess.STDOUT,
107        )
108
109    def set_certificate_req_context(self, cert='root'):
110        self.context = ssl.create_default_context()
111        self.context.check_hostname = False
112        self.context.verify_mode = ssl.CERT_REQUIRED
113        self.context.load_verify_locations(
114            option.temp_dir + '/' + cert + '.crt'
115        )
116
117    def test_tls_listener_option_add(self):
118        self.load('empty')
119
120        self.certificate()
121
122        self.add_tls()
123
124        assert self.get_ssl()['status'] == 200, 'add listener option'
125
126    def test_tls_listener_option_remove(self):
127        self.load('empty')
128
129        self.certificate()
130
131        self.add_tls()
132
133        self.get_ssl()
134
135        self.remove_tls()
136
137        assert self.get()['status'] == 200, 'remove listener option'
138
139    def test_tls_certificate_remove(self):
140        self.load('empty')
141
142        self.certificate()
143
144        assert 'success' in self.conf_delete(
145            '/certificates/default'
146        ), 'remove certificate'
147
148    def test_tls_certificate_remove_used(self):
149        self.load('empty')
150
151        self.certificate()
152
153        self.add_tls()
154
155        assert 'error' in self.conf_delete(
156            '/certificates/default'
157        ), 'remove certificate'
158
159    def test_tls_certificate_remove_nonexisting(self):
160        self.load('empty')
161
162        self.certificate()
163
164        self.add_tls()
165
166        assert 'error' in self.conf_delete(
167            '/certificates/blah'
168        ), 'remove nonexistings certificate'
169
170    @pytest.mark.skip('not yet')
171    def test_tls_certificate_update(self):
172        self.load('empty')
173
174        self.certificate()
175
176        self.add_tls()
177
178        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
179
180        self.certificate()
181
182        assert cert_old != ssl.get_server_certificate(
183            ('127.0.0.1', 7080)
184        ), 'update certificate'
185
186    @pytest.mark.skip('not yet')
187    def test_tls_certificate_key_incorrect(self):
188        self.load('empty')
189
190        self.certificate('first', False)
191        self.certificate('second', False)
192
193        assert 'error' in self.certificate_load(
194            'first', 'second'
195        ), 'key incorrect'
196
197    def test_tls_certificate_change(self):
198        self.load('empty')
199
200        self.certificate()
201        self.certificate('new')
202
203        self.add_tls()
204
205        cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))
206
207        self.add_tls(cert='new')
208
209        assert cert_old != ssl.get_server_certificate(
210            ('127.0.0.1', 7080)
211        ), 'change certificate'
212
213    def test_tls_certificate_key_rsa(self):
214        self.load('empty')
215
216        self.certificate()
217
218        assert (
219            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
220        ), 'certificate key rsa'
221
222    def test_tls_certificate_key_ec(self, temp_dir):
223        self.load('empty')
224
225        self.openssl_conf()
226
227        subprocess.check_output(
228            [
229                'openssl',
230                'ecparam',
231                '-noout',
232                '-genkey',
233                '-out',
234                temp_dir + '/ec.key',
235                '-name',
236                'prime256v1',
237            ],
238            stderr=subprocess.STDOUT,
239        )
240
241        subprocess.check_output(
242            [
243                'openssl',
244                'req',
245                '-x509',
246                '-new',
247                '-subj',
248                '/CN=ec/',
249                '-config',
250                temp_dir + '/openssl.conf',
251                '-key',
252                temp_dir + '/ec.key',
253                '-out',
254                temp_dir + '/ec.crt',
255            ],
256            stderr=subprocess.STDOUT,
257        )
258
259        self.certificate_load('ec')
260
261        assert (
262            self.conf_get('/certificates/ec/key') == 'ECDH'
263        ), 'certificate key ec'
264
265    def test_tls_certificate_chain_options(self):
266        self.load('empty')
267
268        self.certificate()
269
270        chain = self.conf_get('/certificates/default/chain')
271
272        assert len(chain) == 1, 'certificate chain length'
273
274        cert = chain[0]
275
276        assert (
277            cert['subject']['common_name'] == 'default'
278        ), 'certificate subject common name'
279        assert (
280            cert['issuer']['common_name'] == 'default'
281        ), 'certificate issuer common name'
282
283        assert (
284            abs(
285                self.sec_epoch()
286                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
287            )
288            < 60
289        ), 'certificate validity since'
290        assert (
291            self.openssl_date_to_sec_epoch(cert['validity']['until'])
292            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
293            == 2592000
294        ), 'certificate validity until'
295
296    def test_tls_certificate_chain(self, temp_dir):
297        self.load('empty')
298
299        self.certificate('root', False)
300
301        self.req('int')
302        self.req('end')
303
304        self.generate_ca_conf()
305
306        self.ca(cert='root', out='int')
307        self.ca(cert='int', out='end')
308
309        crt_path = temp_dir + '/end-int.crt'
310        end_path = temp_dir + '/end.crt'
311        int_path = temp_dir + '/int.crt'
312
313        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
314            int_path, 'rb'
315        ) as int:
316            crt.write(end.read() + int.read())
317
318        self.set_certificate_req_context()
319
320        # incomplete chain
321
322        assert 'success' in self.certificate_load(
323            'end', 'end'
324        ), 'certificate chain end upload'
325
326        chain = self.conf_get('/certificates/end/chain')
327        assert len(chain) == 1, 'certificate chain end length'
328        assert (
329            chain[0]['subject']['common_name'] == 'end'
330        ), 'certificate chain end subject common name'
331        assert (
332            chain[0]['issuer']['common_name'] == 'int'
333        ), 'certificate chain end issuer common name'
334
335        self.add_tls(cert='end')
336
337        try:
338            resp = self.get_ssl()
339        except ssl.SSLError:
340            resp = None
341
342        assert resp == None, 'certificate chain incomplete chain'
343
344        # intermediate
345
346        assert 'success' in self.certificate_load(
347            'int', 'int'
348        ), 'certificate chain int upload'
349
350        chain = self.conf_get('/certificates/int/chain')
351        assert len(chain) == 1, 'certificate chain int length'
352        assert (
353            chain[0]['subject']['common_name'] == 'int'
354        ), 'certificate chain int subject common name'
355        assert (
356            chain[0]['issuer']['common_name'] == 'root'
357        ), 'certificate chain int issuer common name'
358
359        self.add_tls(cert='int')
360
361        assert (
362            self.get_ssl()['status'] == 200
363        ), 'certificate chain intermediate'
364
365        # intermediate server
366
367        assert 'success' in self.certificate_load(
368            'end-int', 'end'
369        ), 'certificate chain end-int upload'
370
371        chain = self.conf_get('/certificates/end-int/chain')
372        assert len(chain) == 2, 'certificate chain end-int length'
373        assert (
374            chain[0]['subject']['common_name'] == 'end'
375        ), 'certificate chain end-int int subject common name'
376        assert (
377            chain[0]['issuer']['common_name'] == 'int'
378        ), 'certificate chain end-int int issuer common name'
379        assert (
380            chain[1]['subject']['common_name'] == 'int'
381        ), 'certificate chain end-int end subject common name'
382        assert (
383            chain[1]['issuer']['common_name'] == 'root'
384        ), 'certificate chain end-int end issuer common name'
385
386        self.add_tls(cert='end-int')
387
388        assert (
389            self.get_ssl()['status'] == 200
390        ), 'certificate chain intermediate server'
391
392    def test_tls_certificate_empty_cn(self, temp_dir):
393        self.certificate('root', False)
394
395        self.req(subject='/')
396
397        self.generate_ca_conf()
398        self.ca()
399
400        self.set_certificate_req_context()
401
402        assert 'success' in self.certificate_load('localhost', 'localhost')
403
404        cert = self.conf_get('/certificates/localhost')
405        assert cert['chain'][0]['subject'] == {}, 'empty subject'
406        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
407
408    def test_tls_certificate_empty_cn_san(self, temp_dir):
409        self.certificate('root', False)
410
411        self.openssl_conf(
412            rewrite=True, alt_names=["example.com", "www.example.net"]
413        )
414
415        self.req(subject='/')
416
417        self.generate_ca_conf()
418        self.ca()
419
420        self.set_certificate_req_context()
421
422        assert 'success' in self.certificate_load('localhost', 'localhost')
423
424        cert = self.conf_get('/certificates/localhost')
425        assert cert['chain'][0]['subject'] == {
426            'alt_names': ['example.com', 'www.example.net']
427        }, 'subject alt_names'
428        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
429
430    def test_tls_certificate_empty_cn_san_ip(self):
431        self.certificate('root', False)
432
433        self.openssl_conf(
434            rewrite=True,
435            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
436        )
437
438        self.req(subject='/')
439
440        self.generate_ca_conf()
441        self.ca()
442
443        self.set_certificate_req_context()
444
445        assert 'success' in self.certificate_load('localhost', 'localhost')
446
447        cert = self.conf_get('/certificates/localhost')
448        assert cert['chain'][0]['subject'] == {
449            'alt_names': ['example.com', 'www.example.net']
450        }, 'subject alt_names'
451        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
452
453    @pytest.mark.skip('not yet')
454    def test_tls_reconfigure(self):
455        self.load('empty')
456
457        assert self.get()['status'] == 200, 'init'
458
459        self.certificate()
460
461        (resp, sock) = self.get(
462            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
463            start=True,
464            read_timeout=1,
465        )
466
467        assert resp['status'] == 200, 'initial status'
468
469        self.add_tls()
470
471        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
472        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
473
474    def test_tls_keepalive(self):
475        self.load('mirror')
476
477        assert self.get()['status'] == 200, 'init'
478
479        self.certificate()
480
481        self.add_tls(application='mirror')
482
483        (resp, sock) = self.post_ssl(
484            headers={
485                'Host': 'localhost',
486                'Connection': 'keep-alive',
487                'Content-Type': 'text/html',
488            },
489            start=True,
490            body='0123456789',
491            read_timeout=1,
492        )
493
494        assert resp['body'] == '0123456789', 'keepalive 1'
495
496        resp = self.post_ssl(
497            headers={
498                'Host': 'localhost',
499                'Connection': 'close',
500                'Content-Type': 'text/html',
501            },
502            sock=sock,
503            body='0123456789',
504        )
505
506        assert resp['body'] == '0123456789', 'keepalive 2'
507
508    def test_tls_no_close_notify(self):
509        self.certificate()
510
511        assert 'success' in self.conf(
512            {
513                "listeners": {
514                    "*:7080": {
515                        "pass": "routes",
516                        "tls": {"certificate": "default"},
517                    }
518                },
519                "routes": [{"action": {"return": 200}}],
520                "applications": {},
521            }
522        ), 'load application configuration'
523
524        (resp, sock) = self.get_ssl(start=True)
525
526        time.sleep(5)
527
528        sock.close()
529
530    @pytest.mark.skip('not yet')
531    def test_tls_keepalive_certificate_remove(self):
532        self.load('empty')
533
534        assert self.get()['status'] == 200, 'init'
535
536        self.certificate()
537
538        self.add_tls()
539
540        (resp, sock) = self.get_ssl(
541            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
542            start=True,
543            read_timeout=1,
544        )
545
546        assert 'success' in self.conf(
547            {"pass": "applications/empty"}, 'listeners/*:7080'
548        )
549        assert 'success' in self.conf_delete('/certificates/default')
550
551        try:
552            resp = self.get_ssl(
553                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
554            )
555
556        except KeyboardInterrupt:
557            raise
558
559        except:
560            resp = None
561
562        assert resp == None, 'keepalive remove certificate'
563
564    @pytest.mark.skip('not yet')
565    def test_tls_certificates_remove_all(self):
566        self.load('empty')
567
568        self.certificate()
569
570        assert 'success' in self.conf_delete(
571            '/certificates'
572        ), 'remove all certificates'
573
574    def test_tls_application_respawn(self, skip_alert):
575        self.load('mirror')
576
577        self.certificate()
578
579        assert 'success' in self.conf('1', 'applications/mirror/processes')
580
581        self.add_tls(application='mirror')
582
583        (_, sock) = self.post_ssl(
584            headers={
585                'Host': 'localhost',
586                'Connection': 'keep-alive',
587                'Content-Type': 'text/html',
588            },
589            start=True,
590            body='0123456789',
591            read_timeout=1,
592        )
593
594        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
595
596        subprocess.check_output(['kill', '-9', app_id])
597
598        skip_alert(r'process .* %s.* exited on signal 9' % app_id)
599
600        self.wait_for_record(
601            re.compile(
602                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
603            )
604        )
605
606        resp = self.post_ssl(
607            headers={
608                'Host': 'localhost',
609                'Connection': 'close',
610                'Content-Type': 'text/html',
611            },
612            sock=sock,
613            body='0123456789',
614        )
615
616        assert resp['status'] == 200, 'application respawn status'
617        assert resp['body'] == '0123456789', 'application respawn body'
618
619    def test_tls_url_scheme(self):
620        self.load('variables')
621
622        assert (
623            self.post(
624                headers={
625                    'Host': 'localhost',
626                    'Content-Type': 'text/html',
627                    'Custom-Header': '',
628                    'Connection': 'close',
629                }
630            )['headers']['Wsgi-Url-Scheme']
631            == 'http'
632        ), 'url scheme http'
633
634        self.certificate()
635
636        self.add_tls(application='variables')
637
638        assert (
639            self.post_ssl(
640                headers={
641                    'Host': 'localhost',
642                    'Content-Type': 'text/html',
643                    'Custom-Header': '',
644                    'Connection': 'close',
645                }
646            )['headers']['Wsgi-Url-Scheme']
647            == 'https'
648        ), 'url scheme https'
649
650    def test_tls_big_upload(self):
651        self.load('upload')
652
653        self.certificate()
654
655        self.add_tls(application='upload')
656
657        filename = 'test.txt'
658        data = '0123456789' * 9000
659
660        res = self.post_ssl(
661            body={
662                'file': {
663                    'filename': filename,
664                    'type': 'text/plain',
665                    'data': io.StringIO(data),
666                }
667            }
668        )
669        assert res['status'] == 200, 'status ok'
670        assert res['body'] == filename + data
671
672    def test_tls_multi_listener(self):
673        self.load('empty')
674
675        self.certificate()
676
677        self.add_tls()
678        self.add_tls(port=7081)
679
680        assert self.get_ssl()['status'] == 200, 'listener #1'
681
682        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
683