xref: /unit/test/test_tls_session.py (revision 1981:65494883b62c)
1import socket
2import time
3
4import pytest
5from OpenSSL.SSL import (
6    TLSv1_2_METHOD,
7    SESS_CACHE_CLIENT,
8    OP_NO_TICKET,
9    Context,
10    Connection,
11    _lib,
12)
13from unit.applications.tls import TestApplicationTLS
14from unit.option import option
15
16
class TestTLSSession(TestApplicationTLS):
    """Tests for the ``session`` object of a TLS listener.

    Every test talks to the listener ``*:7080`` configured by
    ``setup_method_fixture`` and checks, through ``SSL_session_reused``,
    whether a TLS session offered by the client was resumed for different
    ``cache_size`` and ``timeout`` settings.
    """

    # Declared requirement: the 'openssl' module, any version.
    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Load the base configuration before each test.

        Calls ``self.certificate()`` (presumably this creates the 'default'
        certificate referenced below -- confirm in TestApplicationTLS) and
        then configures a TLS listener on ``*:7080`` with an empty
        ``session`` object and a single route returning 200.
        """
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default", "session": {}},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def add_session(self, cache_size=None, timeout=None):
        """Replace the listener's ``tls/session`` object.

        Only the options that are not ``None`` are put into the object, so
        calling this without arguments stores an empty ``session``.

        Returns whatever ``self.conf()`` returns for the update; the tests
        look for 'success' or 'error' in it.
        """
        session = {}

        if cache_size is not None:
            session['cache_size'] = cache_size
        if timeout is not None:
            session['timeout'] = timeout

        return self.conf(session, 'listeners/*:7080/tls/session')

    def connect(self, ctx=None, session=None):
        """Open a TLS connection to 127.0.0.1:7080 and report session reuse.

        Args:
            ctx: pyOpenSSL ``Context`` to use. When ``None``, a new TLS 1.2
                context is created with the client session cache mode set
                and the ``OP_NO_TICKET`` option applied.
            session: session object to offer to the server via
                ``set_session()``; ``None`` starts without one.

        Returns:
            A tuple ``(client, session, ctx, reused)``: the ``Connection``
            after handshake and ``shutdown()``, the session obtained from
            it, the context that was used, and the value returned by
            ``SSL_session_reused`` for the connection.

        On success the socket is not closed here; it stays attached to the
        returned connection. If anything fails after the TCP connection is
        established, the socket is closed and the exception is re-raised.
        """
        sock = socket.create_connection(('127.0.0.1', 7080))

        try:
            if ctx is None:
                ctx = Context(TLSv1_2_METHOD)
                ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
                ctx.set_options(OP_NO_TICKET)

            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()

            return (
                client,
                client.get_session(),
                ctx,
                _lib.SSL_session_reused(client._ssl),
            )
        except BaseException:
            # Nothing is handed back to the caller on failure, so the raw
            # socket would otherwise stay open; close it before re-raising.
            sock.close()
            raise

    def test_tls_session(self):
        """Check session reuse for an empty session object and cache sizes 1 and 2."""
        # With the empty "session" object from the fixture, a session that
        # is offered again is expected not to be reused.
        client, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        client, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'no cache'

        assert 'success' in self.add_session(cache_size=1)

        # With cache_size=1 the same session is expected to be reused on
        # every later connection that offers it.
        client, sess, ctx, reused = self.connect()
        assert not reused, 'new connection cache'

        client, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache'

        client, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache 2'

        # check that at least one session of two is not reused

        client, sess, ctx, reused = self.connect()
        client2, sess2, ctx2, reused2 = self.connect()
        assert True not in [reused, reused2], 'new connection cache small'

        client, _, _, reused = self.connect(ctx, sess)
        client2, _, _, reused2 = self.connect(ctx2, sess2)
        assert False in [reused, reused2], 'cache small'

        # both sessions are reused

        assert 'success' in self.add_session(cache_size=2)

        client, sess, ctx, reused = self.connect()
        client2, sess2, ctx2, reused2 = self.connect()
        assert True not in [reused, reused2], 'new connection cache big'

        client, _, _, reused = self.connect(ctx, sess)
        client2, _, _, reused2 = self.connect(ctx2, sess2)
        assert False not in [reused, reused2], 'cache big'

    def test_tls_session_timeout(self):
        """Check that a session is no longer reused after the timeout."""
        assert 'success' in self.add_session(cache_size=1, timeout=1)

        client, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        client, _, _, reused = self.connect(ctx, sess)
        assert reused, 'no timeout'

        # Wait longer than the configured timeout of 1 (presumably seconds
        # -- confirm against the option's documentation).
        time.sleep(3)

        client, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'timeout'

    def test_tls_session_invalid(self):
        """Check that negative and object values are rejected for both options."""
        assert 'error' in self.add_session(cache_size=-1)
        assert 'error' in self.add_session(cache_size={})
        assert 'error' in self.add_session(timeout=-1)
        assert 'error' in self.add_session(timeout={})
130