xref: /unit/test/test_tls.py (revision 2616:ab2896c980ab)
1import io
2import ssl
3import subprocess
4import time
5from pathlib import Path
6
7import pytest
8
9from unit.applications.tls import ApplicationTLS
10from unit.option import option
11
12prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
13
14client = ApplicationTLS()
15
16
17def add_tls(application='empty', cert='default', port=8080):
18    assert 'success' in client.conf(
19        {
20            "pass": f"applications/{application}",
21            "tls": {"certificate": cert},
22        },
23        f'listeners/*:{port}',
24    )
25
26
27def ca(cert='root', out='localhost'):
28    subprocess.check_output(
29        [
30            'openssl',
31            'ca',
32            '-batch',
33            '-config',
34            f'{option.temp_dir}/ca.conf',
35            '-keyfile',
36            f'{option.temp_dir}/{cert}.key',
37            '-cert',
38            f'{option.temp_dir}/{cert}.crt',
39            '-in',
40            f'{option.temp_dir}/{out}.csr',
41            '-out',
42            f'{option.temp_dir}/{out}.crt',
43        ],
44        stderr=subprocess.STDOUT,
45    )
46
47
48def context_cert_req(cert='root'):
49    context = ssl.create_default_context()
50    context.check_hostname = False
51    context.verify_mode = ssl.CERT_REQUIRED
52    context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')
53
54    return context
55
56
57def generate_ca_conf():
58    Path(f'{option.temp_dir}/ca.conf').write_text(
59        f"""[ ca ]
60default_ca = myca
61
62[ myca ]
63new_certs_dir = {option.temp_dir}
64database = {option.temp_dir}/certindex
65default_md = sha256
66policy = myca_policy
67serial = {option.temp_dir}/certserial
68default_days = 1
69x509_extensions = myca_extensions
70copy_extensions = copy
71
72[ myca_policy ]
73commonName = optional
74
75[ myca_extensions ]
76basicConstraints = critical,CA:TRUE""",
77        encoding='utf-8',
78    )
79
80    Path(f'{option.temp_dir}/certserial').write_text('1000', encoding='utf-8')
81    Path(f'{option.temp_dir}/certindex').touch()
82    Path(f'{option.temp_dir}/certindex.attr').touch()
83
84
85def remove_tls(application='empty', port=8080):
86    assert 'success' in client.conf(
87        {"pass": f"applications/{application}"}, f'listeners/*:{port}'
88    )
89
90
91def req(name='localhost', subject=None):
92    subj = subject if subject is not None else f'/CN={name}/'
93
94    subprocess.check_output(
95        [
96            'openssl',
97            'req',
98            '-new',
99            '-subj',
100            subj,
101            '-config',
102            f'{option.temp_dir}/openssl.conf',
103            '-out',
104            f'{option.temp_dir}/{name}.csr',
105            '-keyout',
106            f'{option.temp_dir}/{name}.key',
107        ],
108        stderr=subprocess.STDOUT,
109    )
110
111
112def test_tls_listener_option_add():
113    client.load('empty')
114
115    client.certificate()
116
117    add_tls()
118
119    assert client.get_ssl()['status'] == 200, 'add listener option'
120
121
122def test_tls_listener_option_remove():
123    client.load('empty')
124
125    client.certificate()
126
127    add_tls()
128
129    client.get_ssl()
130
131    remove_tls()
132
133    assert client.get()['status'] == 200, 'remove listener option'
134
135
136def test_tls_certificate_remove():
137    client.load('empty')
138
139    client.certificate()
140
141    assert 'success' in client.conf_delete(
142        '/certificates/default'
143    ), 'remove certificate'
144
145
146def test_tls_certificate_remove_used():
147    client.load('empty')
148
149    client.certificate()
150
151    add_tls()
152
153    assert 'error' in client.conf_delete(
154        '/certificates/default'
155    ), 'remove certificate'
156
157
158def test_tls_certificate_remove_nonexisting():
159    client.load('empty')
160
161    client.certificate()
162
163    add_tls()
164
165    assert 'error' in client.conf_delete(
166        '/certificates/blah'
167    ), 'remove nonexistings certificate'
168
169
170@pytest.mark.skip('not yet')
171def test_tls_certificate_update():
172    client.load('empty')
173
174    client.certificate()
175
176    add_tls()
177
178    cert_old = ssl.get_server_certificate(('127.0.0.1', 8080))
179
180    client.certificate()
181
182    assert cert_old != ssl.get_server_certificate(
183        ('127.0.0.1', 8080)
184    ), 'update certificate'
185
186
187@pytest.mark.skip('not yet')
188def test_tls_certificate_key_incorrect():
189    client.load('empty')
190
191    client.certificate('first', False)
192    client.certificate('second', False)
193
194    assert 'error' in client.certificate_load(
195        'first', 'second'
196    ), 'key incorrect'
197
198
199def test_tls_certificate_change():
200    client.load('empty')
201
202    client.certificate()
203    client.certificate('new')
204
205    add_tls()
206
207    cert_old = ssl.get_server_certificate(('127.0.0.1', 8080))
208
209    add_tls(cert='new')
210
211    assert cert_old != ssl.get_server_certificate(
212        ('127.0.0.1', 8080)
213    ), 'change certificate'
214
215
216def test_tls_certificate_key_rsa():
217    client.load('empty')
218
219    client.certificate()
220
221    assert (
222        client.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
223    ), 'certificate key rsa'
224
225
226def test_tls_certificate_key_ec(temp_dir):
227    client.load('empty')
228
229    client.openssl_conf()
230
231    subprocess.check_output(
232        [
233            'openssl',
234            'ecparam',
235            '-noout',
236            '-genkey',
237            '-out',
238            f'{temp_dir}/ec.key',
239            '-name',
240            'prime256v1',
241        ],
242        stderr=subprocess.STDOUT,
243    )
244
245    subprocess.check_output(
246        [
247            'openssl',
248            'req',
249            '-x509',
250            '-new',
251            '-subj',
252            '/CN=ec/',
253            '-config',
254            f'{temp_dir}/openssl.conf',
255            '-key',
256            f'{temp_dir}/ec.key',
257            '-out',
258            f'{temp_dir}/ec.crt',
259        ],
260        stderr=subprocess.STDOUT,
261    )
262
263    client.certificate_load('ec')
264
265    assert (
266        client.conf_get('/certificates/ec/key') == 'ECDH'
267    ), 'certificate key ec'
268
269
270def test_tls_certificate_chain_options(date_to_sec_epoch, sec_epoch):
271    client.load('empty')
272    date_format = '%b %d %X %Y %Z'
273
274    client.certificate()
275
276    chain = client.conf_get('/certificates/default/chain')
277
278    assert len(chain) == 1, 'certificate chain length'
279
280    cert = chain[0]
281
282    assert (
283        cert['subject']['common_name'] == 'default'
284    ), 'certificate subject common name'
285    assert (
286        cert['issuer']['common_name'] == 'default'
287    ), 'certificate issuer common name'
288
289    assert (
290        abs(
291            sec_epoch
292            - date_to_sec_epoch(cert['validity']['since'], date_format)
293        )
294        < 60
295    ), 'certificate validity since'
296    assert (
297        date_to_sec_epoch(cert['validity']['until'], date_format)
298        - date_to_sec_epoch(cert['validity']['since'], date_format)
299        == 2592000
300    ), 'certificate validity until'
301
302
303def test_tls_certificate_chain(temp_dir):
304    client.load('empty')
305
306    client.certificate('root', False)
307
308    req('int')
309    req('end')
310
311    generate_ca_conf()
312
313    ca(cert='root', out='int')
314    ca(cert='int', out='end')
315
316    crt_path = f'{temp_dir}/end-int.crt'
317    end_path = f'{temp_dir}/end.crt'
318    int_path = f'{temp_dir}/int.crt'
319
320    with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
321        int_path, 'rb'
322    ) as inter:
323        crt.write(end.read() + inter.read())
324
325    # incomplete chain
326
327    assert 'success' in client.certificate_load(
328        'end', 'end'
329    ), 'certificate chain end upload'
330
331    chain = client.conf_get('/certificates/end/chain')
332    assert len(chain) == 1, 'certificate chain end length'
333    assert (
334        chain[0]['subject']['common_name'] == 'end'
335    ), 'certificate chain end subject common name'
336    assert (
337        chain[0]['issuer']['common_name'] == 'int'
338    ), 'certificate chain end issuer common name'
339
340    add_tls(cert='end')
341
342    ctx_cert_req = context_cert_req()
343    try:
344        resp = client.get_ssl(context=ctx_cert_req)
345    except ssl.SSLError:
346        resp = None
347
348    assert resp is None, 'certificate chain incomplete chain'
349
350    # intermediate
351
352    assert 'success' in client.certificate_load(
353        'int', 'int'
354    ), 'certificate chain int upload'
355
356    chain = client.conf_get('/certificates/int/chain')
357    assert len(chain) == 1, 'certificate chain int length'
358    assert (
359        chain[0]['subject']['common_name'] == 'int'
360    ), 'certificate chain int subject common name'
361    assert (
362        chain[0]['issuer']['common_name'] == 'root'
363    ), 'certificate chain int issuer common name'
364
365    add_tls(cert='int')
366
367    assert client.get_ssl()['status'] == 200, 'certificate chain intermediate'
368
369    # intermediate server
370
371    assert 'success' in client.certificate_load(
372        'end-int', 'end'
373    ), 'certificate chain end-int upload'
374
375    chain = client.conf_get('/certificates/end-int/chain')
376    assert len(chain) == 2, 'certificate chain end-int length'
377    assert (
378        chain[0]['subject']['common_name'] == 'end'
379    ), 'certificate chain end-int int subject common name'
380    assert (
381        chain[0]['issuer']['common_name'] == 'int'
382    ), 'certificate chain end-int int issuer common name'
383    assert (
384        chain[1]['subject']['common_name'] == 'int'
385    ), 'certificate chain end-int end subject common name'
386    assert (
387        chain[1]['issuer']['common_name'] == 'root'
388    ), 'certificate chain end-int end issuer common name'
389
390    add_tls(cert='end-int')
391
392    assert (
393        client.get_ssl(context=ctx_cert_req)['status'] == 200
394    ), 'certificate chain intermediate server'
395
396
397def test_tls_certificate_chain_long(temp_dir):
398    client.load('empty')
399
400    generate_ca_conf()
401
402    # Minimum chain length is 3.
403    chain_length = 10
404
405    for i in range(chain_length):
406        if i == 0:
407            client.certificate('root', False)
408        elif i == chain_length - 1:
409            req('end')
410        else:
411            req(f'int{i}')
412
413    for i in range(chain_length - 1):
414        if i == 0:
415            ca(cert='root', out='int1')
416        elif i == chain_length - 2:
417            ca(cert=f'int{(chain_length - 2)}', out='end')
418        else:
419            ca(cert=f'int{i}', out=f'int{(i + 1)}')
420
421    for i in range(chain_length - 1, 0, -1):
422        path = (
423            f'{temp_dir}/end.crt'
424            if i == chain_length - 1
425            else f'{temp_dir}/int{i}.crt'
426        )
427
428        with open(f'{temp_dir}/all.crt', 'a', encoding='utf-8') as chain, open(
429            path, encoding='utf-8'
430        ) as cert:
431            chain.write(cert.read())
432
433    assert 'success' in client.certificate_load(
434        'all', 'end'
435    ), 'certificate chain upload'
436
437    chain = client.conf_get('/certificates/all/chain')
438    assert len(chain) == chain_length - 1, 'certificate chain length'
439
440    add_tls(cert='all')
441
442    assert (
443        client.get_ssl(context=context_cert_req())['status'] == 200
444    ), 'certificate chain long'
445
446
447def test_tls_certificate_empty_cn():
448    client.certificate('root', False)
449
450    req(subject='/')
451
452    generate_ca_conf()
453    ca()
454
455    assert 'success' in client.certificate_load('localhost', 'localhost')
456
457    cert = client.conf_get('/certificates/localhost')
458    assert cert['chain'][0]['subject'] == {}, 'empty subject'
459    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
460
461
462def test_tls_certificate_empty_cn_san():
463    client.certificate('root', False)
464
465    client.openssl_conf(
466        rewrite=True, alt_names=["example.com", "www.example.net"]
467    )
468
469    req(subject='/')
470
471    generate_ca_conf()
472    ca()
473
474    assert 'success' in client.certificate_load('localhost', 'localhost')
475
476    cert = client.conf_get('/certificates/localhost')
477    assert cert['chain'][0]['subject'] == {
478        'alt_names': ['example.com', 'www.example.net']
479    }, 'subject alt_names'
480    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
481
482
483def test_tls_certificate_empty_cn_san_ip():
484    client.certificate('root', False)
485
486    client.openssl_conf(
487        rewrite=True,
488        alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
489    )
490
491    req(subject='/')
492
493    generate_ca_conf()
494    ca()
495
496    assert 'success' in client.certificate_load('localhost', 'localhost')
497
498    cert = client.conf_get('/certificates/localhost')
499    assert cert['chain'][0]['subject'] == {
500        'alt_names': ['example.com', 'www.example.net']
501    }, 'subject alt_names'
502    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
503
504
505def test_tls_keepalive():
506    client.load('mirror')
507
508    assert client.get()['status'] == 200, 'init'
509
510    client.certificate()
511
512    add_tls(application='mirror')
513
514    (resp, sock) = client.post_ssl(
515        headers={
516            'Host': 'localhost',
517            'Connection': 'keep-alive',
518        },
519        start=True,
520        body='0123456789',
521        read_timeout=1,
522    )
523
524    assert resp['body'] == '0123456789', 'keepalive 1'
525
526    resp = client.post_ssl(
527        headers={
528            'Host': 'localhost',
529            'Connection': 'close',
530        },
531        sock=sock,
532        body='0123456789',
533    )
534
535    assert resp['body'] == '0123456789', 'keepalive 2'
536
537
538def test_tls_no_close_notify():
539    client.certificate()
540
541    assert 'success' in client.conf(
542        {
543            "listeners": {
544                "*:8080": {
545                    "pass": "routes",
546                    "tls": {"certificate": "default"},
547                }
548            },
549            "routes": [{"action": {"return": 200}}],
550            "applications": {},
551        }
552    ), 'load application configuration'
553
554    (_, sock) = client.get_ssl(start=True)
555
556    time.sleep(5)
557
558    sock.close()
559
560
561@pytest.mark.skip('not yet')
562def test_tls_keepalive_certificate_remove():
563    client.load('empty')
564
565    assert client.get()['status'] == 200, 'init'
566
567    client.certificate()
568
569    add_tls()
570
571    (resp, sock) = client.get_ssl(
572        headers={'Host': 'localhost', 'Connection': 'keep-alive'},
573        start=True,
574        read_timeout=1,
575    )
576
577    assert 'success' in client.conf(
578        {"pass": "applications/empty"}, 'listeners/*:8080'
579    )
580    assert 'success' in client.conf_delete('/certificates/default')
581
582    try:
583        resp = client.get_ssl(sock=sock)
584
585    except KeyboardInterrupt:
586        raise
587
588    except:
589        resp = None
590
591    assert resp is None, 'keepalive remove certificate'
592
593
594@pytest.mark.skip('not yet')
595def test_tls_certificates_remove_all():
596    client.load('empty')
597
598    client.certificate()
599
600    assert 'success' in client.conf_delete(
601        '/certificates'
602    ), 'remove all certificates'
603
604
605def test_tls_application_respawn(findall, skip_alert, wait_for_record):
606    client.load('mirror')
607
608    client.certificate()
609
610    assert 'success' in client.conf('1', 'applications/mirror/processes')
611
612    add_tls(application='mirror')
613
614    (_, sock) = client.post_ssl(
615        headers={
616            'Host': 'localhost',
617            'Connection': 'keep-alive',
618        },
619        start=True,
620        body='0123456789',
621        read_timeout=1,
622    )
623
624    app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]
625
626    subprocess.check_output(['kill', '-9', app_id])
627
628    skip_alert(fr'process {app_id} exited on signal 9')
629
630    wait_for_record(fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started')
631
632    resp = client.post_ssl(sock=sock, body='0123456789')
633
634    assert resp['status'] == 200, 'application respawn status'
635    assert resp['body'] == '0123456789', 'application respawn body'
636
637
638def test_tls_url_scheme():
639    client.load('variables')
640
641    assert (
642        client.post(
643            headers={
644                'Host': 'localhost',
645                'Content-Type': 'text/html',
646                'Custom-Header': '',
647                'Connection': 'close',
648            }
649        )['headers']['Wsgi-Url-Scheme']
650        == 'http'
651    ), 'url scheme http'
652
653    client.certificate()
654
655    add_tls(application='variables')
656
657    assert (
658        client.post_ssl(
659            headers={
660                'Host': 'localhost',
661                'Content-Type': 'text/html',
662                'Custom-Header': '',
663                'Connection': 'close',
664            }
665        )['headers']['Wsgi-Url-Scheme']
666        == 'https'
667    ), 'url scheme https'
668
669
670def test_tls_big_upload():
671    client.load('upload')
672
673    client.certificate()
674
675    add_tls(application='upload')
676
677    filename = 'test.txt'
678    data = '0123456789' * 9000
679
680    res = client.post_ssl(
681        body={
682            'file': {
683                'filename': filename,
684                'type': 'text/plain',
685                'data': io.StringIO(data),
686            }
687        }
688    )
689    assert res['status'] == 200, 'status ok'
690    assert res['body'] == f'{filename}{data}'
691
692
693def test_tls_multi_listener():
694    client.load('empty')
695
696    client.certificate()
697
698    add_tls()
699    add_tls(port=8081)
700
701    assert client.get_ssl()['status'] == 200, 'listener #1'
702
703    assert client.get_ssl(port=8081)['status'] == 200, 'listener #2'
704