import socket
import time

import pytest

pytest.importorskip('OpenSSL.SSL')
from OpenSSL.SSL import (
    TLSv1_2_METHOD,
    SESS_CACHE_CLIENT,
    OP_NO_TICKET,
    Context,
    Connection,
    _lib,
)
from unit.applications.tls import TestApplicationTLS


class TestTLSSession(TestApplicationTLS):
    """Tests for the ``tls/session`` options of a TLS listener.

    Each test talks to the listener on ``*:7080`` through raw pyOpenSSL
    connections and checks whether the TLS session offered by the client
    was resumed.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Configure a TLS listener with an empty ``session`` object.

        Runs automatically before every test in the class.  ``request`` is
        not used by the body.
        """
        # NOTE(review): certificate() comes from the base class; presumably
        # it provides the "default" certificate referenced below - confirm.
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default", "session": {}},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def add_session(self, cache_size=None, timeout=None):
        """Replace the listener's ``tls/session`` object.

        Only the options that are not ``None`` are included, so calling this
        without arguments stores an empty object.

        Returns whatever ``self.conf`` returns for the update; callers check
        it for ``'success'`` or ``'error'``.
        """
        session = {}

        if cache_size is not None:
            session['cache_size'] = cache_size
        if timeout is not None:
            session['timeout'] = timeout

        return self.conf(session, 'listeners/*:7080/tls/session')

    def connect(self, ctx=None, session=None):
        """Perform one TLS handshake against the listener and shut it down.

        ctx     -- pyOpenSSL ``Context`` to reuse; when ``None`` a new
                   TLS 1.2 context is created with the client session cache
                   enabled and session tickets disabled.
        session -- session object to offer for resumption, or ``None`` to
                   start a fresh session.

        Returns a ``(client, session, ctx, reused)`` tuple, where ``reused``
        is the raw result of ``SSL_session_reused`` for the connection.

        The socket stays open on success because the returned ``client``
        wraps it; it is closed here only if the exchange fails, so a failed
        handshake does not leak the descriptor.
        """
        sock = socket.create_connection(('127.0.0.1', 7080))

        try:
            if ctx is None:
                ctx = Context(TLSv1_2_METHOD)
                ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
                # Without tickets, resumption presumably has to rely on the
                # server-side session cache that these tests configure.
                ctx.set_options(OP_NO_TICKET)

            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()

            return (
                client,
                client.get_session(),
                ctx,
                _lib.SSL_session_reused(client._ssl),
            )
        except BaseException:
            # Nobody else will own the socket once we propagate the error.
            sock.close()
            raise

    def test_tls_session(self):
        """Check session reuse with no cache, a small cache and a big one."""
        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        # With the empty "session" object from the fixture the test expects
        # no resumption.
        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'no cache'

        assert 'success' in self.add_session(cache_size=2)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache 2'

        # check that at least one session of four is not reused

        clients = [self.connect() for _ in range(4)]
        assert not any(c[-1] for c in clients), 'cache small all new'

        clients_again = [self.connect(c[2], c[1]) for c in clients]
        assert not all(c[-1] for c in clients_again), 'cache small no reuse'

        # all four sessions are reused

        assert 'success' in self.add_session(cache_size=8)

        clients = [self.connect() for _ in range(4)]
        assert not any(c[-1] for c in clients), 'cache big all new'

        clients_again = [self.connect(c[2], c[1]) for c in clients]
        assert all(c[-1] for c in clients_again), 'cache big reuse'

    def test_tls_session_timeout(self):
        """Check that a cached session stops being reused after a delay."""
        assert 'success' in self.add_session(cache_size=5, timeout=1)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'no timeout'

        # Wait longer than the configured timeout of 1 before retrying.
        time.sleep(3)

        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'timeout'

    def test_tls_session_invalid(self):
        """Check that negative and non-numeric option values are rejected."""
        assert 'error' in self.add_session(cache_size=-1)
        assert 'error' in self.add_session(cache_size={})
        assert 'error' in self.add_session(timeout=-1)
        assert 'error' in self.add_session(timeout={})