xref: /unit/test/test_asgi_lifespan.py (revision 1971:3410f9d2a662)
1import os
2from distutils.version import LooseVersion
3
4import pytest
5from conftest import unit_stop
6from unit.applications.lang.python import TestApplicationPython
7from unit.option import option
8
9
class TestASGILifespan(TestApplicationPython):
    """Tests for ASGI lifespan handling by the Python application module.

    Several tests communicate with the 'lifespan/empty' fixture application
    through "cookie" files created next to it: 'startup', 'shutdown' and
    'version', optionally prefixed (e.g. 'app2_').  After the daemon has been
    stopped the tests expect the 'startup' and 'shutdown' files to be gone
    and the 'version' file to contain '3.0 2.0'.

    NOTE(review): the fixture application itself is not visible here;
    presumably it removes the 'startup'/'shutdown' files when it receives
    the matching lifespan events and writes the ASGI version strings into
    'version' -- confirm against python/lifespan/empty/asgi.py.
    """

    prerequisites = {
        'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
    }
    load_module = 'asgi'

    def _cookie_path(self, prefix, name):
        """Return the path of cookie file ``prefix + name``.

        The path is computed on every call because ``option.test_dir`` is
        read at run time rather than at class-definition time.
        """
        return option.test_dir + '/python/lifespan/empty/' + prefix + name

    def setup_cookies(self, prefix):
        """Create the 'startup', 'shutdown' and 'version' cookie files.

        The application directory and every cookie file are made
        world-accessible (mode 0o777).

        NOTE(review): presumably this lets an application process running
        under a different user modify or remove the files -- confirm.

        :param prefix: string prepended to each cookie file name.
        """
        base_dir = option.test_dir + '/python/lifespan/empty'

        os.chmod(base_dir, 0o777)

        for name in ['startup', 'shutdown', 'version']:
            path = self._cookie_path(prefix, name)

            # Append mode creates the file if needed without truncating it.
            with open(path, 'a'):
                pass

            os.chmod(path, 0o777)

    def assert_cookies(self, prefix):
        """Check the cookie files for *prefix* and always clean them up.

        Asserts that the 'startup' and 'shutdown' files no longer exist and
        that the 'version' file contains exactly '3.0 2.0'.

        Whatever the outcome, all three cookie files are removed afterwards,
        so a failing check does not leave stale files in the test tree.

        :param prefix: string prepended to each cookie file name.
        :raises AssertionError: if a cookie is still present or the version
            string differs.
        :raises OSError: if the 'version' file cannot be read.
        """
        try:
            for name in ['startup', 'shutdown']:
                path = self._cookie_path(prefix, name)
                assert not os.path.isfile(path), name

            with open(self._cookie_path(prefix, 'version'), 'r') as f:
                version = f.read()

            assert version == '3.0 2.0', 'version'

        finally:
            # Runs on success and on failure alike; files that are already
            # gone are simply skipped.
            for name in ['startup', 'shutdown', 'version']:
                try:
                    os.remove(self._cookie_path(prefix, name))
                except FileNotFoundError:
                    pass

    def test_asgi_lifespan(self):
        """A single application handles a request; cookies are then checked."""
        self.load('lifespan/empty')

        self.setup_cookies('')

        assert self.get()['status'] == 204

        # The cookies are only inspected once the daemon has been stopped.
        unit_stop()

        self.assert_cookies('')

    def test_asgi_lifespan_targets(self):
        """Two targets of one application each get their own cookie set."""
        app_dir = option.test_dir + '/python/lifespan/empty'

        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "routes"}},
                "routes": [
                    {
                        "match": {"uri": "/1"},
                        "action": {"pass": "applications/targets/1"},
                    },
                    {
                        "match": {"uri": "/2"},
                        "action": {"pass": "applications/targets/2"},
                    },
                ],
                "applications": {
                    "targets": {
                        "type": "python",
                        "processes": {"spare": 0},
                        "working_directory": app_dir,
                        "path": app_dir,
                        "targets": {
                            "1": {"module": "asgi", "callable": "application"},
                            "2": {
                                "module": "asgi",
                                "callable": "application2",
                            },
                        },
                    }
                },
            }
        )

        # Target "1" is checked with the unprefixed cookies, target "2"
        # with the 'app2_' ones.
        self.setup_cookies('')
        self.setup_cookies('app2_')

        assert self.get(url="/1")['status'] == 204
        assert self.get(url="/2")['status'] == 204

        unit_stop()

        self.assert_cookies('')
        self.assert_cookies('app2_')

    def test_asgi_lifespan_failed(self):
        """A failed startup yields 503 and is reported in the log."""
        self.load('lifespan/failed')

        assert self.get()['status'] == 503

        assert (
            self.wait_for_record(r'\[error\].*Application startup failed')
            is not None
        ), 'error message'
        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error(self):
        """The 'lifespan/error' application's exception reaches the log."""
        self.load('lifespan/error')

        # Only the log record matters; the response is not inspected.
        self.get()

        assert self.wait_for_record(r'Exception blah') is not None, 'exception'

    def test_asgi_lifespan_error_auto(self):
        """The 'lifespan/error_auto' application's AssertionError is logged."""
        self.load('lifespan/error_auto')

        # Only the log record matters; the response is not inspected.
        self.get()

        assert self.wait_for_record(r'AssertionError') is not None, 'assertion'
123