1import socket 2import time 3 4import pytest 5from OpenSSL.SSL import ( 6 TLSv1_2_METHOD, 7 SESS_CACHE_CLIENT, 8 OP_NO_TICKET, 9 Context, 10 Connection, 11 _lib, 12) 13from unit.applications.tls import TestApplicationTLS 14from unit.option import option 15 16 17class TestTLSSession(TestApplicationTLS): 18 prerequisites = {'modules': {'openssl': 'any'}} 19 20 @pytest.fixture(autouse=True) 21 def setup_method_fixture(self, request): 22 self.certificate() 23 24 assert 'success' in self.conf( 25 { 26 "listeners": { 27 "*:7080": { 28 "pass": "routes", 29 "tls": {"certificate": "default", "session": {}}, 30 } 31 }, 32 "routes": [{"action": {"return": 200}}], 33 "applications": {}, 34 } 35 ), 'load application configuration' 36 37 def add_session(self, cache_size=None, timeout=None): 38 session = {} 39 40 if cache_size is not None: 41 session['cache_size'] = cache_size 42 if timeout is not None: 43 session['timeout'] = timeout 44 45 return self.conf(session, 'listeners/*:7080/tls/session') 46 47 def connect(self, ctx=None, session=None): 48 sock = socket.create_connection(('127.0.0.1', 7080)) 49 50 if ctx is None: 51 ctx = Context(TLSv1_2_METHOD) 52 ctx.set_session_cache_mode(SESS_CACHE_CLIENT) 53 ctx.set_options(OP_NO_TICKET) 54 55 client = Connection(ctx, sock) 56 client.set_connect_state() 57 58 if session is not None: 59 client.set_session(session) 60 61 client.do_handshake() 62 client.shutdown() 63 64 return ( 65 client, 66 client.get_session(), 67 ctx, 68 _lib.SSL_session_reused(client._ssl), 69 ) 70 71 def test_tls_session(self): 72 client, sess, ctx, reused = self.connect() 73 assert not reused, 'new connection' 74 75 client, _, _, reused = self.connect(ctx, sess) 76 assert not reused, 'no cache' 77 78 assert 'success' in self.add_session(cache_size=1) 79 80 client, sess, ctx, reused = self.connect() 81 assert not reused, 'new connection cache' 82 83 client, _, _, reused = self.connect(ctx, sess) 84 assert reused, 'cache' 85 86 client, _, _, reused = self.connect(ctx, sess) 87 assert reused, 'cache 2' 88 89 # check that at least one session of two is not reused 90 91 client, sess, ctx, reused = self.connect() 92 client2, sess2, ctx2, reused2 = self.connect() 93 assert True not in [reused, reused2], 'new connection cache small' 94 95 client, _, _, reused = self.connect(ctx, sess) 96 client2, _, _, reused2 = self.connect(ctx2, sess2) 97 assert False in [reused, reused2], 'cache small' 98 99 # both sessions are reused 100 101 assert 'success' in self.add_session(cache_size=2) 102 103 client, sess, ctx, reused = self.connect() 104 client2, sess2, ctx2, reused2 = self.connect() 105 assert True not in [reused, reused2], 'new connection cache big' 106 107 client, _, _, reused = self.connect(ctx, sess) 108 client2, _, _, reused2 = self.connect(ctx2, sess2) 109 assert False not in [reused, reused2], 'cache big' 110 111 def test_tls_session_timeout(self): 112 assert 'success' in self.add_session(cache_size=1, timeout=1) 113 114 client, sess, ctx, reused = self.connect() 115 assert not reused, 'new connection' 116 117 client, _, _, reused = self.connect(ctx, sess) 118 assert reused, 'no timeout' 119 120 time.sleep(3) 121 122 client, _, _, reused = self.connect(ctx, sess) 123 assert not reused, 'timeout' 124 125 def test_tls_session_invalid(self): 126 assert 'error' in self.add_session(cache_size=-1) 127 assert 'error' in self.add_session(cache_size={}) 128 assert 'error' in self.add_session(timeout=-1) 129 assert 'error' in self.add_session(timeout={}) 130