xref: /unit/test/test_tls.py (revision 1654:fc7d0578e124)
1import io
2import re
3import ssl
4import subprocess
5
6import pytest
7
8from conftest import option
9from conftest import skip_alert
10from unit.applications.tls import TestApplicationTLS
11
12
13class TestTLS(TestApplicationTLS):
14    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
15
16    def findall(self, pattern):
17        with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
18            return re.findall(pattern, f.read())
19
20    def openssl_date_to_sec_epoch(self, date):
21        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
22
23    def add_tls(self, application='empty', cert='default', port=7080):
24        self.conf(
25            {
26                "pass": "applications/" + application,
27                "tls": {"certificate": cert}
28            },
29            'listeners/*:' + str(port),
30        )
31
32    def remove_tls(self, application='empty', port=7080):
33        self.conf(
34            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
35        )
36
37    def test_tls_listener_option_add(self):
38        self.load('empty')
39
40        self.certificate()
41
42        self.add_tls()
43
44        assert self.get_ssl()['status'] == 200, 'add listener option'
45
46    def test_tls_listener_option_remove(self):
47        self.load('empty')
48
49        self.certificate()
50
51        self.add_tls()
52
53        self.get_ssl()
54
55        self.remove_tls()
56
57        assert self.get()['status'] == 200, 'remove listener option'
58
59    def test_tls_certificate_remove(self):
60        self.load('empty')
61
62        self.certificate()
63
64        assert 'success' in self.conf_delete(
65            '/certificates/default'
66        ), 'remove certificate'
67
68    def test_tls_certificate_remove_used(self):
69        self.load('empty')
70
71        self.certificate()
72
73        self.add_tls()
74
75        assert 'error' in self.conf_delete(
76            '/certificates/default'
77        ), 'remove certificate'
78
79    def test_tls_certificate_remove_nonexisting(self):
80        self.load('empty')
81
82        self.certificate()
83
84        self.add_tls()
85
86        assert 'error' in self.conf_delete(
87            '/certificates/blah'
88        ), 'remove nonexistings certificate'
89
90    @pytest.mark.skip('not yet')
91    def test_tls_certificate_update(self):
92        self.load('empty')
93
94        self.certificate()
95
96        self.add_tls()
97
98        cert_old = self.get_server_certificate()
99
100        self.certificate()
101
102        assert cert_old != self.get_server_certificate(), 'update certificate'
103
104    @pytest.mark.skip('not yet')
105    def test_tls_certificate_key_incorrect(self):
106        self.load('empty')
107
108        self.certificate('first', False)
109        self.certificate('second', False)
110
111        assert 'error' in self.certificate_load(
112            'first', 'second'
113        ), 'key incorrect'
114
115    def test_tls_certificate_change(self):
116        self.load('empty')
117
118        self.certificate()
119        self.certificate('new')
120
121        self.add_tls()
122
123        cert_old = self.get_server_certificate()
124
125        self.add_tls(cert='new')
126
127        assert cert_old != self.get_server_certificate(), 'change certificate'
128
129    def test_tls_certificate_key_rsa(self):
130        self.load('empty')
131
132        self.certificate()
133
134        assert (
135            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
136        ), 'certificate key rsa'
137
138    def test_tls_certificate_key_ec(self, temp_dir):
139        self.load('empty')
140
141        self.openssl_conf()
142
143        subprocess.call(
144            [
145                'openssl',
146                'ecparam',
147                '-noout',
148                '-genkey',
149                '-out',
150                temp_dir + '/ec.key',
151                '-name',
152                'prime256v1',
153            ],
154            stderr=subprocess.STDOUT,
155        )
156
157        subprocess.call(
158            [
159                'openssl',
160                'req',
161                '-x509',
162                '-new',
163                '-subj',
164                '/CN=ec/',
165                '-config',
166                temp_dir + '/openssl.conf',
167                '-key',
168                temp_dir + '/ec.key',
169                '-out',
170                temp_dir + '/ec.crt',
171            ],
172            stderr=subprocess.STDOUT,
173        )
174
175        self.certificate_load('ec')
176
177        assert (
178            self.conf_get('/certificates/ec/key') == 'ECDH'
179        ), 'certificate key ec'
180
181    def test_tls_certificate_chain_options(self):
182        self.load('empty')
183
184        self.certificate()
185
186        chain = self.conf_get('/certificates/default/chain')
187
188        assert len(chain) == 1, 'certificate chain length'
189
190        cert = chain[0]
191
192        assert (
193            cert['subject']['common_name'] == 'default'
194        ), 'certificate subject common name'
195        assert (
196            cert['issuer']['common_name'] == 'default'
197        ), 'certificate issuer common name'
198
199        assert (
200            abs(
201                self.sec_epoch()
202                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
203            )
204            < 5
205        ), 'certificate validity since'
206        assert (
207            self.openssl_date_to_sec_epoch(cert['validity']['until'])
208            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
209            == 2592000
210        ), 'certificate validity until'
211
212    def test_tls_certificate_chain(self, temp_dir):
213        self.load('empty')
214
215        self.certificate('root', False)
216
217        subprocess.call(
218            [
219                'openssl',
220                'req',
221                '-new',
222                '-subj',
223                '/CN=int/',
224                '-config',
225                temp_dir + '/openssl.conf',
226                '-out',
227                temp_dir + '/int.csr',
228                '-keyout',
229                temp_dir + '/int.key',
230            ],
231            stderr=subprocess.STDOUT,
232        )
233
234        subprocess.call(
235            [
236                'openssl',
237                'req',
238                '-new',
239                '-subj',
240                '/CN=end/',
241                '-config',
242                temp_dir + '/openssl.conf',
243                '-out',
244                temp_dir + '/end.csr',
245                '-keyout',
246                temp_dir + '/end.key',
247            ],
248            stderr=subprocess.STDOUT,
249        )
250
251        with open(temp_dir + '/ca.conf', 'w') as f:
252            f.write(
253                """[ ca ]
254default_ca = myca
255
256[ myca ]
257new_certs_dir = %(dir)s
258database = %(database)s
259default_md = sha256
260policy = myca_policy
261serial = %(certserial)s
262default_days = 1
263x509_extensions = myca_extensions
264
265[ myca_policy ]
266commonName = supplied
267
268[ myca_extensions ]
269basicConstraints = critical,CA:TRUE"""
270                % {
271                    'dir': temp_dir,
272                    'database': temp_dir + '/certindex',
273                    'certserial': temp_dir + '/certserial',
274                }
275            )
276
277        with open(temp_dir + '/certserial', 'w') as f:
278            f.write('1000')
279
280        with open(temp_dir + '/certindex', 'w') as f:
281            f.write('')
282
283        subprocess.call(
284            [
285                'openssl',
286                'ca',
287                '-batch',
288                '-subj',
289                '/CN=int/',
290                '-config',
291                temp_dir + '/ca.conf',
292                '-keyfile',
293                temp_dir + '/root.key',
294                '-cert',
295                temp_dir + '/root.crt',
296                '-in',
297                temp_dir + '/int.csr',
298                '-out',
299                temp_dir + '/int.crt',
300            ],
301            stderr=subprocess.STDOUT,
302        )
303
304        subprocess.call(
305            [
306                'openssl',
307                'ca',
308                '-batch',
309                '-subj',
310                '/CN=end/',
311                '-config',
312                temp_dir + '/ca.conf',
313                '-keyfile',
314                temp_dir + '/int.key',
315                '-cert',
316                temp_dir + '/int.crt',
317                '-in',
318                temp_dir + '/end.csr',
319                '-out',
320                temp_dir + '/end.crt',
321            ],
322            stderr=subprocess.STDOUT,
323        )
324
325        crt_path = temp_dir + '/end-int.crt'
326        end_path = temp_dir + '/end.crt'
327        int_path = temp_dir + '/int.crt'
328
329        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
330            int_path, 'rb'
331        ) as int:
332            crt.write(end.read() + int.read())
333
334        self.context = ssl.create_default_context()
335        self.context.check_hostname = False
336        self.context.verify_mode = ssl.CERT_REQUIRED
337        self.context.load_verify_locations(temp_dir + '/root.crt')
338
339        # incomplete chain
340
341        assert 'success' in self.certificate_load(
342            'end', 'end'
343        ), 'certificate chain end upload'
344
345        chain = self.conf_get('/certificates/end/chain')
346        assert len(chain) == 1, 'certificate chain end length'
347        assert (
348            chain[0]['subject']['common_name'] == 'end'
349        ), 'certificate chain end subject common name'
350        assert (
351            chain[0]['issuer']['common_name'] == 'int'
352        ), 'certificate chain end issuer common name'
353
354        self.add_tls(cert='end')
355
356        try:
357            resp = self.get_ssl()
358        except ssl.SSLError:
359            resp = None
360
361        assert resp == None, 'certificate chain incomplete chain'
362
363        # intermediate
364
365        assert 'success' in self.certificate_load(
366            'int', 'int'
367        ), 'certificate chain int upload'
368
369        chain = self.conf_get('/certificates/int/chain')
370        assert len(chain) == 1, 'certificate chain int length'
371        assert (
372            chain[0]['subject']['common_name'] == 'int'
373        ), 'certificate chain int subject common name'
374        assert (
375            chain[0]['issuer']['common_name'] == 'root'
376        ), 'certificate chain int issuer common name'
377
378        self.add_tls(cert='int')
379
380        assert (
381            self.get_ssl()['status'] == 200
382        ), 'certificate chain intermediate'
383
384        # intermediate server
385
386        assert 'success' in self.certificate_load(
387            'end-int', 'end'
388        ), 'certificate chain end-int upload'
389
390        chain = self.conf_get('/certificates/end-int/chain')
391        assert len(chain) == 2, 'certificate chain end-int length'
392        assert (
393            chain[0]['subject']['common_name'] == 'end'
394        ), 'certificate chain end-int int subject common name'
395        assert (
396            chain[0]['issuer']['common_name'] == 'int'
397        ), 'certificate chain end-int int issuer common name'
398        assert (
399            chain[1]['subject']['common_name'] == 'int'
400        ), 'certificate chain end-int end subject common name'
401        assert (
402            chain[1]['issuer']['common_name'] == 'root'
403        ), 'certificate chain end-int end issuer common name'
404
405        self.add_tls(cert='end-int')
406
407        assert (
408            self.get_ssl()['status'] == 200
409        ), 'certificate chain intermediate server'
410
411    @pytest.mark.skip('not yet')
412    def test_tls_reconfigure(self):
413        self.load('empty')
414
415        assert self.get()['status'] == 200, 'init'
416
417        self.certificate()
418
419        (resp, sock) = self.get(
420            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
421            start=True,
422            read_timeout=1,
423        )
424
425        assert resp['status'] == 200, 'initial status'
426
427        self.add_tls()
428
429        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
430        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
431
432    def test_tls_keepalive(self):
433        self.load('mirror')
434
435        assert self.get()['status'] == 200, 'init'
436
437        self.certificate()
438
439        self.add_tls(application='mirror')
440
441        (resp, sock) = self.post_ssl(
442            headers={
443                'Host': 'localhost',
444                'Connection': 'keep-alive',
445                'Content-Type': 'text/html',
446            },
447            start=True,
448            body='0123456789',
449            read_timeout=1,
450        )
451
452        assert resp['body'] == '0123456789', 'keepalive 1'
453
454        resp = self.post_ssl(
455            headers={
456                'Host': 'localhost',
457                'Connection': 'close',
458                'Content-Type': 'text/html',
459            },
460            sock=sock,
461            body='0123456789',
462        )
463
464        assert resp['body'] == '0123456789', 'keepalive 2'
465
466    @pytest.mark.skip('not yet')
467    def test_tls_keepalive_certificate_remove(self):
468        self.load('empty')
469
470        assert self.get()['status'] == 200, 'init'
471
472        self.certificate()
473
474        self.add_tls()
475
476        (resp, sock) = self.get_ssl(
477            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
478            start=True,
479            read_timeout=1,
480        )
481
482        self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
483        self.conf_delete('/certificates/default')
484
485        try:
486            resp = self.get_ssl(
487                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
488            )
489        except:
490            resp = None
491
492        assert resp == None, 'keepalive remove certificate'
493
494    @pytest.mark.skip('not yet')
495    def test_tls_certificates_remove_all(self):
496        self.load('empty')
497
498        self.certificate()
499
500        assert 'success' in self.conf_delete(
501            '/certificates'
502        ), 'remove all certificates'
503
504    def test_tls_application_respawn(self):
505        self.load('mirror')
506
507        self.certificate()
508
509        self.conf('1', 'applications/mirror/processes')
510
511        self.add_tls(application='mirror')
512
513        (_, sock) = self.post_ssl(
514            headers={
515                'Host': 'localhost',
516                'Connection': 'keep-alive',
517                'Content-Type': 'text/html',
518            },
519            start=True,
520            body='0123456789',
521            read_timeout=1,
522        )
523
524        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
525
526        subprocess.call(['kill', '-9', app_id])
527
528        skip_alert(r'process %s exited on signal 9' % app_id)
529
530        self.wait_for_record(
531            re.compile(
532                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
533            )
534        )
535
536        resp = self.post_ssl(
537            headers={
538                'Host': 'localhost',
539                'Connection': 'close',
540                'Content-Type': 'text/html',
541            },
542            sock=sock,
543            body='0123456789',
544        )
545
546        assert resp['status'] == 200, 'application respawn status'
547        assert resp['body'] == '0123456789', 'application respawn body'
548
549    def test_tls_url_scheme(self):
550        self.load('variables')
551
552        assert (
553            self.post(
554                headers={
555                    'Host': 'localhost',
556                    'Content-Type': 'text/html',
557                    'Custom-Header': '',
558                    'Connection': 'close',
559                }
560            )['headers']['Wsgi-Url-Scheme']
561            == 'http'
562        ), 'url scheme http'
563
564        self.certificate()
565
566        self.add_tls(application='variables')
567
568        assert (
569            self.post_ssl(
570                headers={
571                    'Host': 'localhost',
572                    'Content-Type': 'text/html',
573                    'Custom-Header': '',
574                    'Connection': 'close',
575                }
576            )['headers']['Wsgi-Url-Scheme']
577            == 'https'
578        ), 'url scheme https'
579
580    def test_tls_big_upload(self):
581        self.load('upload')
582
583        self.certificate()
584
585        self.add_tls(application='upload')
586
587        filename = 'test.txt'
588        data = '0123456789' * 9000
589
590        res = self.post_ssl(
591            body={
592                'file': {
593                    'filename': filename,
594                    'type': 'text/plain',
595                    'data': io.StringIO(data),
596                }
597            }
598        )
599        assert res['status'] == 200, 'status ok'
600        assert res['body'] == filename + data
601