xref: /unit/test/test_tls_session.py (revision 1984:06514cd08a35)
1import socket
2import time
3
4import pytest
5from OpenSSL.SSL import (
6    TLSv1_2_METHOD,
7    SESS_CACHE_CLIENT,
8    OP_NO_TICKET,
9    Context,
10    Connection,
11    _lib,
12)
13from unit.applications.tls import TestApplicationTLS
14
15
16class TestTLSSession(TestApplicationTLS):
17    prerequisites = {'modules': {'openssl': 'any'}}
18
19    @pytest.fixture(autouse=True)
20    def setup_method_fixture(self, request):
21        self.certificate()
22
23        assert 'success' in self.conf(
24            {
25                "listeners": {
26                    "*:7080": {
27                        "pass": "routes",
28                        "tls": {"certificate": "default", "session": {}},
29                    }
30                },
31                "routes": [{"action": {"return": 200}}],
32                "applications": {},
33            }
34        ), 'load application configuration'
35
36    def add_session(self, cache_size=None, timeout=None):
37        session = {}
38
39        if cache_size is not None:
40            session['cache_size'] = cache_size
41        if timeout is not None:
42            session['timeout'] = timeout
43
44        return self.conf(session, 'listeners/*:7080/tls/session')
45
46    def connect(self, ctx=None, session=None):
47        sock = socket.create_connection(('127.0.0.1', 7080))
48
49        if ctx is None:
50            ctx = Context(TLSv1_2_METHOD)
51            ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
52            ctx.set_options(OP_NO_TICKET)
53
54        client = Connection(ctx, sock)
55        client.set_connect_state()
56
57        if session is not None:
58            client.set_session(session)
59
60        client.do_handshake()
61        client.shutdown()
62
63        return (
64            client,
65            client.get_session(),
66            ctx,
67            _lib.SSL_session_reused(client._ssl),
68        )
69
70    def test_tls_session(self):
71        client, sess, ctx, reused = self.connect()
72        assert not reused, 'new connection'
73
74        client, _, _, reused = self.connect(ctx, sess)
75        assert not reused, 'no cache'
76
77        assert 'success' in self.add_session(cache_size=1)
78
79        client, sess, ctx, reused = self.connect()
80        assert not reused, 'new connection cache'
81
82        client, _, _, reused = self.connect(ctx, sess)
83        assert reused, 'cache'
84
85        client, _, _, reused = self.connect(ctx, sess)
86        assert reused, 'cache 2'
87
88        # check that at least one session of two is not reused
89
90        client, sess, ctx, reused = self.connect()
91        client2, sess2, ctx2, reused2 = self.connect()
92        assert True not in [reused, reused2], 'new connection cache small'
93
94        client, _, _, reused = self.connect(ctx, sess)
95        client2, _, _, reused2 = self.connect(ctx2, sess2)
96        assert False in [reused, reused2], 'cache small'
97
98        # both sessions are reused
99
100        assert 'success' in self.add_session(cache_size=2)
101
102        client, sess, ctx, reused = self.connect()
103        client2, sess2, ctx2, reused2 = self.connect()
104        assert True not in [reused, reused2], 'new connection cache big'
105
106        client, _, _, reused = self.connect(ctx, sess)
107        client2, _, _, reused2 = self.connect(ctx2, sess2)
108        assert False not in [reused, reused2], 'cache big'
109
110    def test_tls_session_timeout(self):
111        assert 'success' in self.add_session(cache_size=1, timeout=1)
112
113        client, sess, ctx, reused = self.connect()
114        assert not reused, 'new connection'
115
116        client, _, _, reused = self.connect(ctx, sess)
117        assert reused, 'no timeout'
118
119        time.sleep(3)
120
121        client, _, _, reused = self.connect(ctx, sess)
122        assert not reused, 'timeout'
123
124    def test_tls_session_invalid(self):
125        assert 'error' in self.add_session(cache_size=-1)
126        assert 'error' in self.add_session(cache_size={})
127        assert 'error' in self.add_session(timeout=-1)
128        assert 'error' in self.add_session(timeout={})
129