1import socket 2 3import pytest 4from OpenSSL.SSL import ( 5 TLSv1_2_METHOD, 6 Context, 7 Connection, 8 Session, 9 _lib, 10) 11from unit.applications.tls import TestApplicationTLS 12 13 14class TestTLSTicket(TestApplicationTLS): 15 prerequisites = {'modules': {'openssl': 'any'}} 16 17 ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE' 18 ticket2 = ( 19 '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt' 20 ) 21 ticket80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\ 2249TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw=' 23 24 @pytest.fixture(autouse=True) 25 def setup_method_fixture(self, request): 26 self.certificate() 27 28 listener_conf = { 29 "pass": "routes", 30 "tls": { 31 "certificate": "default", 32 "session": {"cache_size": 0, "tickets": True}, 33 }, 34 } 35 36 assert 'success' in self.conf( 37 { 38 "listeners": { 39 "*:7080": listener_conf, 40 "*:7081": listener_conf, 41 "*:7082": listener_conf, 42 }, 43 "routes": [{"action": {"return": 200}}], 44 "applications": {}, 45 } 46 ), 'load application configuration' 47 48 def set_tickets(self, tickets=True, port=7080): 49 assert 'success' in self.conf( 50 {"cache_size": 0, "tickets": tickets}, 51 'listeners/*:' + str(port) + '/tls/session', 52 ) 53 54 def connect(self, ctx=None, session=None, port=7080): 55 sock = socket.create_connection(('127.0.0.1', port)) 56 57 if ctx is None: 58 ctx = Context(TLSv1_2_METHOD) 59 60 client = Connection(ctx, sock) 61 client.set_connect_state() 62 63 if session is not None: 64 client.set_session(session) 65 66 client.do_handshake() 67 client.shutdown() 68 69 return ( 70 client.get_session(), 71 ctx, 72 _lib.SSL_session_reused(client._ssl), 73 ) 74 75 def has_ticket(self, sess): 76 return _lib.SSL_SESSION_has_ticket(sess._session) 77 78 @pytest.mark.skipif( 79 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 80 reason='ticket check is not supported', 81 ) 82 def test_tls_ticket(self): 83 sess, ctx, reused = self.connect() 84 assert self.has_ticket(sess), 'tickets True' 85 assert not reused, 'tickets True not reused' 86 87 sess, ctx, reused = self.connect(ctx, sess) 88 assert self.has_ticket(sess), 'tickets True reconnect' 89 assert reused, 'tickets True reused' 90 91 self.set_tickets(tickets=False) 92 93 sess, _, _ = self.connect() 94 assert not self.has_ticket(sess), 'tickets False' 95 96 assert 'success' in self.conf_delete( 97 'listeners/*:7080/tls/session/tickets' 98 ), 'tickets default configure' 99 100 sess, _, _ = self.connect() 101 assert not self.has_ticket(sess), 'tickets default (false)' 102 103 @pytest.mark.skipif( 104 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 105 reason='ticket check is not supported', 106 ) 107 def test_tls_ticket_string(self): 108 self.set_tickets(self.ticket) 109 sess, ctx, _ = self.connect() 110 assert self.has_ticket(sess), 'tickets string' 111 112 sess2, _, reused = self.connect(ctx, sess) 113 assert self.has_ticket(sess2), 'tickets string reconnect' 114 assert reused, 'tickets string reused' 115 116 sess2, _, reused = self.connect(ctx, sess, port=7081) 117 assert self.has_ticket(sess2), 'connect True' 118 assert not reused, 'connect True not reused' 119 120 self.set_tickets(self.ticket2, port=7081) 121 122 sess2, _, reused = self.connect(ctx, sess, port=7081) 123 assert self.has_ticket(sess2), 'wrong ticket' 124 assert not reused, 'wrong ticket not reused' 125 126 self.set_tickets(self.ticket80) 127 128 sess, ctx, _ = self.connect() 129 assert self.has_ticket(sess), 'tickets string 80' 130 131 sess2, _, reused = self.connect(ctx, sess) 132 assert self.has_ticket(sess2), 'tickets string 80 reconnect' 133 assert reused, 'tickets string 80 reused' 134 135 sess2, _, reused = self.connect(ctx, sess, port=7081) 136 assert self.has_ticket(sess2), 'wrong ticket 80' 137 assert not reused, 'wrong ticket 80 not reused' 138 139 @pytest.mark.skipif( 140 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 141 reason='ticket check is not supported', 142 ) 143 def test_tls_ticket_array(self): 144 self.set_tickets([]) 145 146 sess, ctx, _ = self.connect() 147 assert not self.has_ticket(sess), 'tickets array empty' 148 149 self.set_tickets([self.ticket, self.ticket2]) 150 self.set_tickets(self.ticket, port=7081) 151 self.set_tickets(self.ticket2, port=7082) 152 153 sess, ctx, _ = self.connect() 154 _, _, reused = self.connect(ctx, sess, port=7081) 155 assert not reused, 'not last ticket' 156 _, _, reused = self.connect(ctx, sess, port=7082) 157 assert reused, 'last ticket' 158 159 sess, ctx, _ = self.connect(port=7081) 160 _, _, reused = self.connect(ctx, sess) 161 assert reused, 'first ticket' 162 163 sess, ctx, _ = self.connect(port=7082) 164 _, _, reused = self.connect(ctx, sess) 165 assert reused, 'second ticket' 166 167 assert 'success' in self.conf_delete( 168 'listeners/*:7080/tls/session/tickets/0' 169 ), 'removed first ticket' 170 assert 'success' in self.conf_post( 171 '"' + self.ticket + '"', 'listeners/*:7080/tls/session/tickets' 172 ), 'add new ticket to the end of array' 173 174 sess, ctx, _ = self.connect() 175 _, _, reused = self.connect(ctx, sess, port=7082) 176 assert not reused, 'not last ticket 2' 177 _, _, reused = self.connect(ctx, sess, port=7081) 178 assert reused, 'last ticket 2' 179 180 def test_tls_ticket_invalid(self): 181 def check_tickets(tickets): 182 assert 'error' in self.conf( 183 {"tickets": tickets}, 'listeners/*:7080/tls/session', 184 ) 185 186 check_tickets({}) 187 check_tickets('!?&^' * 16) 188 check_tickets(self.ticket[:-2] + '!' + self.ticket[3:]) 189 check_tickets(self.ticket[:-1]) 190 check_tickets(self.ticket + 'b') 191 check_tickets(self.ticket + 'blah') 192 check_tickets([True, self.ticket, self.ticket2]) 193 check_tickets([self.ticket, 'blah', self.ticket2]) 194 check_tickets([self.ticket, self.ticket2, []]) 195