1import socket 2 3import pytest 4 5pytest.importorskip('OpenSSL.SSL') 6from OpenSSL.SSL import ( 7 TLSv1_2_METHOD, 8 Context, 9 Connection, 10 _lib, 11) 12from unit.applications.tls import ApplicationTLS 13 14prerequisites = {'modules': {'openssl': 'any'}} 15 16client = ApplicationTLS() 17 18TICKET = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE' 19TICKET2 = '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt' 20TICKET80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\ 2149TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw=' 22 23 24@pytest.fixture(autouse=True) 25def setup_method_fixture(): 26 client.certificate() 27 28 listener_conf = { 29 "pass": "routes", 30 "tls": { 31 "certificate": "default", 32 "session": {"cache_size": 0, "tickets": True}, 33 }, 34 } 35 36 assert 'success' in client.conf( 37 { 38 "listeners": { 39 "*:8080": listener_conf, 40 "*:8081": listener_conf, 41 "*:8082": listener_conf, 42 }, 43 "routes": [{"action": {"return": 200}}], 44 "applications": {}, 45 } 46 ), 'load application configuration' 47 48 49def connect(ctx=None, session=None, port=8080): 50 sock = socket.create_connection(('127.0.0.1', port)) 51 52 if ctx is None: 53 ctx = Context(TLSv1_2_METHOD) 54 55 conn = Connection(ctx, sock) 56 conn.set_connect_state() 57 58 if session is not None: 59 conn.set_session(session) 60 61 conn.do_handshake() 62 conn.shutdown() 63 64 return ( 65 conn.get_session(), 66 ctx, 67 _lib.SSL_session_reused(conn._ssl), 68 ) 69 70 71def has_ticket(sess): 72 return _lib.SSL_SESSION_has_ticket(sess._session) 73 74 75def set_tickets(tickets=True, port=8080): 76 assert 'success' in client.conf( 77 {"cache_size": 0, "tickets": tickets}, 78 f'listeners/*:{port}/tls/session', 79 ) 80 81 82@pytest.mark.skipif( 83 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 84 reason='ticket check is not supported', 85) 86def test_tls_ticket(): 87 sess, ctx, reused = connect() 88 assert has_ticket(sess), 'tickets True' 89 assert not reused, 'tickets True not reused' 90 91 sess, ctx, reused = connect(ctx, sess) 92 assert has_ticket(sess), 'tickets True reconnect' 93 assert reused, 'tickets True reused' 94 95 set_tickets(tickets=False) 96 97 sess, _, _ = connect() 98 assert not has_ticket(sess), 'tickets False' 99 100 assert 'success' in client.conf_delete( 101 'listeners/*:8080/tls/session/tickets' 102 ), 'tickets default configure' 103 104 sess, _, _ = connect() 105 assert not has_ticket(sess), 'tickets default (false)' 106 107 108@pytest.mark.skipif( 109 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 110 reason='ticket check is not supported', 111) 112def test_tls_ticket_string(): 113 set_tickets(TICKET) 114 sess, ctx, _ = connect() 115 assert has_ticket(sess), 'tickets string' 116 117 sess2, _, reused = connect(ctx, sess) 118 assert has_ticket(sess2), 'tickets string reconnect' 119 assert reused, 'tickets string reused' 120 121 sess2, _, reused = connect(ctx, sess, port=8081) 122 assert has_ticket(sess2), 'connect True' 123 assert not reused, 'connect True not reused' 124 125 set_tickets(TICKET2, port=8081) 126 127 sess2, _, reused = connect(ctx, sess, port=8081) 128 assert has_ticket(sess2), 'wrong ticket' 129 assert not reused, 'wrong ticket not reused' 130 131 set_tickets(TICKET80) 132 133 sess, ctx, _ = connect() 134 assert has_ticket(sess), 'tickets string 80' 135 136 sess2, _, reused = connect(ctx, sess) 137 assert has_ticket(sess2), 'tickets string 80 reconnect' 138 assert reused, 'tickets string 80 reused' 139 140 sess2, _, reused = connect(ctx, sess, port=8081) 141 assert has_ticket(sess2), 'wrong ticket 80' 142 assert not reused, 'wrong ticket 80 not reused' 143 144 145@pytest.mark.skipif( 146 not hasattr(_lib, 'SSL_SESSION_has_ticket'), 147 reason='ticket check is not supported', 148) 149def test_tls_ticket_array(): 150 set_tickets([]) 151 152 sess, ctx, _ = connect() 153 assert not has_ticket(sess), 'tickets array empty' 154 155 set_tickets([TICKET, TICKET2]) 156 set_tickets(TICKET, port=8081) 157 set_tickets(TICKET2, port=8082) 158 159 sess, ctx, _ = connect() 160 _, _, reused = connect(ctx, sess, port=8081) 161 assert not reused, 'not last ticket' 162 _, _, reused = connect(ctx, sess, port=8082) 163 assert reused, 'last ticket' 164 165 sess, ctx, _ = connect(port=8081) 166 _, _, reused = connect(ctx, sess) 167 assert reused, 'first ticket' 168 169 sess, ctx, _ = connect(port=8082) 170 _, _, reused = connect(ctx, sess) 171 assert reused, 'second ticket' 172 173 assert 'success' in client.conf_delete( 174 'listeners/*:8080/tls/session/tickets/0' 175 ), 'removed first ticket' 176 assert 'success' in client.conf_post( 177 f'"{TICKET}"', 'listeners/*:8080/tls/session/tickets' 178 ), 'add new ticket to the end of array' 179 180 sess, ctx, _ = connect() 181 _, _, reused = connect(ctx, sess, port=8082) 182 assert not reused, 'not last ticket 2' 183 _, _, reused = connect(ctx, sess, port=8081) 184 assert reused, 'last ticket 2' 185 186 187def test_tls_ticket_invalid(): 188 def check_tickets(tickets): 189 assert 'error' in client.conf( 190 {"tickets": tickets}, 191 'listeners/*:8080/tls/session', 192 ) 193 194 check_tickets({}) 195 check_tickets('!?&^' * 16) 196 check_tickets(f'{TICKET[:-2]}!{TICKET[3:]}') 197 check_tickets(TICKET[:-1]) 198 check_tickets(f'{TICKET}b') 199 check_tickets(f'{TICKET}blah') 200 check_tickets([True, TICKET, TICKET2]) 201 check_tickets([TICKET, 'blah', TICKET2]) 202 check_tickets([TICKET, TICKET2, []]) 203