xref: /unit/test/test_tls_tickets.py (revision 1982:a1f2061dd02f)
1import socket
2
3import pytest
4from OpenSSL.SSL import (
5    TLSv1_2_METHOD,
6    Context,
7    Connection,
8    Session,
9    _lib,
10)
11from unit.applications.tls import TestApplicationTLS
12from unit.option import option
13
14
class TestTLSTicket(TestApplicationTLS):
    """Tests for the ``tickets`` option of a listener's ``tls/session`` object.

    Every test starts from three listeners (ports 7080, 7081 and 7082) that
    share the same configuration: session cache disabled (``cache_size`` 0)
    and ``tickets`` set to ``True``.  Session resumption observed by the
    tests is therefore attributed to tickets rather than to the cache.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    # Ticket key strings passed as the "tickets" option value.  They look
    # like base64: `ticket` and `ticket2` are 64 characters long, `ticket80`
    # is 108 characters long (including the trailing '=' padding).
    ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE'
    ticket2 = (
        '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt'
    )
    # Written as adjacent literals (not a backslash continuation inside the
    # string) so that re-indenting the source cannot alter the key value.
    ticket80 = (
        '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR'
        '49TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw='
    )

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Prepare the certificate and the three TLS listeners for each test.

        Calls ``self.certificate()`` (presumably creating the "default"
        certificate referenced below -- confirm against the base class) and
        then loads a configuration with identical listeners on ports 7080,
        7081 and 7082, all routed to a handler that returns status 200.
        """
        self.certificate()

        listener_conf = {
            "pass": "routes",
            "tls": {
                "certificate": "default",
                "session": {"cache_size": 0, "tickets": True},
            },
        }

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": listener_conf,
                    "*:7081": listener_conf,
                    "*:7082": listener_conf,
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def set_tickets(self, tickets=True, port=7080):
        """Apply a new ``tls/session`` object to the listener on *port*.

        The session cache stays disabled (``cache_size`` 0); only the
        ``tickets`` value varies.  Asserts that the configuration request
        reports success.
        """
        assert 'success' in self.conf(
            {"cache_size": 0, "tickets": tickets},
            'listeners/*:' + str(port) + '/tls/session',
        )

    def connect(self, ctx=None, session=None, port=7080):
        """Perform one TLS 1.2 handshake with the listener on *port*.

        ctx     -- OpenSSL ``Context`` to use; a fresh TLSv1.2 context is
                   created when omitted.
        session -- ``Session`` to offer for resumption, if any.

        Returns a tuple ``(session, ctx, reused)`` where *session* is the
        session negotiated by this connection, *ctx* is the context that was
        used and *reused* is the result of ``SSL_session_reused()``.
        """
        if ctx is None:
            ctx = Context(TLSv1_2_METHOD)

        # The context manager closes the socket even when the handshake or
        # shutdown raises; previously the descriptor was never closed
        # explicitly.  The return tuple is fully evaluated before the socket
        # is closed, so the session and the reuse flag are read while the
        # connection is still intact.
        with socket.create_connection(('127.0.0.1', port)) as sock:
            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()

            return (
                client.get_session(),
                ctx,
                _lib.SSL_session_reused(client._ssl),
            )

    def has_ticket(self, sess):
        """Return the result of ``SSL_SESSION_has_ticket()`` for *sess*."""
        return _lib.SSL_SESSION_has_ticket(sess._session)

    @pytest.mark.skipif(
        not hasattr(_lib, 'SSL_SESSION_has_ticket'),
        reason='ticket check is not supported',
    )
    def test_tls_ticket(self):
        """Check the boolean forms of ``tickets`` and the default value."""
        # tickets: true -- a ticket is issued and the first connection is a
        # full handshake.
        sess, ctx, reused = self.connect()
        assert self.has_ticket(sess), 'tickets True'
        assert not reused, 'tickets True not reused'

        # Offering the received session resumes it.
        sess, ctx, reused = self.connect(ctx, sess)
        assert self.has_ticket(sess), 'tickets True reconnect'
        assert reused, 'tickets True reused'

        # tickets: false -- no ticket is issued.
        self.set_tickets(tickets=False)

        sess, _, _ = self.connect()
        assert not self.has_ticket(sess), 'tickets False'

        # Option removed -- behaves like false.
        assert 'success' in self.conf_delete(
            'listeners/*:7080/tls/session/tickets'
        ), 'tickets default configure'

        sess, _, _ = self.connect()
        assert not self.has_ticket(sess), 'tickets default (false)'

    @pytest.mark.skipif(
        not hasattr(_lib, 'SSL_SESSION_has_ticket'),
        reason='ticket check is not supported',
    )
    def test_tls_ticket_string(self):
        """Check ``tickets`` given as a single key string."""
        # A session issued on 7080 with an explicit key resumes on 7080.
        self.set_tickets(self.ticket)
        sess, ctx, _ = self.connect()
        assert self.has_ticket(sess), 'tickets string'

        sess2, _, reused = self.connect(ctx, sess)
        assert self.has_ticket(sess2), 'tickets string reconnect'
        assert reused, 'tickets string reused'

        # 7081 still has tickets: true from the fixture, so the session from
        # 7080 is not accepted there.
        sess2, _, reused = self.connect(ctx, sess, port=7081)
        assert self.has_ticket(sess2), 'connect True'
        assert not reused, 'connect True not reused'

        # 7081 configured with a different explicit key: still no resumption.
        self.set_tickets(self.ticket2, port=7081)

        sess2, _, reused = self.connect(ctx, sess, port=7081)
        assert self.has_ticket(sess2), 'wrong ticket'
        assert not reused, 'wrong ticket not reused'

        # Same checks with the longer key on 7080.
        self.set_tickets(self.ticket80)

        sess, ctx, _ = self.connect()
        assert self.has_ticket(sess), 'tickets string 80'

        sess2, _, reused = self.connect(ctx, sess)
        assert self.has_ticket(sess2), 'tickets string 80 reconnect'
        assert reused, 'tickets string 80 reused'

        sess2, _, reused = self.connect(ctx, sess, port=7081)
        assert self.has_ticket(sess2), 'wrong ticket 80'
        assert not reused, 'wrong ticket 80 not reused'

    @pytest.mark.skipif(
        not hasattr(_lib, 'SSL_SESSION_has_ticket'),
        reason='ticket check is not supported',
    )
    def test_tls_ticket_array(self):
        """Check ``tickets`` given as an array of key strings."""
        # An empty array issues no ticket.
        self.set_tickets([])

        sess, _, _ = self.connect()
        assert not self.has_ticket(sess), 'tickets array empty'

        # 7080 holds both keys; 7081 holds only the first and 7082 only the
        # second, which lets each key be probed separately.
        self.set_tickets([self.ticket, self.ticket2])
        self.set_tickets(self.ticket, port=7081)
        self.set_tickets(self.ticket2, port=7082)

        # A session issued by 7080 resumes only where the last array element
        # is configured.
        sess, ctx, _ = self.connect()
        _, _, reused = self.connect(ctx, sess, port=7081)
        assert not reused, 'not last ticket'
        _, _, reused = self.connect(ctx, sess, port=7082)
        assert reused, 'last ticket'

        # Sessions issued under either key are accepted by 7080.
        sess, ctx, _ = self.connect(port=7081)
        _, _, reused = self.connect(ctx, sess)
        assert reused, 'first ticket'

        sess, ctx, _ = self.connect(port=7082)
        _, _, reused = self.connect(ctx, sess)
        assert reused, 'second ticket'

        # Rotate the array on 7080 to [ticket2, ticket]: drop element 0 and
        # append `ticket` at the end.
        assert 'success' in self.conf_delete(
            'listeners/*:7080/tls/session/tickets/0'
        ), 'removed first ticket'
        assert 'success' in self.conf_post(
            '"' + self.ticket + '"', 'listeners/*:7080/tls/session/tickets'
        ), 'add new ticket to the end of array'

        # Now `ticket` is last, so the roles of 7081 and 7082 are swapped.
        sess, ctx, _ = self.connect()
        _, _, reused = self.connect(ctx, sess, port=7082)
        assert not reused, 'not last ticket 2'
        _, _, reused = self.connect(ctx, sess, port=7081)
        assert reused, 'last ticket 2'

    def test_tls_ticket_invalid(self):
        """Check that malformed ``tickets`` values are rejected."""

        def check_tickets(tickets):
            # The configuration request must report an error.
            assert 'error' in self.conf(
                {"tickets": tickets}, 'listeners/*:7080/tls/session',
            )

        # Wrong type.
        check_tickets({})
        # Characters outside the key alphabet.
        check_tickets('!?&^' * 16)
        # NOTE(review): with the 64-character `ticket` this builds a
        # 124-character string (62 + 1 + 61).  If the intent was to replace
        # a single character with an invalid one, `[:2]` was probably
        # meant -- confirm before changing.
        check_tickets(self.ticket[:-2] + '!' + self.ticket[3:])
        # Wrong lengths: one character short, one and four characters long.
        check_tickets(self.ticket[:-1])
        check_tickets(self.ticket + 'b')
        check_tickets(self.ticket + 'blah')
        # Arrays containing one invalid element at each position.
        check_tickets([True, self.ticket, self.ticket2])
        check_tickets([self.ticket, 'blah', self.ticket2])
        check_tickets([self.ticket, self.ticket2, []])
196