xref: /unit/test/test_asgi_lifespan.py (revision 2066:242192963d93)
1import os
2
3from conftest import unit_stop
4from packaging import version
5from unit.applications.lang.python import TestApplicationPython
6from unit.option import option
7
8
class TestASGILifespan(TestApplicationPython):
    """Tests for ASGI lifespan handling of Python applications.

    The "cookie" tests create marker files next to the ``lifespan/empty``
    application, issue requests, stop the server and then check what is left
    on disk: the ``startup`` and ``shutdown`` markers must have been removed
    and the ``version`` marker must contain ``'3.0 2.0'``.  The applications
    themselves live outside this file; presumably they delete the markers
    while handling the matching lifespan events -- confirm against the
    application sources.
    """

    # Only run when the available Python module is at least 3.5.
    prerequisites = {
        'modules': {
            'python': lambda v: version.parse(v) >= version.parse('3.5')
        }
    }
    load_module = 'asgi'

    def setup_cookies(self, prefix):
        """Create the marker files for one application.

        Creates (or leaves in place) ``<prefix>startup``, ``<prefix>shutdown``
        and ``<prefix>version`` inside the ``lifespan/empty`` application
        directory.

        :param prefix: string prepended to every marker file name; ``''``
            for the default application, e.g. ``'app2_'`` for a second one.
        """
        base_dir = option.test_dir + '/python/lifespan/empty'

        # World-writable directory and files: presumably required because
        # the application process may run as a different user and has to
        # remove/rewrite the markers -- TODO confirm.
        os.chmod(base_dir, 0o777)

        for name in ['startup', 'shutdown', 'version']:
            path = base_dir + '/' + prefix + name
            # Append mode creates the file without truncating an existing
            # one.
            with open(path, 'a'):
                pass
            os.chmod(path, 0o777)

    def assert_cookies(self, prefix):
        """Check and clean up the marker files of one application.

        Asserts that ``<prefix>startup`` and ``<prefix>shutdown`` no longer
        exist and that ``<prefix>version`` contains exactly ``'3.0 2.0'``.
        Any marker inspected is removed so it does not leak into later runs.

        :param prefix: the same prefix that was passed to ``setup_cookies``.
        :raises AssertionError: if a marker is still present or the version
            content differs.
        """
        base_dir = option.test_dir + '/python/lifespan/empty'

        for name in ['startup', 'shutdown']:
            path = base_dir + '/' + prefix + name
            exists = os.path.isfile(path)
            if exists:
                # Remove the leftover before failing so the directory is
                # cleaned even when the assertion below trips.
                os.remove(path)

            assert not exists, name

        path = base_dir + '/' + prefix + 'version'

        # Named ``reported`` rather than ``version`` to avoid shadowing the
        # module-level ``packaging.version`` import.
        with open(path, 'r') as f:
            reported = f.read()

        os.remove(path)

        # NOTE(review): '3.0 2.0' is presumably the ASGI "version" and
        # "spec_version" written by the application -- confirm.
        assert reported == '3.0 2.0', 'version'

    def test_asgi_lifespan(self):
        """A single application consumes its markers and answers 204."""
        self.load('lifespan/empty')

        self.setup_cookies('')

        assert self.get()['status'] == 204

        # Stop the server before inspecting the files so that the shutdown
        # marker has had its chance to be processed.
        unit_stop()

        self.assert_cookies('')

    def test_asgi_lifespan_targets(self):
        """Two targets of one application each consume their own markers."""
        app_dir = option.test_dir + '/python/lifespan/empty'

        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [
                    {
                        "match": {"uri": "/1"},
                        "action": {"pass": "applications/targets/1"},
                    },
                    {
                        "match": {"uri": "/2"},
                        "action": {"pass": "applications/targets/2"},
                    },
                ],
                "applications": {
                    "targets": {
                        "type": self.get_application_type(),
                        # NOTE(review): no spare processes, presumably so the
                        # application only starts on the first request, i.e.
                        # after the markers below exist -- confirm.
                        "processes": {"spare": 0},
                        "working_directory": app_dir,
                        "path": app_dir,
                        "targets": {
                            "1": {"module": "asgi", "callable": "application"},
                            "2": {
                                "module": "asgi",
                                "callable": "application2",
                            },
                        },
                    }
                },
            }
        )

        self.setup_cookies('')
        self.setup_cookies('app2_')

        assert self.get(url="/1")['status'] == 204
        assert self.get(url="/2")['status'] == 204

        unit_stop()

        self.assert_cookies('')
        self.assert_cookies('app2_')

    def test_asgi_lifespan_failed(self):
        """A failed startup yields 503 and is reported in the log."""
        self.load('lifespan/failed')

        assert self.get()['status'] == 503

        assert (
            self.wait_for_record(r'\[error\].*Application startup failed')
            is not None
        ), 'error message'
        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error(self):
        """An exception raised by the ``lifespan/error`` app is logged."""
        self.load('lifespan/error')

        # Only the log record matters here; the response is not checked.
        self.get()

        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error_auto(self):
        """An AssertionError from the ``lifespan/error_auto`` app is logged."""
        self.load('lifespan/error_auto')

        # Only the log record matters here; the response is not checked.
        self.get()

        assert self.wait_for_record(r'AssertionError') is not None, 'assertion'
124