1from pathlib import Path 2 3from packaging import version 4 5from conftest import unit_stop 6from unit.applications.lang.python import ApplicationPython 7from unit.option import option 8 9prerequisites = { 10 'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')} 11} 12 13client = ApplicationPython(load_module='asgi') 14 15 16def assert_cookies(prefix): 17 for name in ['startup', 'shutdown']: 18 path = Path(f'{option.test_dir}/python/lifespan/empty/{prefix}{name}') 19 exists = path.is_file() 20 path.unlink(missing_ok=True) 21 22 assert not exists, name 23 24 path = Path(f'{option.test_dir}/python/lifespan/empty/{prefix}version') 25 versions = path.read_text(encoding='utf-8') 26 path.unlink() 27 28 assert versions == '3.0 2.0', 'versions' 29 30 31def setup_cookies(prefix): 32 base_dir = Path(f'{option.test_dir}/python/lifespan/empty') 33 base_dir.chmod(0o777) 34 35 for name in ['startup', 'shutdown', 'version']: 36 path = Path(f'{option.test_dir}/python/lifespan/empty/{prefix}{name}') 37 path.touch(0o777) 38 39 40def test_asgi_lifespan(): 41 client.load('lifespan/empty') 42 43 setup_cookies('') 44 45 assert client.get()['status'] == 204 46 47 unit_stop() 48 49 assert_cookies('') 50 51 52def test_asgi_lifespan_targets(): 53 path = f'{option.test_dir}/python/lifespan/empty' 54 55 assert 'success' in client.conf( 56 { 57 "listeners": {"*:8080": {"pass": "routes"}}, 58 "routes": [ 59 { 60 "match": {"uri": "/1"}, 61 "action": {"pass": "applications/targets/1"}, 62 }, 63 { 64 "match": {"uri": "/2"}, 65 "action": {"pass": "applications/targets/2"}, 66 }, 67 ], 68 "applications": { 69 "targets": { 70 "type": client.get_application_type(), 71 "processes": {"spare": 0}, 72 "working_directory": path, 73 "path": path, 74 "targets": { 75 "1": {"module": "asgi", "callable": "application"}, 76 "2": { 77 "module": "asgi", 78 "callable": "application2", 79 }, 80 }, 81 } 82 }, 83 } 84 ) 85 86 setup_cookies('') 87 setup_cookies('app2_') 88 89 assert client.get(url="/1")['status'] == 204 90 assert client.get(url="/2")['status'] == 204 91 92 unit_stop() 93 94 assert_cookies('') 95 assert_cookies('app2_') 96 97 98def test_asgi_lifespan_failed(wait_for_record): 99 client.load('lifespan/failed') 100 101 assert client.get()['status'] == 503 102 103 assert ( 104 wait_for_record(r'\[error\].*Application startup failed') is not None 105 ), 'error message' 106 assert wait_for_record(r'Exception blah') is not None, 'exception' 107 108 109def test_asgi_lifespan_error(wait_for_record): 110 client.load('lifespan/error') 111 112 client.get() 113 114 assert wait_for_record(r'Exception blah') is not None, 'exception' 115 116 117def test_asgi_lifespan_error_auto(wait_for_record): 118 client.load('lifespan/error_auto') 119 120 client.get() 121 122 assert wait_for_record(r'AssertionError') is not None, 'assertion' 123