xref: /unit/test/test_tls.py (revision 1999:00d43b03d82f)
1import io
2import re
3import ssl
4import subprocess
5import time
6
7import pytest
8from unit.applications.tls import TestApplicationTLS
9from unit.option import option
10
11
12class TestTLS(TestApplicationTLS):
13    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
14
15    def openssl_date_to_sec_epoch(self, date):
16        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
17
18    def add_tls(self, application='empty', cert='default', port=7080):
19        assert 'success' in self.conf(
20            {
21                "pass": "applications/" + application,
22                "tls": {"certificate": cert},
23            },
24            'listeners/*:' + str(port),
25        )
26
27    def remove_tls(self, application='empty', port=7080):
28        assert 'success' in self.conf(
29            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
30        )
31
32    def req(self, name='localhost', subject=None, x509=False):
33        subj = subject if subject is not None else '/CN=' + name + '/'
34
35        subprocess.call(
36            [
37                'openssl',
38                'req',
39                '-new',
40                '-subj',
41                subj,
42                '-config',
43                option.temp_dir + '/openssl.conf',
44                '-out',
45                option.temp_dir + '/' + name + '.csr',
46                '-keyout',
47                option.temp_dir + '/' + name + '.key',
48            ],
49            stderr=subprocess.STDOUT,
50        )
51
52    def generate_ca_conf(self):
53        with open(option.temp_dir + '/ca.conf', 'w') as f:
54            f.write(
55                """[ ca ]
56default_ca = myca
57
58[ myca ]
59new_certs_dir = %(dir)s
60database = %(database)s
61default_md = sha256
62policy = myca_policy
63serial = %(certserial)s
64default_days = 1
65x509_extensions = myca_extensions
66copy_extensions = copy
67
68[ myca_policy ]
69commonName = optional
70
71[ myca_extensions ]
72basicConstraints = critical,CA:TRUE"""
73                % {
74                    'dir': option.temp_dir,
75                    'database': option.temp_dir + '/certindex',
76                    'certserial': option.temp_dir + '/certserial',
77                }
78            )
79
80        with open(option.temp_dir + '/certserial', 'w') as f:
81            f.write('1000')
82
83        with open(option.temp_dir + '/certindex', 'w') as f:
84            f.write('')
85
86        with open(option.temp_dir + '/certindex.attr', 'w') as f:
87            f.write('')
88
89    def ca(self, cert='root', out='localhost'):
90        subprocess.call(
91            [
92                'openssl',
93                'ca',
94                '-batch',
95                '-config',
96                option.temp_dir + '/ca.conf',
97                '-keyfile',
98                option.temp_dir + '/' + cert + '.key',
99                '-cert',
100                option.temp_dir + '/' + cert + '.crt',
101                '-in',
102                option.temp_dir + '/' + out + '.csr',
103                '-out',
104                option.temp_dir + '/' + out + '.crt',
105            ],
106            stderr=subprocess.STDOUT,
107        )
108
109    def set_certificate_req_context(self, cert='root'):
110        self.context = ssl.create_default_context()
111        self.context.check_hostname = False
112        self.context.verify_mode = ssl.CERT_REQUIRED
113        self.context.load_verify_locations(
114            option.temp_dir + '/' + cert + '.crt'
115        )
116
117    def test_tls_listener_option_add(self):
118        self.load('empty')
119
120        self.certificate()
121
122        self.add_tls()
123
124        assert self.get_ssl()['status'] == 200, 'add listener option'
125
126    def test_tls_listener_option_remove(self):
127        self.load('empty')
128
129        self.certificate()
130
131        self.add_tls()
132
133        self.get_ssl()
134
135        self.remove_tls()
136
137        assert self.get()['status'] == 200, 'remove listener option'
138
139    def test_tls_certificate_remove(self):
140        self.load('empty')
141
142        self.certificate()
143
144        assert 'success' in self.conf_delete(
145            '/certificates/default'
146        ), 'remove certificate'
147
148    def test_tls_certificate_remove_used(self):
149        self.load('empty')
150
151        self.certificate()
152
153        self.add_tls()
154
155        assert 'error' in self.conf_delete(
156            '/certificates/default'
157        ), 'remove certificate'
158
159    def test_tls_certificate_remove_nonexisting(self):
160        self.load('empty')
161
162        self.certificate()
163
164        self.add_tls()
165
166        assert 'error' in self.conf_delete(
167            '/certificates/blah'
168        ), 'remove nonexistings certificate'
169
170    @pytest.mark.skip('not yet')
171    def test_tls_certificate_update(self):
172        self.load('empty')
173
174        self.certificate()
175
176        self.add_tls()
177
178        cert_old = self.get_server_certificate()
179
180        self.certificate()
181
182        assert cert_old != self.get_server_certificate(), 'update certificate'
183
184    @pytest.mark.skip('not yet')
185    def test_tls_certificate_key_incorrect(self):
186        self.load('empty')
187
188        self.certificate('first', False)
189        self.certificate('second', False)
190
191        assert 'error' in self.certificate_load(
192            'first', 'second'
193        ), 'key incorrect'
194
195    def test_tls_certificate_change(self):
196        self.load('empty')
197
198        self.certificate()
199        self.certificate('new')
200
201        self.add_tls()
202
203        cert_old = self.get_server_certificate()
204
205        self.add_tls(cert='new')
206
207        assert cert_old != self.get_server_certificate(), 'change certificate'
208
209    def test_tls_certificate_key_rsa(self):
210        self.load('empty')
211
212        self.certificate()
213
214        assert (
215            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
216        ), 'certificate key rsa'
217
218    def test_tls_certificate_key_ec(self, temp_dir):
219        self.load('empty')
220
221        self.openssl_conf()
222
223        subprocess.call(
224            [
225                'openssl',
226                'ecparam',
227                '-noout',
228                '-genkey',
229                '-out',
230                temp_dir + '/ec.key',
231                '-name',
232                'prime256v1',
233            ],
234            stderr=subprocess.STDOUT,
235        )
236
237        subprocess.call(
238            [
239                'openssl',
240                'req',
241                '-x509',
242                '-new',
243                '-subj',
244                '/CN=ec/',
245                '-config',
246                temp_dir + '/openssl.conf',
247                '-key',
248                temp_dir + '/ec.key',
249                '-out',
250                temp_dir + '/ec.crt',
251            ],
252            stderr=subprocess.STDOUT,
253        )
254
255        self.certificate_load('ec')
256
257        assert (
258            self.conf_get('/certificates/ec/key') == 'ECDH'
259        ), 'certificate key ec'
260
261    def test_tls_certificate_chain_options(self):
262        self.load('empty')
263
264        self.certificate()
265
266        chain = self.conf_get('/certificates/default/chain')
267
268        assert len(chain) == 1, 'certificate chain length'
269
270        cert = chain[0]
271
272        assert (
273            cert['subject']['common_name'] == 'default'
274        ), 'certificate subject common name'
275        assert (
276            cert['issuer']['common_name'] == 'default'
277        ), 'certificate issuer common name'
278
279        assert (
280            abs(
281                self.sec_epoch()
282                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
283            )
284            < 60
285        ), 'certificate validity since'
286        assert (
287            self.openssl_date_to_sec_epoch(cert['validity']['until'])
288            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
289            == 2592000
290        ), 'certificate validity until'
291
292    def test_tls_certificate_chain(self, temp_dir):
293        self.load('empty')
294
295        self.certificate('root', False)
296
297        self.req('int')
298        self.req('end')
299
300        self.generate_ca_conf()
301
302        self.ca(cert='root', out='int')
303        self.ca(cert='int', out='end')
304
305        crt_path = temp_dir + '/end-int.crt'
306        end_path = temp_dir + '/end.crt'
307        int_path = temp_dir + '/int.crt'
308
309        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
310            int_path, 'rb'
311        ) as int:
312            crt.write(end.read() + int.read())
313
314        self.set_certificate_req_context()
315
316        # incomplete chain
317
318        assert 'success' in self.certificate_load(
319            'end', 'end'
320        ), 'certificate chain end upload'
321
322        chain = self.conf_get('/certificates/end/chain')
323        assert len(chain) == 1, 'certificate chain end length'
324        assert (
325            chain[0]['subject']['common_name'] == 'end'
326        ), 'certificate chain end subject common name'
327        assert (
328            chain[0]['issuer']['common_name'] == 'int'
329        ), 'certificate chain end issuer common name'
330
331        self.add_tls(cert='end')
332
333        try:
334            resp = self.get_ssl()
335        except ssl.SSLError:
336            resp = None
337
338        assert resp == None, 'certificate chain incomplete chain'
339
340        # intermediate
341
342        assert 'success' in self.certificate_load(
343            'int', 'int'
344        ), 'certificate chain int upload'
345
346        chain = self.conf_get('/certificates/int/chain')
347        assert len(chain) == 1, 'certificate chain int length'
348        assert (
349            chain[0]['subject']['common_name'] == 'int'
350        ), 'certificate chain int subject common name'
351        assert (
352            chain[0]['issuer']['common_name'] == 'root'
353        ), 'certificate chain int issuer common name'
354
355        self.add_tls(cert='int')
356
357        assert (
358            self.get_ssl()['status'] == 200
359        ), 'certificate chain intermediate'
360
361        # intermediate server
362
363        assert 'success' in self.certificate_load(
364            'end-int', 'end'
365        ), 'certificate chain end-int upload'
366
367        chain = self.conf_get('/certificates/end-int/chain')
368        assert len(chain) == 2, 'certificate chain end-int length'
369        assert (
370            chain[0]['subject']['common_name'] == 'end'
371        ), 'certificate chain end-int int subject common name'
372        assert (
373            chain[0]['issuer']['common_name'] == 'int'
374        ), 'certificate chain end-int int issuer common name'
375        assert (
376            chain[1]['subject']['common_name'] == 'int'
377        ), 'certificate chain end-int end subject common name'
378        assert (
379            chain[1]['issuer']['common_name'] == 'root'
380        ), 'certificate chain end-int end issuer common name'
381
382        self.add_tls(cert='end-int')
383
384        assert (
385            self.get_ssl()['status'] == 200
386        ), 'certificate chain intermediate server'
387
388    def test_tls_certificate_empty_cn(self, temp_dir):
389        self.certificate('root', False)
390
391        self.req(subject='/')
392
393        self.generate_ca_conf()
394        self.ca()
395
396        self.set_certificate_req_context()
397
398        assert 'success' in self.certificate_load('localhost', 'localhost')
399
400        cert = self.conf_get('/certificates/localhost')
401        assert cert['chain'][0]['subject'] == {}, 'empty subject'
402        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
403
404    def test_tls_certificate_empty_cn_san(self, temp_dir):
405        self.certificate('root', False)
406
407        self.openssl_conf(
408            rewrite=True, alt_names=["example.com", "www.example.net"]
409        )
410
411        self.req(subject='/')
412
413        self.generate_ca_conf()
414        self.ca()
415
416        self.set_certificate_req_context()
417
418        assert 'success' in self.certificate_load('localhost', 'localhost')
419
420        cert = self.conf_get('/certificates/localhost')
421        assert cert['chain'][0]['subject'] == {
422            'alt_names': ['example.com', 'www.example.net']
423        }, 'subject alt_names'
424        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
425
426    def test_tls_certificate_empty_cn_san_ip(self):
427        self.certificate('root', False)
428
429        self.openssl_conf(
430            rewrite=True,
431            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
432        )
433
434        self.req(subject='/')
435
436        self.generate_ca_conf()
437        self.ca()
438
439        self.set_certificate_req_context()
440
441        assert 'success' in self.certificate_load('localhost', 'localhost')
442
443        cert = self.conf_get('/certificates/localhost')
444        assert cert['chain'][0]['subject'] == {
445            'alt_names': ['example.com', 'www.example.net']
446        }, 'subject alt_names'
447        assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
448
449    @pytest.mark.skip('not yet')
450    def test_tls_reconfigure(self):
451        self.load('empty')
452
453        assert self.get()['status'] == 200, 'init'
454
455        self.certificate()
456
457        (resp, sock) = self.get(
458            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
459            start=True,
460            read_timeout=1,
461        )
462
463        assert resp['status'] == 200, 'initial status'
464
465        self.add_tls()
466
467        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
468        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
469
470    def test_tls_keepalive(self):
471        self.load('mirror')
472
473        assert self.get()['status'] == 200, 'init'
474
475        self.certificate()
476
477        self.add_tls(application='mirror')
478
479        (resp, sock) = self.post_ssl(
480            headers={
481                'Host': 'localhost',
482                'Connection': 'keep-alive',
483                'Content-Type': 'text/html',
484            },
485            start=True,
486            body='0123456789',
487            read_timeout=1,
488        )
489
490        assert resp['body'] == '0123456789', 'keepalive 1'
491
492        resp = self.post_ssl(
493            headers={
494                'Host': 'localhost',
495                'Connection': 'close',
496                'Content-Type': 'text/html',
497            },
498            sock=sock,
499            body='0123456789',
500        )
501
502        assert resp['body'] == '0123456789', 'keepalive 2'
503
504    def test_tls_no_close_notify(self):
505        self.certificate()
506
507        assert 'success' in self.conf(
508            {
509                "listeners": {
510                    "*:7080": {
511                        "pass": "routes",
512                        "tls": {"certificate": "default"},
513                    }
514                },
515                "routes": [{"action": {"return": 200}}],
516                "applications": {},
517            }
518        ), 'load application configuration'
519
520        (resp, sock) = self.get_ssl(start=True)
521
522        time.sleep(5)
523
524        sock.close()
525
526    @pytest.mark.skip('not yet')
527    def test_tls_keepalive_certificate_remove(self):
528        self.load('empty')
529
530        assert self.get()['status'] == 200, 'init'
531
532        self.certificate()
533
534        self.add_tls()
535
536        (resp, sock) = self.get_ssl(
537            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
538            start=True,
539            read_timeout=1,
540        )
541
542        assert 'success' in self.conf(
543            {"pass": "applications/empty"}, 'listeners/*:7080'
544        )
545        assert 'success' in self.conf_delete('/certificates/default')
546
547        try:
548            resp = self.get_ssl(
549                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
550            )
551
552        except KeyboardInterrupt:
553            raise
554
555        except:
556            resp = None
557
558        assert resp == None, 'keepalive remove certificate'
559
560    @pytest.mark.skip('not yet')
561    def test_tls_certificates_remove_all(self):
562        self.load('empty')
563
564        self.certificate()
565
566        assert 'success' in self.conf_delete(
567            '/certificates'
568        ), 'remove all certificates'
569
570    def test_tls_application_respawn(self, skip_alert):
571        self.load('mirror')
572
573        self.certificate()
574
575        assert 'success' in self.conf('1', 'applications/mirror/processes')
576
577        self.add_tls(application='mirror')
578
579        (_, sock) = self.post_ssl(
580            headers={
581                'Host': 'localhost',
582                'Connection': 'keep-alive',
583                'Content-Type': 'text/html',
584            },
585            start=True,
586            body='0123456789',
587            read_timeout=1,
588        )
589
590        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
591
592        subprocess.call(['kill', '-9', app_id])
593
594        skip_alert(r'process .* %s.* exited on signal 9' % app_id)
595
596        self.wait_for_record(
597            re.compile(
598                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
599            )
600        )
601
602        resp = self.post_ssl(
603            headers={
604                'Host': 'localhost',
605                'Connection': 'close',
606                'Content-Type': 'text/html',
607            },
608            sock=sock,
609            body='0123456789',
610        )
611
612        assert resp['status'] == 200, 'application respawn status'
613        assert resp['body'] == '0123456789', 'application respawn body'
614
615    def test_tls_url_scheme(self):
616        self.load('variables')
617
618        assert (
619            self.post(
620                headers={
621                    'Host': 'localhost',
622                    'Content-Type': 'text/html',
623                    'Custom-Header': '',
624                    'Connection': 'close',
625                }
626            )['headers']['Wsgi-Url-Scheme']
627            == 'http'
628        ), 'url scheme http'
629
630        self.certificate()
631
632        self.add_tls(application='variables')
633
634        assert (
635            self.post_ssl(
636                headers={
637                    'Host': 'localhost',
638                    'Content-Type': 'text/html',
639                    'Custom-Header': '',
640                    'Connection': 'close',
641                }
642            )['headers']['Wsgi-Url-Scheme']
643            == 'https'
644        ), 'url scheme https'
645
646    def test_tls_big_upload(self):
647        self.load('upload')
648
649        self.certificate()
650
651        self.add_tls(application='upload')
652
653        filename = 'test.txt'
654        data = '0123456789' * 9000
655
656        res = self.post_ssl(
657            body={
658                'file': {
659                    'filename': filename,
660                    'type': 'text/plain',
661                    'data': io.StringIO(data),
662                }
663            }
664        )
665        assert res['status'] == 200, 'status ok'
666        assert res['body'] == filename + data
667
668    def test_tls_multi_listener(self):
669        self.load('empty')
670
671        self.certificate()
672
673        self.add_tls()
674        self.add_tls(port=7081)
675
676        assert self.get_ssl()['status'] == 200, 'listener #1'
677
678        assert self.get_ssl(port=7081)['status'] == 200, 'listener #2'
679