import os

# NOTE(review): distutils is deprecated (PEP 632) and absent from the standard
# library starting with Python 3.12; kept here because no replacement is
# imported by this file.
from distutils.version import LooseVersion

import pytest
from conftest import unit_stop
from unit.applications.lang.python import TestApplicationPython
from unit.option import option


class TestASGILifespan(TestApplicationPython):
    """Tests around ASGI lifespan handling of Python applications.

    Several tests use "cookie" files created next to the 'lifespan/empty'
    application: 'startup', 'shutdown' and 'version' (optionally prefixed).
    After the server is stopped the tests expect the first two to be gone
    and the last one to contain '3.0 2.0'.
    """

    # Only run when a Python module of version 3.5 or newer is available.
    prerequisites = {
        'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
    }
    load_module = 'asgi'

    @staticmethod
    def _cookie_dir():
        """Return the directory of the 'lifespan/empty' test application."""
        return option.test_dir + '/python/lifespan/empty'

    @classmethod
    def _cookie_path(cls, prefix, name):
        """Return the path of the cookie file ``prefix + name``."""
        return cls._cookie_dir() + '/' + prefix + name

    def setup_cookies(self, prefix):
        """Create the 'startup', 'shutdown' and 'version' cookie files.

        The application directory and every cookie file are made
        world-accessible (mode 0o777) -- presumably so that the application
        process can modify and delete them; confirm against the app source.

        :param prefix: string prepended to each cookie file name.
        """
        os.chmod(self._cookie_dir(), 0o777)

        for name in ['startup', 'shutdown', 'version']:
            path = self._cookie_path(prefix, name)

            # Append mode creates the file if missing and never truncates it.
            with open(path, 'a'):
                pass

            os.chmod(path, 0o777)

    def assert_cookies(self, prefix):
        """Check and clean up the cookie files created by setup_cookies().

        Expects the 'startup' and 'shutdown' files to be absent and the
        'version' file to contain exactly '3.0 2.0'.

        All cookie files are removed *before* any assertion is evaluated, so
        a failing check does not leave stale files behind in the test tree.
        A missing 'version' file is reported as a 'version' assertion failure.

        :param prefix: string prepended to each cookie file name.
        """
        stale = []
        for name in ['startup', 'shutdown']:
            path = self._cookie_path(prefix, name)
            if os.path.isfile(path):
                os.remove(path)
                stale.append(name)

        version_path = self._cookie_path(prefix, 'version')
        version = None
        if os.path.isfile(version_path):
            with open(version_path, 'r') as f:
                version = f.read()

            os.remove(version_path)

        # Report in the same order the files were checked.
        for name in ['startup', 'shutdown']:
            assert name not in stale, name

        assert version == '3.0 2.0', 'version'

    def test_asgi_lifespan(self):
        """Single application: request returns 204, cookies are consumed."""
        self.load('lifespan/empty')

        self.setup_cookies('')

        assert self.get()['status'] == 204

        # Cookies are only checked once the server has been stopped.
        unit_stop()

        self.assert_cookies('')

    def test_asgi_lifespan_targets(self):
        """Two targets of one application, each with its own cookie set.

        Target "1" uses callable 'application' with unprefixed cookies and
        target "2" uses 'application2' with the 'app2_' prefix.
        """
        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [
                    {
                        "match": {"uri": "/1"},
                        "action": {"pass": "applications/targets/1"},
                    },
                    {
                        "match": {"uri": "/2"},
                        "action": {"pass": "applications/targets/2"},
                    },
                ],
                "applications": {
                    "targets": {
                        "type": "python",
                        "processes": {"spare": 0},
                        "working_directory": option.test_dir
                        + "/python/lifespan/empty",
                        "path": option.test_dir + '/python/lifespan/empty',
                        "targets": {
                            "1": {"module": "asgi", "callable": "application"},
                            "2": {
                                "module": "asgi",
                                "callable": "application2",
                            },
                        },
                    }
                },
            }
        )

        self.setup_cookies('')
        self.setup_cookies('app2_')

        assert self.get(url="/1")['status'] == 204
        assert self.get(url="/2")['status'] == 204

        unit_stop()

        self.assert_cookies('')
        self.assert_cookies('app2_')

    def test_asgi_lifespan_failed(self):
        """'lifespan/failed' app: request gets 503 and errors are logged."""
        self.load('lifespan/failed')

        assert self.get()['status'] == 503

        assert (
            self.wait_for_record(r'\[error\].*Application startup failed')
            is not None
        ), 'error message'
        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error(self):
        """'lifespan/error' app: the exception text shows up in the log."""
        self.load('lifespan/error')

        # The response itself is not checked, only the log record.
        self.get()

        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error_auto(self):
        """'lifespan/error_auto' app: an AssertionError is logged."""
        self.load('lifespan/error_auto')

        # The response itself is not checked, only the log record.
        self.get()

        assert self.wait_for_record(r'AssertionError') is not None, 'assertion'