import socket

import pytest

pytest.importorskip('OpenSSL.SSL')
from OpenSSL.SSL import (
    TLSv1_2_METHOD,
    Context,
    Connection,
    Session,
    _lib,
)
from unit.applications.tls import TestApplicationTLS

# Shared marker for tests that inspect sessions via SSL_SESSION_has_ticket();
# they are skipped when the OpenSSL binding does not expose that function.
_requires_ticket_check = pytest.mark.skipif(
    not hasattr(_lib, 'SSL_SESSION_has_ticket'),
    reason='ticket check is not supported',
)


class TestTLSTicket(TestApplicationTLS):
    """Tests for the "tickets" option of a listener's TLS session settings.

    Three listeners (ports 7080, 7081 and 7082) are configured identically
    by the autouse fixture; individual tests then change the "tickets"
    value per port and check whether sessions carry a ticket and whether
    they are reused on reconnect.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    # Ticket key strings passed as "tickets" values.  They look like base64
    # text: ``ticket`` and ``ticket2`` are 64 characters (48 bytes decoded),
    # ``ticket80`` is 108 characters (80 bytes decoded).
    ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE'
    ticket2 = (
        '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt'
    )
    ticket80 = (
        '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR'
        '49TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw='
    )

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Create the certificate and load the three-listener configuration.

        Runs automatically before every test in the class.
        """
        self.certificate()

        # The same settings object is used for all three listeners.
        # NOTE(review): "cache_size": 0 presumably disables the server-side
        # session cache so that reuse can only come from tickets - confirm.
        listener_conf = {
            "pass": "routes",
            "tls": {
                "certificate": "default",
                "session": {"cache_size": 0, "tickets": True},
            },
        }

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": listener_conf,
                    "*:7081": listener_conf,
                    "*:7082": listener_conf,
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def set_tickets(self, tickets=True, port=7080):
        """Replace the TLS session settings of the listener on ``port``.

        ``tickets`` is sent as the "tickets" value (the tests pass a
        boolean, a key string or a list of key strings); "cache_size" is
        always sent as 0.  Asserts that the configuration was accepted.
        """
        assert 'success' in self.conf(
            {"cache_size": 0, "tickets": tickets},
            'listeners/*:' + str(port) + '/tls/session',
        )

    def connect(self, ctx=None, session=None, port=7080):
        """Perform one TLS handshake against 127.0.0.1:``port``.

        ``ctx`` is the OpenSSL ``Context`` to use; a new TLSv1.2 context is
        created when it is None.  ``session``, when given, is offered to
        the server for resumption.

        Returns a tuple ``(session, ctx, reused)``: the session obtained
        from the connection, the context that was used (so callers can
        pass it to a later call), and the value of ``SSL_session_reused()``
        for this connection.
        """
        # The context manager closes the socket on every exit path,
        # including a failed handshake, instead of leaving it to the
        # garbage collector.
        with socket.create_connection(('127.0.0.1', port)) as sock:
            if ctx is None:
                ctx = Context(TLSv1_2_METHOD)

            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()

            # The tuple is built before the socket is closed.
            return (
                client.get_session(),
                ctx,
                _lib.SSL_session_reused(client._ssl),
            )

    def has_ticket(self, sess):
        """Return the result of ``SSL_SESSION_has_ticket()`` for ``sess``."""
        return _lib.SSL_SESSION_has_ticket(sess._session)

    @_requires_ticket_check
    def test_tls_ticket(self):
        """Check boolean and default "tickets" values on port 7080."""
        # The fixture configured "tickets": True.
        sess, ctx, reused = self.connect()
        assert self.has_ticket(sess), 'tickets True'
        assert not reused, 'tickets True not reused'

        sess, ctx, reused = self.connect(ctx, sess)
        assert self.has_ticket(sess), 'tickets True reconnect'
        assert reused, 'tickets True reused'

        self.set_tickets(tickets=False)

        sess, _, _ = self.connect()
        assert not self.has_ticket(sess), 'tickets False'

        # With the option removed, the session is expected to have no ticket.
        assert 'success' in self.conf_delete(
            'listeners/*:7080/tls/session/tickets'
        ), 'tickets default configure'

        sess, _, _ = self.connect()
        assert not self.has_ticket(sess), 'tickets default (false)'

    @_requires_ticket_check
    def test_tls_ticket_string(self):
        """Check "tickets" given as a single key string."""
        self.set_tickets(self.ticket)
        sess, ctx, _ = self.connect()
        assert self.has_ticket(sess), 'tickets string'

        sess2, _, reused = self.connect(ctx, sess)
        assert self.has_ticket(sess2), 'tickets string reconnect'
        assert reused, 'tickets string reused'

        # Port 7081 still has "tickets": True from the fixture.
        sess2, _, reused = self.connect(ctx, sess, port=7081)
        assert self.has_ticket(sess2), 'connect True'
        assert not reused, 'connect True not reused'

        # Port 7081 now has a different key string than port 7080.
        self.set_tickets(self.ticket2, port=7081)

        sess2, _, reused = self.connect(ctx, sess, port=7081)
        assert self.has_ticket(sess2), 'wrong ticket'
        assert not reused, 'wrong ticket not reused'

        # Repeat the checks with the longer key string on port 7080.
        self.set_tickets(self.ticket80)

        sess, ctx, _ = self.connect()
        assert self.has_ticket(sess), 'tickets string 80'

        sess2, _, reused = self.connect(ctx, sess)
        assert self.has_ticket(sess2), 'tickets string 80 reconnect'
        assert reused, 'tickets string 80 reused'

        sess2, _, reused = self.connect(ctx, sess, port=7081)
        assert self.has_ticket(sess2), 'wrong ticket 80'
        assert not reused, 'wrong ticket 80 not reused'

    @_requires_ticket_check
    def test_tls_ticket_array(self):
        """Check "tickets" given as an array of key strings."""
        self.set_tickets([])

        sess, ctx, _ = self.connect()
        assert not self.has_ticket(sess), 'tickets array empty'

        # 7080 gets both key strings, 7081 only the first, 7082 only the
        # second.
        self.set_tickets([self.ticket, self.ticket2])
        self.set_tickets(self.ticket, port=7081)
        self.set_tickets(self.ticket2, port=7082)

        # A session obtained from 7080 is expected to resume only on the
        # listener configured with the last array element.
        sess, ctx, _ = self.connect()
        _, _, reused = self.connect(ctx, sess, port=7081)
        assert not reused, 'not last ticket'
        _, _, reused = self.connect(ctx, sess, port=7082)
        assert reused, 'last ticket'

        # Sessions obtained from 7081 and 7082 are both expected to resume
        # on 7080.
        sess, ctx, _ = self.connect(port=7081)
        _, _, reused = self.connect(ctx, sess)
        assert reused, 'first ticket'

        sess, ctx, _ = self.connect(port=7082)
        _, _, reused = self.connect(ctx, sess)
        assert reused, 'second ticket'

        # Drop the first element and append ``ticket`` again, which makes
        # it the last element of the array on 7080.
        assert 'success' in self.conf_delete(
            'listeners/*:7080/tls/session/tickets/0'
        ), 'removed first ticket'
        assert 'success' in self.conf_post(
            '"' + self.ticket + '"', 'listeners/*:7080/tls/session/tickets'
        ), 'add new ticket to the end of array'

        sess, ctx, _ = self.connect()
        _, _, reused = self.connect(ctx, sess, port=7082)
        assert not reused, 'not last ticket 2'
        _, _, reused = self.connect(ctx, sess, port=7081)
        assert reused, 'last ticket 2'

    def test_tls_ticket_invalid(self):
        """Check that malformed "tickets" values are rejected with an error."""

        def check_tickets(tickets):
            assert 'error' in self.conf(
                {"tickets": tickets}, 'listeners/*:7080/tls/session',
            )

        check_tickets({})
        check_tickets('!?&^' * 16)
        # NOTE(review): ``[:-2]`` yields a 124-character string; ``[:2]``
        # (same length as a valid key, one bad character) may have been
        # intended - confirm.  The value contains '!' either way.
        check_tickets(self.ticket[:-2] + '!' + self.ticket[3:])
        check_tickets(self.ticket[:-1])
        check_tickets(self.ticket + 'b')
        check_tickets(self.ticket + 'blah')
        check_tickets([True, self.ticket, self.ticket2])
        check_tickets([self.ticket, 'blah', self.ticket2])
        check_tickets([self.ticket, self.ticket2, []])