import os
import pytest
from distutils.version import LooseVersion

from unit.applications.lang.python import TestApplicationPython
from conftest import option


class TestASGILifespan(TestApplicationPython):
    """Tests covering ASGI lifespan handling of Python applications."""

    # Only run when the available 'python' module is version 3.5 or newer.
    prerequisites = {
        'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
    }
    load_module = 'asgi'

    def test_asgi_lifespan(self):
        """Check the marker files after a request/stop cycle.

        Three files (``startup``, ``shutdown``, ``version``) are created next
        to the ``lifespan/empty`` application before the request is made.
        After the application is stopped, the ``startup`` and ``shutdown``
        files must no longer exist and ``version`` must contain ``3.0 2.0``.

        NOTE(review): presumably the application removes the first two files
        from its lifespan startup/shutdown handlers and writes the version
        file itself -- confirm against the ``lifespan/empty`` application.
        """
        self.load('lifespan/empty')

        base_dir = option.test_dir + '/python/lifespan/empty'
        startup_path = base_dir + '/startup'
        shutdown_path = base_dir + '/shutdown'
        version_path = base_dir + '/version'
        marker_paths = (startup_path, shutdown_path, version_path)

        # Create (or keep) empty marker files before the first request.
        for path in marker_paths:
            open(path, 'a').close()

        try:
            assert self.get()['status'] == 204

            self.stop()

            # Record the state first; the assertions are made after cleanup.
            is_startup = os.path.isfile(startup_path)
            is_shutdown = os.path.isfile(shutdown_path)

            with open(version_path, 'r') as f:
                version = f.read()
        finally:
            # Always remove the markers, even when the request, the stop or
            # the read above fails, so nothing is left in the test tree.
            for path in marker_paths:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # Already gone; nothing to clean up for this marker.
                    pass

        assert not is_startup, 'startup'
        assert not is_shutdown, 'shutdown'
        assert version == '3.0 2.0', 'version'

    def test_asgi_lifespan_failed(self):
        """Expect a 503 response and both failure records to be reported."""
        self.load('lifespan/failed')

        assert self.get()['status'] == 503

        assert (
            self.wait_for_record(r'\[error\].*Application startup failed')
            is not None
        ), 'error message'
        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error(self):
        """Expect the 'Exception blah' record after a request is made."""
        self.load('lifespan/error')

        # Only the resulting record matters here; the response is not checked.
        self.get()

        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error_auto(self):
        """Expect an 'AssertionError' record after a request is made."""
        self.load('lifespan/error_auto')

        # Only the resulting record matters here; the response is not checked.
        self.get()

        assert self.wait_for_record(r'AssertionError') is not None, 'assertion'