xref: /unit/test/test_tls_session.py (revision 2005:28c7f6ff3832)
1import socket
2import time
3
4import pytest
5
6pytest.importorskip('OpenSSL.SSL')
7from OpenSSL.SSL import (
8    TLSv1_2_METHOD,
9    SESS_CACHE_CLIENT,
10    OP_NO_TICKET,
11    Context,
12    Connection,
13    _lib,
14)
15from unit.applications.tls import TestApplicationTLS
16
17
class TestTLSSession(TestApplicationTLS):
    """Tests for the ``tls/session`` options of the ``*:7080`` listener.

    Each test opens short-lived TLS 1.2 client connections (session tickets
    disabled) and checks, via OpenSSL's ``SSL_session_reused``, whether the
    handshake resumed a previously obtained session.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Install the baseline configuration before every test.

        The listener is created with an empty ``session`` object; individual
        tests replace it through :meth:`add_session`.  ``request`` is accepted
        but not used here.
        """
        # NOTE(review): certificate() is inherited from TestApplicationTLS;
        # presumably it provides the "default" certificate referenced below --
        # confirm against the base class.
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default", "session": {}},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def add_session(self, cache_size=None, timeout=None):
        """Replace the listener's ``tls/session`` object.

        Only the options that are not ``None`` are sent, so calling this with
        no arguments stores an empty object.

        Returns whatever ``self.conf`` returns; the tests check that result
        for a ``'success'`` or ``'error'`` key.
        """
        session = {}

        if cache_size is not None:
            session['cache_size'] = cache_size
        if timeout is not None:
            session['timeout'] = timeout

        return self.conf(session, 'listeners/*:7080/tls/session')

    def connect(self, ctx=None, session=None):
        """Perform one TLS handshake against the listener and hang up.

        ``ctx`` is the client ``Context`` to use; when ``None`` a fresh
        TLS 1.2 context with client-side session caching and tickets disabled
        is created.  ``session``, when given, is offered for resumption.

        Returns a ``(client, session, ctx, reused)`` tuple where ``reused`` is
        the result of ``SSL_session_reused`` for this connection.  The
        underlying TCP socket is closed before returning, so the returned
        ``client`` is only useful for inspecting TLS state, not for I/O.
        """
        sock = socket.create_connection(('127.0.0.1', 7080))

        # Close the socket deterministically, both on success and when the
        # handshake raises; otherwise it lingers until garbage collection.
        try:
            if ctx is None:
                ctx = Context(TLSv1_2_METHOD)
                ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
                ctx.set_options(OP_NO_TICKET)

            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()

            # The tuple is fully evaluated before the finally clause runs,
            # so the session and the reuse flag are read while the socket is
            # still open.
            return (
                client,
                client.get_session(),
                ctx,
                _lib.SSL_session_reused(client._ssl),
            )
        finally:
            sock.close()

    def test_tls_session(self):
        """Sessions are resumed only when a large enough cache is set."""
        # With the empty "session" object from the fixture nothing is resumed.
        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'no cache'

        assert 'success' in self.add_session(cache_size=2)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache 2'

        # check that at least one session of four is not reused

        clients = [self.connect() for _ in range(4)]
        assert not any(reused for *_, reused in clients), 'cache small all new'

        clients_again = [self.connect(c[2], c[1]) for c in clients]
        assert not all(
            reused for *_, reused in clients_again
        ), 'cache small no reuse'

        # all four sessions are reused

        assert 'success' in self.add_session(cache_size=8)

        clients = [self.connect() for _ in range(4)]
        assert not any(reused for *_, reused in clients), 'cache big all new'

        clients_again = [self.connect(c[2], c[1]) for c in clients]
        assert all(reused for *_, reused in clients_again), 'cache big reuse'

    def test_tls_session_timeout(self):
        """A session is no longer resumed once its timeout has passed."""
        assert 'success' in self.add_session(cache_size=5, timeout=1)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'no timeout'

        # Sleep well past the configured timeout of 1 before retrying.
        time.sleep(3)

        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'timeout'

    def test_tls_session_invalid(self):
        """Negative or non-numeric session options are rejected."""
        assert 'error' in self.add_session(cache_size=-1)
        assert 'error' in self.add_session(cache_size={})
        assert 'error' in self.add_session(timeout=-1)
        assert 'error' in self.add_session(timeout={})
127