xref: /unit/test/test_tls.py (revision 1029:687f7cc7aae2)
1import re
2import ssl
3import time
4import subprocess
5import unittest
6from unit.applications.tls import TestApplicationTLS
7
8
9class TestTLS(TestApplicationTLS):
10    prerequisites = ['python', 'openssl']
11
12    def findall(self, pattern):
13        with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
14            return re.findall(pattern, f.read())
15
16    def openssl_date_to_sec_epoch(self, date):
17        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
18
19    def add_tls(self, application='empty', cert='default', port=7080):
20        self.conf(
21            {"application": application, "tls": {"certificate": cert}},
22            'listeners/*:' + str(port),
23        )
24
25    def remove_tls(self, application='empty', port=7080):
26        self.conf({"application": application}, 'listeners/*:' + str(port))
27
28    def test_tls_listener_option_add(self):
29        self.load('empty')
30
31        self.certificate()
32
33        self.add_tls()
34
35        self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
36
37    def test_tls_listener_option_remove(self):
38        self.load('empty')
39
40        self.certificate()
41
42        self.add_tls()
43
44        self.get_ssl()
45
46        self.remove_tls()
47
48        self.assertEqual(self.get()['status'], 200, 'remove listener option')
49
50    def test_tls_certificate_remove(self):
51        self.load('empty')
52
53        self.certificate()
54
55        self.assertIn(
56            'success',
57            self.conf_delete('/certificates/default'),
58            'remove certificate',
59        )
60
61    def test_tls_certificate_remove_used(self):
62        self.load('empty')
63
64        self.certificate()
65
66        self.add_tls()
67
68        self.assertIn(
69            'error',
70            self.conf_delete('/certificates/default'),
71            'remove certificate',
72        )
73
74    def test_tls_certificate_remove_nonexisting(self):
75        self.load('empty')
76
77        self.certificate()
78
79        self.add_tls()
80
81        self.assertIn(
82            'error',
83            self.conf_delete('/certificates/blah'),
84            'remove nonexistings certificate',
85        )
86
87    @unittest.expectedFailure
88    def test_tls_certificate_update(self):
89        self.load('empty')
90
91        self.certificate()
92
93        self.add_tls()
94
95        cert_old = self.get_server_certificate()
96
97        self.certificate()
98
99        self.assertNotEqual(
100            cert_old, self.get_server_certificate(), 'update certificate'
101        )
102
103    @unittest.expectedFailure
104    def test_tls_certificate_key_incorrect(self):
105        self.load('empty')
106
107        self.certificate('first', False)
108        self.certificate('second', False)
109
110        self.assertIn(
111            'error', self.certificate_load('first', 'second'), 'key incorrect'
112        )
113
114    def test_tls_certificate_change(self):
115        self.load('empty')
116
117        self.certificate()
118        self.certificate('new')
119
120        self.add_tls()
121
122        cert_old = self.get_server_certificate()
123
124        self.add_tls(cert='new')
125
126        self.assertNotEqual(
127            cert_old, self.get_server_certificate(), 'change certificate'
128        )
129
130    def test_tls_certificate_key_rsa(self):
131        self.load('empty')
132
133        self.certificate()
134
135        self.assertEqual(
136            self.conf_get('/certificates/default/key'),
137            'RSA (1024 bits)',
138            'certificate key rsa',
139        )
140
141    def test_tls_certificate_key_ec(self):
142        self.load('empty')
143
144        subprocess.call(
145            [
146                'openssl',
147                'ecparam',
148                '-noout',
149                '-genkey',
150                '-out',   self.testdir + '/ec.key',
151                '-name',  'prime256v1',
152            ]
153        )
154
155        subprocess.call(
156            [
157                'openssl',
158                'req',
159                '-x509',
160                '-new',
161                '-subj',    '/CN=ec/',
162                '-config',  self.testdir + '/openssl.conf',
163                '-key',     self.testdir + '/ec.key',
164                '-out',     self.testdir + '/ec.crt',
165            ]
166        )
167
168        self.certificate_load('ec')
169
170        self.assertEqual(
171            self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
172        )
173
174    def test_tls_certificate_chain_options(self):
175        self.load('empty')
176
177        self.certificate()
178
179        chain = self.conf_get('/certificates/default/chain')
180
181        self.assertEqual(len(chain), 1, 'certificate chain length')
182
183        cert = chain[0]
184
185        self.assertEqual(
186            cert['subject']['common_name'],
187            'default',
188            'certificate subject common name',
189        )
190        self.assertEqual(
191            cert['issuer']['common_name'],
192            'default',
193            'certificate issuer common name',
194        )
195
196        self.assertLess(
197            abs(
198                self.sec_epoch()
199                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
200            ),
201            5,
202            'certificate validity since',
203        )
204        self.assertEqual(
205            self.openssl_date_to_sec_epoch(cert['validity']['until'])
206            - self.openssl_date_to_sec_epoch(cert['validity']['since']),
207            2592000,
208            'certificate validity until',
209        )
210
211    def test_tls_certificate_chain(self):
212        self.load('empty')
213
214        self.certificate('root', False)
215
216        subprocess.call(
217            [
218                'openssl',
219                'req',
220                '-new',
221                '-subj',    '/CN=int/',
222                '-config',  self.testdir + '/openssl.conf',
223                '-out',     self.testdir + '/int.csr',
224                '-keyout',  self.testdir + '/int.key',
225            ]
226        )
227
228        subprocess.call(
229            [
230                'openssl',
231                'req',
232                '-new',
233                '-subj',    '/CN=end/',
234                '-config',  self.testdir + '/openssl.conf',
235                '-out',     self.testdir + '/end.csr',
236                '-keyout',  self.testdir + '/end.key',
237            ]
238        )
239
240        with open(self.testdir + '/ca.conf', 'w') as f:
241            f.write(
242                """[ ca ]
243default_ca = myca
244
245[ myca ]
246new_certs_dir = %(dir)s
247database = %(database)s
248default_md = sha1
249policy = myca_policy
250serial = %(certserial)s
251default_days = 1
252x509_extensions = myca_extensions
253
254[ myca_policy ]
255commonName = supplied
256
257[ myca_extensions ]
258basicConstraints = critical,CA:TRUE"""
259                % {
260                    'dir': self.testdir,
261                    'database': self.testdir + '/certindex',
262                    'certserial': self.testdir + '/certserial',
263                }
264            )
265
266        with open(self.testdir + '/certserial', 'w') as f:
267            f.write('1000')
268
269        with open(self.testdir + '/certindex', 'w') as f:
270            f.write('')
271
272        subprocess.call(
273            [
274                'openssl',
275                'ca',
276                '-batch',
277                '-subj',     '/CN=int/',
278                '-config',   self.testdir + '/ca.conf',
279                '-keyfile',  self.testdir + '/root.key',
280                '-cert',     self.testdir + '/root.crt',
281                '-in',       self.testdir + '/int.csr',
282                '-out',      self.testdir + '/int.crt',
283            ]
284        )
285
286        subprocess.call(
287            [
288                'openssl',
289                'ca',
290                '-batch',
291                '-subj',     '/CN=end/',
292                '-config',   self.testdir + '/ca.conf',
293                '-keyfile',  self.testdir + '/int.key',
294                '-cert',     self.testdir + '/int.crt',
295                '-in',       self.testdir + '/end.csr',
296                '-out',      self.testdir + '/end.crt',
297            ]
298        )
299
300        crt_path = self.testdir + '/end-int.crt'
301        end_path = self.testdir + '/end.crt'
302        int_path = self.testdir + '/int.crt'
303
304        with open(crt_path, 'wb') as crt, \
305             open(end_path, 'rb') as end, \
306             open(int_path, 'rb') as int:
307            crt.write(end.read() + int.read())
308
309        self.context = ssl.create_default_context()
310        self.context.check_hostname = False
311        self.context.verify_mode = ssl.CERT_REQUIRED
312        self.context.load_verify_locations(self.testdir + '/root.crt')
313
314        # incomplete chain
315
316        self.assertIn(
317            'success',
318            self.certificate_load('end', 'end'),
319            'certificate chain end upload',
320        )
321
322        chain = self.conf_get('/certificates/end/chain')
323        self.assertEqual(len(chain), 1, 'certificate chain end length')
324        self.assertEqual(
325            chain[0]['subject']['common_name'],
326            'end',
327            'certificate chain end subject common name',
328        )
329        self.assertEqual(
330            chain[0]['issuer']['common_name'],
331            'int',
332            'certificate chain end issuer common name',
333        )
334
335        self.add_tls(cert='end')
336
337        try:
338            resp = self.get_ssl()
339        except ssl.SSLError:
340            resp = None
341
342        self.assertEqual(resp, None, 'certificate chain incomplete chain')
343
344        # intermediate
345
346        self.assertIn(
347            'success',
348            self.certificate_load('int', 'int'),
349            'certificate chain int upload',
350        )
351
352        chain = self.conf_get('/certificates/int/chain')
353        self.assertEqual(len(chain), 1, 'certificate chain int length')
354        self.assertEqual(
355            chain[0]['subject']['common_name'],
356            'int',
357            'certificate chain int subject common name',
358        )
359        self.assertEqual(
360            chain[0]['issuer']['common_name'],
361            'root',
362            'certificate chain int issuer common name',
363        )
364
365        self.add_tls(cert='int')
366
367        self.assertEqual(
368            self.get_ssl()['status'], 200, 'certificate chain intermediate'
369        )
370
371        # intermediate server
372
373        self.assertIn(
374            'success',
375            self.certificate_load('end-int', 'end'),
376            'certificate chain end-int upload',
377        )
378
379        chain = self.conf_get('/certificates/end-int/chain')
380        self.assertEqual(len(chain), 2, 'certificate chain end-int length')
381        self.assertEqual(
382            chain[0]['subject']['common_name'],
383            'end',
384            'certificate chain end-int int subject common name',
385        )
386        self.assertEqual(
387            chain[0]['issuer']['common_name'],
388            'int',
389            'certificate chain end-int int issuer common name',
390        )
391        self.assertEqual(
392            chain[1]['subject']['common_name'],
393            'int',
394            'certificate chain end-int end subject common name',
395        )
396        self.assertEqual(
397            chain[1]['issuer']['common_name'],
398            'root',
399            'certificate chain end-int end issuer common name',
400        )
401
402        self.add_tls(cert='end-int')
403
404        self.assertEqual(
405            self.get_ssl()['status'],
406            200,
407            'certificate chain intermediate server',
408        )
409
410    @unittest.expectedFailure
411    def test_tls_reconfigure(self):
412        self.load('empty')
413
414        self.assertEqual(self.get()['status'], 200, 'init')
415
416        self.certificate()
417
418        (resp, sock) = self.get(
419            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
420            start=True,
421            read_timeout=1,
422        )
423
424        self.assertEqual(resp['status'], 200, 'initial status')
425
426        self.add_tls()
427
428        self.assertEqual(
429            self.get(sock=sock)['status'], 200, 'reconfigure status'
430        )
431        self.assertEqual(
432            self.get_ssl()['status'], 200, 'reconfigure tls status'
433        )
434
435    def test_tls_keepalive(self):
436        self.load('mirror')
437
438        self.assertEqual(self.get()['status'], 200, 'init')
439
440        self.certificate()
441
442        self.add_tls(application='mirror')
443
444        (resp, sock) = self.post_ssl(
445            headers={
446                'Host': 'localhost',
447                'Connection': 'keep-alive',
448                'Content-Type': 'text/html',
449            },
450            start=True,
451            body='0123456789',
452            read_timeout=1,
453        )
454
455        self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
456
457        resp = self.post_ssl(
458            headers={
459                'Host': 'localhost',
460                'Connection': 'close',
461                'Content-Type': 'text/html',
462            },
463            sock=sock,
464            body='0123456789',
465        )
466
467        self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
468
469    @unittest.expectedFailure
470    def test_tls_keepalive_certificate_remove(self):
471        self.load('empty')
472
473        self.assertEqual(self.get()['status'], 200, 'init')
474
475        self.certificate()
476
477        self.add_tls()
478
479        (resp, sock) = self.get_ssl(
480            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
481            start=True,
482            read_timeout=1,
483        )
484
485        self.conf({"application": "empty"}, 'listeners/*:7080')
486        self.conf_delete('/certificates/default')
487
488        try:
489            resp = self.get_ssl(
490                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
491            )
492        except:
493            resp = None
494
495        self.assertEqual(resp, None, 'keepalive remove certificate')
496
497    @unittest.expectedFailure
498    def test_tls_certificates_remove_all(self):
499        self.load('empty')
500
501        self.certificate()
502
503        self.assertIn(
504            'success',
505            self.conf_delete('/certificates'),
506            'remove all certificates',
507        )
508
509    def test_tls_application_respawn(self):
510        self.skip_alerts.append(r'process \d+ exited on signal 9')
511        self.load('mirror')
512
513        self.assertEqual(self.get()['status'], 200, 'init')
514
515        self.certificate()
516
517        self.conf('1', 'applications/mirror/processes')
518
519        self.add_tls(application='mirror')
520
521        (resp, sock) = self.post_ssl(
522            headers={
523                'Host': 'localhost',
524                'Connection': 'keep-alive',
525                'Content-Type': 'text/html',
526            },
527            start=True,
528            body='0123456789',
529            read_timeout=1,
530        )
531
532        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
533
534        subprocess.call(['kill', '-9', app_id])
535
536        self.wait_for_record(
537            re.compile(
538                ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
539            )
540        )
541
542        resp = self.post_ssl(
543            headers={
544                'Host': 'localhost',
545                'Connection': 'close',
546                'Content-Type': 'text/html',
547            },
548            sock=sock,
549            body='0123456789',
550        )
551
552        self.assertEqual(resp['status'], 200, 'application respawn status')
553        self.assertEqual(
554            resp['body'], '0123456789', 'application respawn body'
555        )
556
557    def test_tls_url_scheme(self):
558        self.load('variables')
559
560        self.assertEqual(
561            self.post(
562                headers={
563                    'Host': 'localhost',
564                    'Content-Type': 'text/html',
565                    'Custom-Header': '',
566                    'Connection': 'close',
567                }
568            )['headers']['Wsgi-Url-Scheme'],
569            'http',
570            'url scheme http',
571        )
572
573        self.certificate()
574
575        self.add_tls(application='variables')
576
577        self.assertEqual(
578            self.post_ssl(
579                headers={
580                    'Host': 'localhost',
581                    'Content-Type': 'text/html',
582                    'Custom-Header': '',
583                    'Connection': 'close',
584                }
585            )['headers']['Wsgi-Url-Scheme'],
586            'https',
587            'url scheme https',
588        )
589
590if __name__ == '__main__':
591    TestTLS.main()
592