1import socket
2import ssl
3import time
4
5import pytest
6from unit.applications.tls import TestApplicationTLS
7
8
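class TestReconfigureTLS(TestApplicationTLS):
    """Check how live connections behave when the TLS listener is reconfigured.

    Each test opens a connection to the TLS listener on port 7080, changes or
    removes the listener configuration at a different stage of the connection
    (mid-request, before the handshake, after the handshake) and asserts the
    outcome on the already-open socket.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self):
        """Install the baseline configuration: one TLS listener returning 200."""
        # Skip when the local ssl module does not report TLS 1.2 support.
        if not getattr(ssl, 'HAS_TLSv1_2', False):
            pytest.skip('OpenSSL too old')

        # Inherited helper; presumably uploads the certificate bundle that the
        # listener references as "default" below -- confirm in
        # TestApplicationTLS.
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default"},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def create_socket(self):
        """Return a TLS client socket connected to 127.0.0.1:7080.

        The handshake is deferred (do_handshake_on_connect=False) so that each
        test decides when, relative to the reconfiguration, it takes place.
        """
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Certificate and hostname verification are switched off: the peer is
        # the loopback server under test and its certificate is presumably
        # self-signed.  Test-only settings; a real client must keep
        # verification enabled.  check_hostname has to be cleared before
        # verify_mode can be set to CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ssl_sock = ctx.wrap_socket(
            s, server_hostname='localhost', do_handshake_on_connect=False
        )
        ssl_sock.connect(('127.0.0.1', 7080))

        return ssl_sock

    def clear_conf(self):
        """Replace the configuration with empty listeners and applications."""
        assert 'success' in self.conf({"listeners": {}, "applications": {}})

    @pytest.mark.skip('not yet')
    def test_reconfigure_tls_switch(self):
        """Turn TLS on for a listener that already has a plain connection open.

        The existing keep-alive connection is expected to keep working and a
        new TLS request is expected to succeed.  Currently skipped.
        """
        assert 'success' in self.conf_delete('listeners/*:7080/tls')

        (_, sock) = self.get(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert 'success' in self.conf(
            {"pass": "routes", "tls": {"certificate": "default"}},
            'listeners/*:7080',
        )

        assert self.get(sock=sock)['status'] == 200, 'reconfigure'
        assert self.get_ssl()['status'] == 200, 'reconfigure tls'

    def test_reconfigure_tls(self):
        """Remove the listener in the middle of a request; it must still finish."""
        ssl_sock = self.create_socket()

        # Only the request line is sent before the reconfiguration.  The
        # handshake was deferred in create_socket(), so it is presumably
        # driven implicitly by this first write.
        ssl_sock.sendall("""GET / HTTP/1.1\r\n""".encode())

        self.clear_conf()

        ssl_sock.sendall(
            """Host: localhost\r\nConnection: close\r\n\r\n""".encode()
        )

        assert (
            self.recvall(ssl_sock).decode().startswith('HTTP/1.1 200 OK')
        ), 'finish request'

    def test_reconfigure_tls_2(self):
        """Remove the listener before the handshake; the handshake must fail."""
        ssl_sock = self.create_socket()

        # Waiting for connection completion.
        # Delay should be more than TCP_DEFER_ACCEPT.
        time.sleep(1.5)

        self.clear_conf()

        # Initialised up front: without this a handshake that unexpectedly
        # succeeds would raise UnboundLocalError below instead of reporting
        # the intended failure message.
        success = False

        try:
            ssl_sock.do_handshake()
        except ssl.SSLError:
            success = True
        finally:
            # Release the socket on every path, not only on the expected error.
            ssl_sock.close()

        if not success:
            pytest.fail('Connection is not closed.')

    def test_reconfigure_tls_3(self):
        """Remove the listener after the handshake; expect a 408 response."""
        ssl_sock = self.create_socket()
        ssl_sock.do_handshake()

        self.clear_conf()

        assert self.get(sock=ssl_sock)['status'] == 408, 'request timeout'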
106