xref: /unit/test/test_asgi_targets.py (revision 1971:3410f9d2a662)
1from distutils.version import LooseVersion
2
3import pytest
4from unit.applications.lang.python import TestApplicationPython
5from unit.option import option
6
7
8class TestASGITargets(TestApplicationPython):
9    prerequisites = {
10        'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
11    }
12    load_module = 'asgi'
13
14    @pytest.fixture(autouse=True)
15    def setup_method_fixture(self):
16        assert 'success' in self.conf(
17            {
18                "listeners": {"*:7080": {"pass": "routes"}},
19                "routes": [
20                    {
21                        "match": {"uri": "/1"},
22                        "action": {"pass": "applications/targets/1"},
23                    },
24                    {
25                        "match": {"uri": "/2"},
26                        "action": {"pass": "applications/targets/2"},
27                    },
28                ],
29                "applications": {
30                    "targets": {
31                        "type": "python",
32                        "processes": {"spare": 0},
33                        "working_directory": option.test_dir
34                        + "/python/targets/",
35                        "path": option.test_dir + '/python/targets/',
36                        "protocol": "asgi",
37                        "targets": {
38                            "1": {
39                                "module": "asgi",
40                                "callable": "application_200",
41                            },
42                            "2": {
43                                "module": "asgi",
44                                "callable": "application_201",
45                            },
46                        },
47                    }
48                },
49            }
50        )
51
52    def conf_targets(self, targets):
53        assert 'success' in self.conf(targets, 'applications/targets/targets')
54
55    def test_asgi_targets(self):
56        assert self.get(url='/1')['status'] == 200
57        assert self.get(url='/2')['status'] == 201
58
59    def test_asgi_targets_legacy(self):
60        self.conf_targets(
61            {
62                "1": {"module": "asgi", "callable": "legacy_application_200"},
63                "2": {"module": "asgi", "callable": "legacy_application_201"},
64            }
65        )
66
67        assert self.get(url='/1')['status'] == 200
68        assert self.get(url='/2')['status'] == 201
69
70    def test_asgi_targets_mix(self):
71        self.conf_targets(
72            {
73                "1": {"module": "asgi", "callable": "application_200"},
74                "2": {"module": "asgi", "callable": "legacy_application_201"},
75            }
76        )
77
78        assert self.get(url='/1')['status'] == 200
79        assert self.get(url='/2')['status'] == 201
80
81    def test_asgi_targets_broken(self, skip_alert):
82        skip_alert(r'Python failed to get "blah" from module')
83
84        self.conf_targets(
85            {
86                "1": {"module": "asgi", "callable": "application_200"},
87                "2": {"module": "asgi", "callable": "blah"},
88            }
89        )
90
91        assert self.get(url='/1')['status'] != 200
92