xref: /unit/test/test_tls.py (revision 1850:839024ce4a6a)
1import io
2import re
3import ssl
4import subprocess
5
6import pytest
7
8from unit.applications.tls import TestApplicationTLS
9
10
11class TestTLS(TestApplicationTLS):
12    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
13
14    def openssl_date_to_sec_epoch(self, date):
15        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
16
17    def add_tls(self, application='empty', cert='default', port=7080):
18        assert 'success' in self.conf(
19            {
20                "pass": "applications/" + application,
21                "tls": {"certificate": cert},
22            },
23            'listeners/*:' + str(port),
24        )
25
26    def remove_tls(self, application='empty', port=7080):
27        assert 'success' in self.conf(
28            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
29        )
30
31    def test_tls_listener_option_add(self):
32        self.load('empty')
33
34        self.certificate()
35
36        self.add_tls()
37
38        assert self.get_ssl()['status'] == 200, 'add listener option'
39
40    def test_tls_listener_option_remove(self):
41        self.load('empty')
42
43        self.certificate()
44
45        self.add_tls()
46
47        self.get_ssl()
48
49        self.remove_tls()
50
51        assert self.get()['status'] == 200, 'remove listener option'
52
53    def test_tls_certificate_remove(self):
54        self.load('empty')
55
56        self.certificate()
57
58        assert 'success' in self.conf_delete(
59            '/certificates/default'
60        ), 'remove certificate'
61
62    def test_tls_certificate_remove_used(self):
63        self.load('empty')
64
65        self.certificate()
66
67        self.add_tls()
68
69        assert 'error' in self.conf_delete(
70            '/certificates/default'
71        ), 'remove certificate'
72
73    def test_tls_certificate_remove_nonexisting(self):
74        self.load('empty')
75
76        self.certificate()
77
78        self.add_tls()
79
80        assert 'error' in self.conf_delete(
81            '/certificates/blah'
82        ), 'remove nonexistings certificate'
83
84    @pytest.mark.skip('not yet')
85    def test_tls_certificate_update(self):
86        self.load('empty')
87
88        self.certificate()
89
90        self.add_tls()
91
92        cert_old = self.get_server_certificate()
93
94        self.certificate()
95
96        assert cert_old != self.get_server_certificate(), 'update certificate'
97
98    @pytest.mark.skip('not yet')
99    def test_tls_certificate_key_incorrect(self):
100        self.load('empty')
101
102        self.certificate('first', False)
103        self.certificate('second', False)
104
105        assert 'error' in self.certificate_load(
106            'first', 'second'
107        ), 'key incorrect'
108
109    def test_tls_certificate_change(self):
110        self.load('empty')
111
112        self.certificate()
113        self.certificate('new')
114
115        self.add_tls()
116
117        cert_old = self.get_server_certificate()
118
119        self.add_tls(cert='new')
120
121        assert cert_old != self.get_server_certificate(), 'change certificate'
122
123    def test_tls_certificate_key_rsa(self):
124        self.load('empty')
125
126        self.certificate()
127
128        assert (
129            self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
130        ), 'certificate key rsa'
131
132    def test_tls_certificate_key_ec(self, temp_dir):
133        self.load('empty')
134
135        self.openssl_conf()
136
137        subprocess.call(
138            [
139                'openssl',
140                'ecparam',
141                '-noout',
142                '-genkey',
143                '-out',
144                temp_dir + '/ec.key',
145                '-name',
146                'prime256v1',
147            ],
148            stderr=subprocess.STDOUT,
149        )
150
151        subprocess.call(
152            [
153                'openssl',
154                'req',
155                '-x509',
156                '-new',
157                '-subj',
158                '/CN=ec/',
159                '-config',
160                temp_dir + '/openssl.conf',
161                '-key',
162                temp_dir + '/ec.key',
163                '-out',
164                temp_dir + '/ec.crt',
165            ],
166            stderr=subprocess.STDOUT,
167        )
168
169        self.certificate_load('ec')
170
171        assert (
172            self.conf_get('/certificates/ec/key') == 'ECDH'
173        ), 'certificate key ec'
174
175    def test_tls_certificate_chain_options(self):
176        self.load('empty')
177
178        self.certificate()
179
180        chain = self.conf_get('/certificates/default/chain')
181
182        assert len(chain) == 1, 'certificate chain length'
183
184        cert = chain[0]
185
186        assert (
187            cert['subject']['common_name'] == 'default'
188        ), 'certificate subject common name'
189        assert (
190            cert['issuer']['common_name'] == 'default'
191        ), 'certificate issuer common name'
192
193        assert (
194            abs(
195                self.sec_epoch()
196                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
197            )
198            < 60
199        ), 'certificate validity since'
200        assert (
201            self.openssl_date_to_sec_epoch(cert['validity']['until'])
202            - self.openssl_date_to_sec_epoch(cert['validity']['since'])
203            == 2592000
204        ), 'certificate validity until'
205
206    def test_tls_certificate_chain(self, temp_dir):
207        self.load('empty')
208
209        self.certificate('root', False)
210
211        subprocess.call(
212            [
213                'openssl',
214                'req',
215                '-new',
216                '-subj',
217                '/CN=int/',
218                '-config',
219                temp_dir + '/openssl.conf',
220                '-out',
221                temp_dir + '/int.csr',
222                '-keyout',
223                temp_dir + '/int.key',
224            ],
225            stderr=subprocess.STDOUT,
226        )
227
228        subprocess.call(
229            [
230                'openssl',
231                'req',
232                '-new',
233                '-subj',
234                '/CN=end/',
235                '-config',
236                temp_dir + '/openssl.conf',
237                '-out',
238                temp_dir + '/end.csr',
239                '-keyout',
240                temp_dir + '/end.key',
241            ],
242            stderr=subprocess.STDOUT,
243        )
244
245        with open(temp_dir + '/ca.conf', 'w') as f:
246            f.write(
247                """[ ca ]
248default_ca = myca
249
250[ myca ]
251new_certs_dir = %(dir)s
252database = %(database)s
253default_md = sha256
254policy = myca_policy
255serial = %(certserial)s
256default_days = 1
257x509_extensions = myca_extensions
258
259[ myca_policy ]
260commonName = supplied
261
262[ myca_extensions ]
263basicConstraints = critical,CA:TRUE"""
264                % {
265                    'dir': temp_dir,
266                    'database': temp_dir + '/certindex',
267                    'certserial': temp_dir + '/certserial',
268                }
269            )
270
271        with open(temp_dir + '/certserial', 'w') as f:
272            f.write('1000')
273
274        with open(temp_dir + '/certindex', 'w') as f:
275            f.write('')
276
277        subprocess.call(
278            [
279                'openssl',
280                'ca',
281                '-batch',
282                '-subj',
283                '/CN=int/',
284                '-config',
285                temp_dir + '/ca.conf',
286                '-keyfile',
287                temp_dir + '/root.key',
288                '-cert',
289                temp_dir + '/root.crt',
290                '-in',
291                temp_dir + '/int.csr',
292                '-out',
293                temp_dir + '/int.crt',
294            ],
295            stderr=subprocess.STDOUT,
296        )
297
298        subprocess.call(
299            [
300                'openssl',
301                'ca',
302                '-batch',
303                '-subj',
304                '/CN=end/',
305                '-config',
306                temp_dir + '/ca.conf',
307                '-keyfile',
308                temp_dir + '/int.key',
309                '-cert',
310                temp_dir + '/int.crt',
311                '-in',
312                temp_dir + '/end.csr',
313                '-out',
314                temp_dir + '/end.crt',
315            ],
316            stderr=subprocess.STDOUT,
317        )
318
319        crt_path = temp_dir + '/end-int.crt'
320        end_path = temp_dir + '/end.crt'
321        int_path = temp_dir + '/int.crt'
322
323        with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
324            int_path, 'rb'
325        ) as int:
326            crt.write(end.read() + int.read())
327
328        self.context = ssl.create_default_context()
329        self.context.check_hostname = False
330        self.context.verify_mode = ssl.CERT_REQUIRED
331        self.context.load_verify_locations(temp_dir + '/root.crt')
332
333        # incomplete chain
334
335        assert 'success' in self.certificate_load(
336            'end', 'end'
337        ), 'certificate chain end upload'
338
339        chain = self.conf_get('/certificates/end/chain')
340        assert len(chain) == 1, 'certificate chain end length'
341        assert (
342            chain[0]['subject']['common_name'] == 'end'
343        ), 'certificate chain end subject common name'
344        assert (
345            chain[0]['issuer']['common_name'] == 'int'
346        ), 'certificate chain end issuer common name'
347
348        self.add_tls(cert='end')
349
350        try:
351            resp = self.get_ssl()
352        except ssl.SSLError:
353            resp = None
354
355        assert resp == None, 'certificate chain incomplete chain'
356
357        # intermediate
358
359        assert 'success' in self.certificate_load(
360            'int', 'int'
361        ), 'certificate chain int upload'
362
363        chain = self.conf_get('/certificates/int/chain')
364        assert len(chain) == 1, 'certificate chain int length'
365        assert (
366            chain[0]['subject']['common_name'] == 'int'
367        ), 'certificate chain int subject common name'
368        assert (
369            chain[0]['issuer']['common_name'] == 'root'
370        ), 'certificate chain int issuer common name'
371
372        self.add_tls(cert='int')
373
374        assert (
375            self.get_ssl()['status'] == 200
376        ), 'certificate chain intermediate'
377
378        # intermediate server
379
380        assert 'success' in self.certificate_load(
381            'end-int', 'end'
382        ), 'certificate chain end-int upload'
383
384        chain = self.conf_get('/certificates/end-int/chain')
385        assert len(chain) == 2, 'certificate chain end-int length'
386        assert (
387            chain[0]['subject']['common_name'] == 'end'
388        ), 'certificate chain end-int int subject common name'
389        assert (
390            chain[0]['issuer']['common_name'] == 'int'
391        ), 'certificate chain end-int int issuer common name'
392        assert (
393            chain[1]['subject']['common_name'] == 'int'
394        ), 'certificate chain end-int end subject common name'
395        assert (
396            chain[1]['issuer']['common_name'] == 'root'
397        ), 'certificate chain end-int end issuer common name'
398
399        self.add_tls(cert='end-int')
400
401        assert (
402            self.get_ssl()['status'] == 200
403        ), 'certificate chain intermediate server'
404
405    @pytest.mark.skip('not yet')
406    def test_tls_reconfigure(self):
407        self.load('empty')
408
409        assert self.get()['status'] == 200, 'init'
410
411        self.certificate()
412
413        (resp, sock) = self.get(
414            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
415            start=True,
416            read_timeout=1,
417        )
418
419        assert resp['status'] == 200, 'initial status'
420
421        self.add_tls()
422
423        assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
424        assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
425
426    def test_tls_keepalive(self):
427        self.load('mirror')
428
429        assert self.get()['status'] == 200, 'init'
430
431        self.certificate()
432
433        self.add_tls(application='mirror')
434
435        (resp, sock) = self.post_ssl(
436            headers={
437                'Host': 'localhost',
438                'Connection': 'keep-alive',
439                'Content-Type': 'text/html',
440            },
441            start=True,
442            body='0123456789',
443            read_timeout=1,
444        )
445
446        assert resp['body'] == '0123456789', 'keepalive 1'
447
448        resp = self.post_ssl(
449            headers={
450                'Host': 'localhost',
451                'Connection': 'close',
452                'Content-Type': 'text/html',
453            },
454            sock=sock,
455            body='0123456789',
456        )
457
458        assert resp['body'] == '0123456789', 'keepalive 2'
459
460    @pytest.mark.skip('not yet')
461    def test_tls_keepalive_certificate_remove(self):
462        self.load('empty')
463
464        assert self.get()['status'] == 200, 'init'
465
466        self.certificate()
467
468        self.add_tls()
469
470        (resp, sock) = self.get_ssl(
471            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
472            start=True,
473            read_timeout=1,
474        )
475
476        assert 'success' in self.conf(
477            {"pass": "applications/empty"}, 'listeners/*:7080'
478        )
479        assert 'success' in self.conf_delete('/certificates/default')
480
481        try:
482            resp = self.get_ssl(
483                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
484            )
485
486        except KeyboardInterrupt:
487            raise
488
489        except:
490            resp = None
491
492        assert resp == None, 'keepalive remove certificate'
493
494    @pytest.mark.skip('not yet')
495    def test_tls_certificates_remove_all(self):
496        self.load('empty')
497
498        self.certificate()
499
500        assert 'success' in self.conf_delete(
501            '/certificates'
502        ), 'remove all certificates'
503
504    def test_tls_application_respawn(self, skip_alert):
505        self.load('mirror')
506
507        self.certificate()
508
509        assert 'success' in self.conf('1', 'applications/mirror/processes')
510
511        self.add_tls(application='mirror')
512
513        (_, sock) = self.post_ssl(
514            headers={
515                'Host': 'localhost',
516                'Connection': 'keep-alive',
517                'Content-Type': 'text/html',
518            },
519            start=True,
520            body='0123456789',
521            read_timeout=1,
522        )
523
524        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
525
526        subprocess.call(['kill', '-9', app_id])
527
528        skip_alert(r'process %s exited on signal 9' % app_id)
529
530        self.wait_for_record(
531            re.compile(
532                r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
533            )
534        )
535
536        resp = self.post_ssl(
537            headers={
538                'Host': 'localhost',
539                'Connection': 'close',
540                'Content-Type': 'text/html',
541            },
542            sock=sock,
543            body='0123456789',
544        )
545
546        assert resp['status'] == 200, 'application respawn status'
547        assert resp['body'] == '0123456789', 'application respawn body'
548
549    def test_tls_url_scheme(self):
550        self.load('variables')
551
552        assert (
553            self.post(
554                headers={
555                    'Host': 'localhost',
556                    'Content-Type': 'text/html',
557                    'Custom-Header': '',
558                    'Connection': 'close',
559                }
560            )['headers']['Wsgi-Url-Scheme']
561            == 'http'
562        ), 'url scheme http'
563
564        self.certificate()
565
566        self.add_tls(application='variables')
567
568        assert (
569            self.post_ssl(
570                headers={
571                    'Host': 'localhost',
572                    'Content-Type': 'text/html',
573                    'Custom-Header': '',
574                    'Connection': 'close',
575                }
576            )['headers']['Wsgi-Url-Scheme']
577            == 'https'
578        ), 'url scheme https'
579
580    def test_tls_big_upload(self):
581        self.load('upload')
582
583        self.certificate()
584
585        self.add_tls(application='upload')
586
587        filename = 'test.txt'
588        data = '0123456789' * 9000
589
590        res = self.post_ssl(
591            body={
592                'file': {
593                    'filename': filename,
594                    'type': 'text/plain',
595                    'data': io.StringIO(data),
596                }
597            }
598        )
599        assert res['status'] == 200, 'status ok'
600        assert res['body'] == filename + data
601