import socket
import time

import pytest
from OpenSSL.SSL import (
    TLSv1_2_METHOD,
    SESS_CACHE_CLIENT,
    OP_NO_TICKET,
    Context,
    Connection,
    _lib,
)
from unit.applications.tls import TestApplicationTLS


class TestTLSSession(TestApplicationTLS):
    """Tests for the "session" options of a TLS listener.

    Each test talks to a listener on ``*:7080`` that is configured by the
    autouse fixture below, and checks whether a TLS session offered by the
    client is reused depending on the ``cache_size`` and ``timeout`` options.
    """

    prerequisites = {'modules': {'openssl': 'any'}}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request):
        """Apply the base configuration used by every test in this class.

        Calls ``self.certificate()`` (presumably installs the certificate
        referenced as "default" below -- confirm against TestApplicationTLS)
        and then configures a TLS listener with an empty "session" object
        and a single route that returns 200.
        """
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default", "session": {}},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def add_session(self, cache_size=None, timeout=None):
        """Replace the listener's "session" object and return the response.

        Only the options that are not ``None`` are included, so calling this
        with no arguments sends an empty object.

        :param cache_size: value for the "cache_size" option, or ``None``.
        :param timeout: value for the "timeout" option, or ``None``.
        :return: whatever ``self.conf()`` returns for the update request.
        """
        session = {}

        if cache_size is not None:
            session['cache_size'] = cache_size
        if timeout is not None:
            session['timeout'] = timeout

        return self.conf(session, 'listeners/*:7080/tls/session')

    def connect(self, ctx=None, session=None):
        """Perform one TLS handshake against the listener and shut it down.

        :param ctx: an existing ``Context`` to reuse; when ``None`` a new
            TLSv1.2 client context is created with client-side session
            caching enabled and ``OP_NO_TICKET`` set.
        :param session: a session previously returned by this method to
            offer for resumption, or ``None`` for a fresh handshake.
        :return: a tuple ``(client, session, ctx, reused)`` where ``client``
            is the (already shut down and closed) ``Connection``,
            ``session`` is the session negotiated by this handshake,
            ``ctx`` is the context that was used and ``reused`` is the
            result of ``SSL_session_reused()`` for this connection.
        """
        sock = socket.create_connection(('127.0.0.1', 7080))

        # The socket is always closed before returning: every call opens a
        # new connection and callers only need the session data, so leaving
        # it to the garbage collector would leak a descriptor per call
        # (and on every failed handshake).
        try:
            if ctx is None:
                ctx = Context(TLSv1_2_METHOD)
                ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
                ctx.set_options(OP_NO_TICKET)

            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()

            # Read everything needed from the connection before the
            # underlying socket is closed.
            negotiated = client.get_session()
            reused = _lib.SSL_session_reused(client._ssl)
        finally:
            sock.close()

        return (
            client,
            negotiated,
            ctx,
            reused,
        )

    def test_tls_session(self):
        """Session reuse follows the configured "cache_size"."""
        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        # The base configuration has an empty "session" object, and the
        # test expects no reuse in that state.
        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'no cache'

        assert 'success' in self.add_session(cache_size=1)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache 2'

        # check that at least one session of two is not reused

        _, sess, ctx, reused = self.connect()
        _, sess2, ctx2, reused2 = self.connect()
        assert True not in [reused, reused2], 'new connection cache small'

        _, _, _, reused = self.connect(ctx, sess)
        _, _, _, reused2 = self.connect(ctx2, sess2)
        assert False in [reused, reused2], 'cache small'

        # both sessions are reused

        assert 'success' in self.add_session(cache_size=2)

        _, sess, ctx, reused = self.connect()
        _, sess2, ctx2, reused2 = self.connect()
        assert True not in [reused, reused2], 'new connection cache big'

        _, _, _, reused = self.connect(ctx, sess)
        _, _, _, reused2 = self.connect(ctx2, sess2)
        assert False not in [reused, reused2], 'cache big'

    def test_tls_session_timeout(self):
        """A cached session stops being reused after the "timeout"."""
        assert 'success' in self.add_session(cache_size=1, timeout=1)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'no timeout'

        # Wait well past the configured timeout of 1 (presumably seconds --
        # confirm against the server documentation) before retrying.
        time.sleep(3)

        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'timeout'

    def test_tls_session_invalid(self):
        """Negative and non-numeric session option values are rejected."""
        assert 'error' in self.add_session(cache_size=-1)
        assert 'error' in self.add_session(cache_size={})
        assert 'error' in self.add_session(timeout=-1)
        assert 'error' in self.add_session(timeout={})