xref: /unit/test/test_asgi_lifespan.py (revision 2616:ab2896c980ab)
1from pathlib import Path
2
3from packaging import version
4
5from conftest import unit_stop
6from unit.applications.lang.python import ApplicationPython
7from unit.option import option
8
9prerequisites = {
10    'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')}
11}
12
13client = ApplicationPython(load_module='asgi')
14
15
16def assert_cookies(prefix):
17    for name in ['startup', 'shutdown']:
18        path = Path(f'{option.test_dir}/python/lifespan/empty/{prefix}{name}')
19        exists = path.is_file()
20        path.unlink(missing_ok=True)
21
22        assert not exists, name
23
24    path = Path(f'{option.test_dir}/python/lifespan/empty/{prefix}version')
25    versions = path.read_text(encoding='utf-8')
26    path.unlink()
27
28    assert versions == '3.0 2.0', 'versions'
29
30
31def setup_cookies(prefix):
32    base_dir = Path(f'{option.test_dir}/python/lifespan/empty')
33    base_dir.chmod(0o777)
34
35    for name in ['startup', 'shutdown', 'version']:
36        path = Path(f'{option.test_dir}/python/lifespan/empty/{prefix}{name}')
37        path.touch(0o777)
38
39
40def test_asgi_lifespan():
41    client.load('lifespan/empty')
42
43    setup_cookies('')
44
45    assert client.get()['status'] == 204
46
47    unit_stop()
48
49    assert_cookies('')
50
51
52def test_asgi_lifespan_targets():
53    path = f'{option.test_dir}/python/lifespan/empty'
54
55    assert 'success' in client.conf(
56        {
57            "listeners": {"*:8080": {"pass": "routes"}},
58            "routes": [
59                {
60                    "match": {"uri": "/1"},
61                    "action": {"pass": "applications/targets/1"},
62                },
63                {
64                    "match": {"uri": "/2"},
65                    "action": {"pass": "applications/targets/2"},
66                },
67            ],
68            "applications": {
69                "targets": {
70                    "type": client.get_application_type(),
71                    "processes": {"spare": 0},
72                    "working_directory": path,
73                    "path": path,
74                    "targets": {
75                        "1": {"module": "asgi", "callable": "application"},
76                        "2": {
77                            "module": "asgi",
78                            "callable": "application2",
79                        },
80                    },
81                }
82            },
83        }
84    )
85
86    setup_cookies('')
87    setup_cookies('app2_')
88
89    assert client.get(url="/1")['status'] == 204
90    assert client.get(url="/2")['status'] == 204
91
92    unit_stop()
93
94    assert_cookies('')
95    assert_cookies('app2_')
96
97
98def test_asgi_lifespan_failed(wait_for_record):
99    client.load('lifespan/failed')
100
101    assert client.get()['status'] == 503
102
103    assert (
104        wait_for_record(r'\[error\].*Application startup failed') is not None
105    ), 'error message'
106    assert wait_for_record(r'Exception blah') is not None, 'exception'
107
108
109def test_asgi_lifespan_error(wait_for_record):
110    client.load('lifespan/error')
111
112    client.get()
113
114    assert wait_for_record(r'Exception blah') is not None, 'exception'
115
116
117def test_asgi_lifespan_error_auto(wait_for_record):
118    client.load('lifespan/error_auto')
119
120    client.get()
121
122    assert wait_for_record(r'AssertionError') is not None, 'assertion'
123