xref: /unit/test/test_tls.py (revision 1848:4bd548074e2c)
1import io
2import re
3import ssl
4import subprocess
5
6import pytest
7
8from unit.applications.tls import TestApplicationTLS
9from unit.option import option
10
11
12class TestTLS(TestApplicationTLS):
13    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
14
15    def findall(self, pattern):
16        with open(option.temp_dir + '/unit.log', 'r', errors='ignore') as f:
17            return re.findall(pattern, f.read())
18
19    def openssl_date_to_sec_epoch(self, date):
20        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
21
22    def add_tls(self, application='empty', cert='default', port=7080):
23        assert 'success' in self.conf(
24            {
25                "pass": "applications/" + application,
26                "tls": {"certificate": cert},
27            },
28            'listeners/*:' + str(port),
29        )
30
31    def remove_tls(self, application='empty', port=7080):
32        assert 'success' in self.conf(
33            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
34        )
35
36    def test_tls_listener_option_add(self):
37        self.load('empty')
38
39        self.certificate()
40
41        self.add_tls()
42
43        assert self.get_ssl()['status'] == 200, 'add listener option'
44
45    def test_tls_listener_option_remove(self):
46        self.load('empty')
47
48        self.certificate()
49
50        self.add_tls()
51
52        self.get_ssl()
53
54        self.remove_tls()
55
56        assert self.get()['status'] == 200, 'remove listener option'
57
58    def test_tls_certificate_remove(self):
59        self.load('empty')
60
61        self.certificate()
62
63        assert 'success' in self.conf_delete(
64            '/certificates/default'
65        ), 'remove certificate'
66
67    def test_tls_certificate_remove_used(self):
68        self.load('empty')
69
70        self.certificate()
71
72        self.add_tls()
73
74        assert 'error' in self.conf_delete(
75            '/certificates/default'
76        ), 'remove certificate'
77
78    def test_tls_certificate_remove_nonexisting(self):
79        self.load('empty')
80
81        self.certificate()
82
83        self.add_tls()
84
85        assert 'error' in self.conf_delete(
86            '/certificates/blah'
87        ), 'remove nonexistings certificate'
88
89    @pytest.mark.skip('not yet')
90    def test_tls_certificate_update(self):
91        self.load('empty')
92
93        self.certificate()
94
95        self.add_tls()
96
97        cert_old = self.get_server_certificate()
98
99        self.certificate()
100
101        assert cert_old != self.get_server_certificate(), 'update certificate'
102
103    @pytest.mark.skip('not yet')
104    def test_tls_certificate_key_incorrect(self):
105        self.load('empty')
106
107        self.certificate('first', False)
108        self.certificate('second', False)
109
110        assert 'error' in self.certificate_load(
111            'first', 'second'
112        ), 'key incorrect'
113
114    def test_tls_certificate_change(self):
115        self.load('empty')
116
117        self.certificate()
118        self.certificate('new')
119
120        self.add_tls()
121
122        cert_old = self.get_server_certificate()
123
124        self.add_tls(cert='new')
125
126        assert cert_old != self.get_server_certificate(), 'change certificate'
127
128    def test_tls_certificate_key_rsa(self):
129        self.load('empty')
130
131        self.certificate()
132
133        assert (
134            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
135        ), 'certificate key rsa'
136
137    def test_tls_certificate_key_ec(self, temp_dir):
138        self.load('empty')
139
140        self.openssl_conf()
141
142        subprocess.call(
143            [
144                'openssl',
145                'ecparam',
146                '-noout',
147                '-genkey',
148                '-out',
149                temp_dir + '/ec.key',
150                '-name',
151                'prime256v1',
152            ],
153            stderr=subprocess.STDOUT,
154        )
155
156        subprocess.call(
157            [
158                'openssl',
159                'req',
160                '-x509',
161                '-new',
162                '-subj',
163                '/CN=ec/',
164                '-config',
165                temp_dir + '/openssl.conf',
166                '-key',
167                temp_dir + '/ec.key',
168                '-out',
169                temp_dir + '/ec.crt',
170            ],
171            stderr=subprocess.STDOUT,
172        )
173
174        self.certificate_load('ec')
175
176        assert (
177            self.conf_get('/certificates/ec/key') == 'ECDH'
178        ), 'certificate key ec'
179
180    def test_tls_certificate_chain_options(self):
181        self.load('empty')
182
183        self.certificate()
184
185        chain = self.conf_get('/certificates/default/chain')
186
187        assert len(chain) == 1, 'certificate chain length'
188
189        cert = chain[0]
190
191        assert (
192            cert['subject']['common_name'] == 'default'
193        ), 'certificate subject common name'
194        assert (
195            cert['issuer']['common_name'] == 'default'
196        ), 'certificate issuer common name'
197
198        assert (
199            abs(
200                self.sec_epoch()
201                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
202            )
203            < 60
204        ), 'certificate validity since'
205        assert (
206            self.openssl_date_to_sec_epoch(cert['validity']['until'])
207            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
208            == 2592000
209        ), 'certificate validity until'
210
211    def test_tls_certificate_chain(self, temp_dir):
212        self.load('empty')
213
214        self.certificate('root', False)
215
216        subprocess.call(
217            [
218                'openssl',
219                'req',
220                '-new',
221                '-subj',
222                '/CN=int/',
223                '-config',
224                temp_dir + '/openssl.conf',
225                '-out',
226                temp_dir + '/int.csr',
227                '-keyout',
228                temp_dir + '/int.key',
229            ],
230            stderr=subprocess.STDOUT,
231        )
232
233        subprocess.call(
234            [
235                'openssl',
236                'req',
237                '-new',
238                '-subj',
239                '/CN=end/',
240                '-config',
241                temp_dir + '/openssl.conf',
242                '-out',
243                temp_dir + '/end.csr',
244                '-keyout',
245                temp_dir + '/end.key',
246            ],
247            stderr=subprocess.STDOUT,
248        )
249
250        with open(temp_dir + '/ca.conf', 'w') as f:
251            f.write(
252                """[ ca ]
253default_ca = myca
254
255[ myca ]
256new_certs_dir = %(dir)s
257database = %(database)s
258default_md = sha256
259policy = myca_policy
260serial = %(certserial)s
261default_days = 1
262x509_extensions = myca_extensions
263
264[ myca_policy ]
265commonName = supplied
266
267[ myca_extensions ]
268basicConstraints = critical,CA:TRUE"""
269                % {
270                    'dir': temp_dir,
271                    'database': temp_dir + '/certindex',
272                    'certserial': temp_dir + '/certserial',
273                }
274            )
275
276        with open(temp_dir + '/certserial', 'w') as f:
277            f.write('1000')
278
279        with open(temp_dir + '/certindex', 'w') as f:
280            f.write('')
281
282        subprocess.call(
283            [
284                'openssl',
285                'ca',
286                '-batch',
287                '-subj',
288                '/CN=int/',
289                '-config',
290                temp_dir + '/ca.conf',
291                '-keyfile',
292                temp_dir + '/root.key',
293                '-cert',
294                temp_dir + '/root.crt',
295                '-in',
296                temp_dir + '/int.csr',
297                '-out',
298                temp_dir + '/int.crt',
299            ],
300            stderr=subprocess.STDOUT,
301        )
302
303        subprocess.call(
304            [
305                'openssl',
306                'ca',
307                '-batch',
308                '-subj',
309                '/CN=end/',
310                '-config',
311                temp_dir + '/ca.conf',
312                '-keyfile',
313                temp_dir + '/int.key',
314                '-cert',
315                temp_dir + '/int.crt',
316                '-in',
317                temp_dir + '/end.csr',
318                '-out',
319                temp_dir + '/end.crt',
320            ],
321            stderr=subprocess.STDOUT,
322        )
323
324        crt_path = temp_dir + '/end-int.crt'
325        end_path = temp_dir + '/end.crt'
326        int_path = temp_dir + '/int.crt'
327
328        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
329            int_path, 'rb'
330        ) as int:
331            crt.write(end.read() + int.read())
332
333        self.context = ssl.create_default_context()
334        self.context.check_hostname = False
335        self.context.verify_mode = ssl.CERT_REQUIRED
336        self.context.load_verify_locations(temp_dir + '/root.crt')
337
338        # incomplete chain
339
340        assert 'success' in self.certificate_load(
341            'end', 'end'
342        ), 'certificate chain end upload'
343
344        chain = self.conf_get('/certificates/end/chain')
345        assert len(chain) == 1, 'certificate chain end length'
346        assert (
347            chain[0]['subject']['common_name'] == 'end'
348        ), 'certificate chain end subject common name'
349        assert (
350            chain[0]['issuer']['common_name'] == 'int'
351        ), 'certificate chain end issuer common name'
352
353        self.add_tls(cert='end')
354
355        try:
356            resp = self.get_ssl()
357        except ssl.SSLError:
358            resp = None
359
360        assert resp == None, 'certificate chain incomplete chain'
361
362        # intermediate
363
364        assert 'success' in self.certificate_load(
365            'int', 'int'
366        ), 'certificate chain int upload'
367
368        chain = self.conf_get('/certificates/int/chain')
369        assert len(chain) == 1, 'certificate chain int length'
370        assert (
371            chain[0]['subject']['common_name'] == 'int'
372        ), 'certificate chain int subject common name'
373        assert (
374            chain[0]['issuer']['common_name'] == 'root'
375        ), 'certificate chain int issuer common name'
376
377        self.add_tls(cert='int')
378
379        assert (
380            self.get_ssl()['status'] == 200
381        ), 'certificate chain intermediate'
382
383        # intermediate server
384
385        assert 'success' in self.certificate_load(
386            'end-int', 'end'
387        ), 'certificate chain end-int upload'
388
389        chain = self.conf_get('/certificates/end-int/chain')
390        assert len(chain) == 2, 'certificate chain end-int length'
391        assert (
392            chain[0]['subject']['common_name'] == 'end'
393        ), 'certificate chain end-int int subject common name'
394        assert (
395            chain[0]['issuer']['common_name'] == 'int'
396        ), 'certificate chain end-int int issuer common name'
397        assert (
398            chain[1]['subject']['common_name'] == 'int'
399        ), 'certificate chain end-int end subject common name'
400        assert (
401            chain[1]['issuer']['common_name'] == 'root'
402        ), 'certificate chain end-int end issuer common name'
403
404        self.add_tls(cert='end-int')
405
406        assert (
407            self.get_ssl()['status'] == 200
408        ), 'certificate chain intermediate server'
409
410    @pytest.mark.skip('not yet')
411    def test_tls_reconfigure(self):
412        self.load('empty')
413
414        assert self.get()['status'] == 200, 'init'
415
416        self.certificate()
417
418        (resp, sock) = self.get(
419            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
420            start=True,
421            read_timeout=1,
422        )
423
424        assert resp['status'] == 200, 'initial status'
425
426        self.add_tls()
427
428        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
429        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
430
431    def test_tls_keepalive(self):
432        self.load('mirror')
433
434        assert self.get()['status'] == 200, 'init'
435
436        self.certificate()
437
438        self.add_tls(application='mirror')
439
440        (resp, sock) = self.post_ssl(
441            headers={
442                'Host': 'localhost',
443                'Connection': 'keep-alive',
444                'Content-Type': 'text/html',
445            },
446            start=True,
447            body='0123456789',
448            read_timeout=1,
449        )
450
451        assert resp['body'] == '0123456789', 'keepalive 1'
452
453        resp = self.post_ssl(
454            headers={
455                'Host': 'localhost',
456                'Connection': 'close',
457                'Content-Type': 'text/html',
458            },
459            sock=sock,
460            body='0123456789',
461        )
462
463        assert resp['body'] == '0123456789', 'keepalive 2'
464
465    @pytest.mark.skip('not yet')
466    def test_tls_keepalive_certificate_remove(self):
467        self.load('empty')
468
469        assert self.get()['status'] == 200, 'init'
470
471        self.certificate()
472
473        self.add_tls()
474
475        (resp, sock) = self.get_ssl(
476            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
477            start=True,
478            read_timeout=1,
479        )
480
481        assert 'success' in self.conf(
482            {"pass": "applications/empty"}, 'listeners/*:7080'
483        )
484        assert 'success' in self.conf_delete('/certificates/default')
485
486        try:
487            resp = self.get_ssl(
488                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
489            )
490
491        except KeyboardInterrupt:
492            raise
493
494        except:
495            resp = None
496
497        assert resp == None, 'keepalive remove certificate'
498
499    @pytest.mark.skip('not yet')
500    def test_tls_certificates_remove_all(self):
501        self.load('empty')
502
503        self.certificate()
504
505        assert 'success' in self.conf_delete(
506            '/certificates'
507        ), 'remove all certificates'
508
509    def test_tls_application_respawn(self, skip_alert):
510        self.load('mirror')
511
512        self.certificate()
513
514        assert 'success' in self.conf('1', 'applications/mirror/processes')
515
516        self.add_tls(application='mirror')
517
518        (_, sock) = self.post_ssl(
519            headers={
520                'Host': 'localhost',
521                'Connection': 'keep-alive',
522                'Content-Type': 'text/html',
523            },
524            start=True,
525            body='0123456789',
526            read_timeout=1,
527        )
528
529        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
530
531        subprocess.call(['kill', '-9', app_id])
532
533        skip_alert(r'process %s exited on signal 9' % app_id)
534
535        self.wait_for_record(
536            re.compile(
537                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
538            )
539        )
540
541        resp = self.post_ssl(
542            headers={
543                'Host': 'localhost',
544                'Connection': 'close',
545                'Content-Type': 'text/html',
546            },
547            sock=sock,
548            body='0123456789',
549        )
550
551        assert resp['status'] == 200, 'application respawn status'
552        assert resp['body'] == '0123456789', 'application respawn body'
553
554    def test_tls_url_scheme(self):
555        self.load('variables')
556
557        assert (
558            self.post(
559                headers={
560                    'Host': 'localhost',
561                    'Content-Type': 'text/html',
562                    'Custom-Header': '',
563                    'Connection': 'close',
564                }
565            )['headers']['Wsgi-Url-Scheme']
566            == 'http'
567        ), 'url scheme http'
568
569        self.certificate()
570
571        self.add_tls(application='variables')
572
573        assert (
574            self.post_ssl(
575                headers={
576                    'Host': 'localhost',
577                    'Content-Type': 'text/html',
578                    'Custom-Header': '',
579                    'Connection': 'close',
580                }
581            )['headers']['Wsgi-Url-Scheme']
582            == 'https'
583        ), 'url scheme https'
584
585    def test_tls_big_upload(self):
586        self.load('upload')
587
588        self.certificate()
589
590        self.add_tls(application='upload')
591
592        filename = 'test.txt'
593        data = '0123456789' * 9000
594
595        res = self.post_ssl(
596            body={
597                'file': {
598                    'filename': filename,
599                    'type': 'text/plain',
600                    'data': io.StringIO(data),
601                }
602            }
603        )
604        assert res['status'] == 200, 'status ok'
605        assert res['body'] == filename + data
606