1import socket 2 3import pytest 4from OpenSSL.SSL import ( 5 TLSv1_2_METHOD, 6 Context, 7 Connection, 8 Session, 9 _lib, 10) 11from unit.applications.tls import TestApplicationTLS 12from unit.option import option 13 14 15class TestTLSTicket(TestApplicationTLS): 16 prerequisites = {'modules': {'openssl': 'any'}} 17 18 ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE' 19 ticket2 = ( 20 '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt' 21 ) 22 ticket80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\ 2349TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw=' 24 25 @pytest.fixture(autouse=True) 26 def setup_method_fixture(self, request): 27 self.certificate() 28 29 listener_conf = { 30 "pass": "routes", 31 "tls": { 32 "certificate": "default", 33 "session": {"cache_size": 0, "tickets": True}, 34 }, 35 } 36 37 assert 'success' in self.conf( 38 { 39 "listeners": { 40 "*:7080": listener_conf, 41 "*:7081": listener_conf, 42 "*:7082": listener_conf, 43 }, 44 "routes": [{"action": {"return": 200}}], 45 "applications": {}, 46 } 47 ), 'load application configuration' 48 49 def set_tickets(self, tickets=True, port=7080): 50 assert 'success' in self.conf( 51 {"cache_size": 0, "tickets": tickets}, 52 'listeners/*:' + str(port) + '/tls/session', 53 ) 54 55 def connect(self, ctx=None, session=None, port=7080): 56 sock = socket.create_connection(('127.0.0.1', port)) 57 58 if ctx is None: 59 ctx = Context(TLSv1_2_METHOD) 60 61 client = Connection(ctx, sock) 62 client.set_connect_state() 63 64 if session is not None: 65 client.set_session(session) 66 67 client.do_handshake() 68 client.shutdown() 69 70 return ( 71 client.get_session(), 72 ctx, 73 _lib.SSL_session_reused(client._ssl), 74 ) 75 76 def has_ticket(self, sess): 77 return _lib.SSL_SESSION_has_ticket(sess._session) 78 79 @pytest.mark.skipif( 80 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 81 reason='ticket check is not supported', 82 ) 83 def test_tls_ticket(self): 84 sess, ctx, reused = self.connect() 85 assert self.has_ticket(sess), 'tickets True' 86 assert not reused, 'tickets True not reused' 87 88 sess, ctx, reused = self.connect(ctx, sess) 89 assert self.has_ticket(sess), 'tickets True reconnect' 90 assert reused, 'tickets True reused' 91 92 self.set_tickets(tickets=False) 93 94 sess, _, _ = self.connect() 95 assert not self.has_ticket(sess), 'tickets False' 96 97 assert 'success' in self.conf_delete( 98 'listeners/*:7080/tls/session/tickets' 99 ), 'tickets default configure' 100 101 sess, _, _ = self.connect() 102 assert not self.has_ticket(sess), 'tickets default (false)' 103 104 @pytest.mark.skipif( 105 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 106 reason='ticket check is not supported', 107 ) 108 def test_tls_ticket_string(self): 109 self.set_tickets(self.ticket) 110 sess, ctx, _ = self.connect() 111 assert self.has_ticket(sess), 'tickets string' 112 113 sess2, _, reused = self.connect(ctx, sess) 114 assert self.has_ticket(sess2), 'tickets string reconnect' 115 assert reused, 'tickets string reused' 116 117 sess2, _, reused = self.connect(ctx, sess, port=7081) 118 assert self.has_ticket(sess2), 'connect True' 119 assert not reused, 'connect True not reused' 120 121 self.set_tickets(self.ticket2, port=7081) 122 123 sess2, _, reused = self.connect(ctx, sess, port=7081) 124 assert self.has_ticket(sess2), 'wrong ticket' 125 assert not reused, 'wrong ticket not reused' 126 127 self.set_tickets(self.ticket80) 128 129 sess, ctx, _ = self.connect() 130 assert self.has_ticket(sess), 'tickets string 80' 131 132 sess2, _, reused = self.connect(ctx, sess) 133 assert self.has_ticket(sess2), 'tickets string 80 reconnect' 134 assert reused, 'tickets string 80 reused' 135 136 sess2, _, reused = self.connect(ctx, sess, port=7081) 137 assert self.has_ticket(sess2), 'wrong ticket 80' 138 assert not reused, 'wrong ticket 80 not reused' 139 140 @pytest.mark.skipif( 141 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 142 reason='ticket check is not supported', 143 ) 144 def test_tls_ticket_array(self): 145 self.set_tickets([]) 146 147 sess, ctx, _ = self.connect() 148 assert not self.has_ticket(sess), 'tickets array empty' 149 150 self.set_tickets([self.ticket, self.ticket2]) 151 self.set_tickets(self.ticket, port=7081) 152 self.set_tickets(self.ticket2, port=7082) 153 154 sess, ctx, _ = self.connect() 155 _, _, reused = self.connect(ctx, sess, port=7081) 156 assert not reused, 'not last ticket' 157 _, _, reused = self.connect(ctx, sess, port=7082) 158 assert reused, 'last ticket' 159 160 sess, ctx, _ = self.connect(port=7081) 161 _, _, reused = self.connect(ctx, sess) 162 assert reused, 'first ticket' 163 164 sess, ctx, _ = self.connect(port=7082) 165 _, _, reused = self.connect(ctx, sess) 166 assert reused, 'second ticket' 167 168 assert 'success' in self.conf_delete( 169 'listeners/*:7080/tls/session/tickets/0' 170 ), 'removed first ticket' 171 assert 'success' in self.conf_post( 172 '"' + self.ticket + '"', 'listeners/*:7080/tls/session/tickets' 173 ), 'add new ticket to the end of array' 174 175 sess, ctx, _ = self.connect() 176 _, _, reused = self.connect(ctx, sess, port=7082) 177 assert not reused, 'not last ticket 2' 178 _, _, reused = self.connect(ctx, sess, port=7081) 179 assert reused, 'last ticket 2' 180 181 def test_tls_ticket_invalid(self): 182 def check_tickets(tickets): 183 assert 'error' in self.conf( 184 {"tickets": tickets}, 'listeners/*:7080/tls/session', 185 ) 186 187 check_tickets({}) 188 check_tickets('!?&^' * 16) 189 check_tickets(self.ticket[:-2] + '!' + self.ticket[3:]) 190 check_tickets(self.ticket[:-1]) 191 check_tickets(self.ticket + 'b') 192 check_tickets(self.ticket + 'blah') 193 check_tickets([True, self.ticket, self.ticket2]) 194 check_tickets([self.ticket, 'blah', self.ticket2]) 195 check_tickets([self.ticket, self.ticket2, []]) 196