xref: /unit/test/test_reconfigure_tls.py (revision 2086:3baca957c0dd)
1import socket
2import ssl
3import time
4
5import pytest
6from unit.applications.tls import TestApplicationTLS
7
8
9class TestReconfigureTLS(TestApplicationTLS):
10    prerequisites = {'modules': {'openssl': 'any'}}
11
12    @pytest.fixture(autouse=True)
13    def setup_method_fixture(self):
14        self.certificate()
15
16        assert 'success' in self.conf(
17            {
18                "listeners": {
19                    "*:7080": {
20                        "pass": "routes",
21                        "tls": {"certificate": "default"},
22                    }
23                },
24                "routes": [{"action": {"return": 200}}],
25                "applications": {},
26            }
27        ), 'load application configuration'
28
29    def create_socket(self):
30        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
31        ctx.verify_mode = ssl.CERT_NONE
32        ctx.check_hostname = False
33
34        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
35        ssl_sock = ctx.wrap_socket(
36            s, server_hostname='localhost', do_handshake_on_connect=False
37        )
38        ssl_sock.connect(('127.0.0.1', 7080))
39
40        return ssl_sock
41
42    def clear_conf(self):
43        assert 'success' in self.conf({"listeners": {}, "applications": {}})
44
45    @pytest.mark.skip('not yet')
46    def test_reconfigure_tls_switch(self):
47        assert 'success' in self.conf_delete('listeners/*:7080/tls')
48
49        (_, sock) = self.get(
50            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
51            start=True,
52            read_timeout=1,
53        )
54
55        assert 'success' in self.conf(
56            {"pass": "routes", "tls": {"certificate": "default"}},
57            'listeners/*:7080',
58        )
59
60        assert self.get(sock=sock)['status'] == 200, 'reconfigure'
61        assert self.get_ssl()['status'] == 200, 'reconfigure tls'
62
63    def test_reconfigure_tls(self):
64        ssl_sock = self.create_socket()
65
66        ssl_sock.sendall("""GET / HTTP/1.1\r\n""".encode())
67
68        self.clear_conf()
69
70        ssl_sock.sendall(
71            """Host: localhost\r\nConnection: close\r\n\r\n""".encode()
72        )
73
74        assert (
75            self.recvall(ssl_sock).decode().startswith('HTTP/1.1 200 OK')
76        ), 'finish request'
77
78    def test_reconfigure_tls_2(self):
79        ssl_sock = self.create_socket()
80
81        # Waiting for connection completion.
82        # Delay should be more than TCP_DEFER_ACCEPT.
83        time.sleep(1.5)
84
85        self.clear_conf()
86
87        try:
88            ssl_sock.do_handshake()
89        except ssl.SSLError:
90            ssl_sock.close()
91            success = True
92
93        if not success:
94            pytest.fail('Connection is not closed.')
95
96    def test_reconfigure_tls_3(self):
97        ssl_sock = self.create_socket()
98        ssl_sock.do_handshake()
99
100        self.clear_conf()
101
102        assert self.get(sock=ssl_sock)['status'] == 408, 'request timeout'
103