xref: /unit/test/test_python_isolation.py (revision 2616:ab2896c980ab)
1import re
2import subprocess
3from pathlib import Path
4
5import pytest
6
7from unit.applications.lang.python import ApplicationPython
8from unit.option import option
9from unit.utils import findmnt
10from unit.utils import waitformount
11from unit.utils import waitforunmount
12
13prerequisites = {'modules': {'python': 'any'}, 'features': {'isolation': True}}
14
15client = ApplicationPython()
16
17
18def get_cgroup(app_name):
19    output = subprocess.check_output(
20        ['ps', 'ax', '-o', 'pid', '-o', 'cmd']
21    ).decode()
22
23    pid = re.search(fr'(\d+)\s*unit: "{app_name}" application', output).group(1)
24
25    cgroup = f'/proc/{pid}/cgroup'
26
27    if not Path(cgroup).is_file():
28        pytest.skip(f'no cgroup at {cgroup}')
29
30    with open(cgroup, 'r', encoding='utf-8') as f:
31        return f.read().rstrip()
32
33
34def test_python_isolation_rootfs(is_su, require, temp_dir):
35    isolation = {'rootfs': temp_dir}
36
37    if not is_su:
38        require(
39            {
40                'features': {
41                    'isolation': [
42                        'unprivileged_userns_clone',
43                        'user',
44                        'mnt',
45                        'pid',
46                    ]
47                }
48            }
49        )
50
51        isolation['namespaces'] = {
52            'mount': True,
53            'credential': True,
54            'pid': True,
55        }
56
57    client.load('ns_inspect', isolation=isolation)
58
59    assert not (
60        client.getjson(url=f'/?path={temp_dir}')['body']['FileExists']
61    ), 'temp_dir does not exists in rootfs'
62
63    assert client.getjson(url='/?path=/proc/self')['body'][
64        'FileExists'
65    ], 'no /proc/self'
66
67    assert not (
68        client.getjson(url='/?path=/dev/pts')['body']['FileExists']
69    ), 'no /dev/pts'
70
71    assert not (
72        client.getjson(url='/?path=/sys/kernel')['body']['FileExists']
73    ), 'no /sys/kernel'
74
75    ret = client.getjson(url='/?path=/app/python/ns_inspect')
76
77    assert ret['body']['FileExists'], 'application exists in rootfs'
78
79
80def test_python_isolation_rootfs_no_language_deps(require, temp_dir):
81    require({'privileged_user': True})
82
83    isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}}
84    client.load('empty', isolation=isolation)
85
86    python_path = f'{temp_dir}/usr'
87
88    assert findmnt().find(python_path) == -1
89    assert client.get()['status'] != 200, 'disabled language_deps'
90    assert findmnt().find(python_path) == -1
91
92    isolation['automount']['language_deps'] = True
93
94    client.load('empty', isolation=isolation)
95
96    assert findmnt().find(python_path) == -1
97    assert client.get()['status'] == 200, 'enabled language_deps'
98    assert waitformount(python_path), 'language_deps mount'
99
100    client.conf({"listeners": {}, "applications": {}})
101
102    assert waitforunmount(python_path), 'language_deps unmount'
103
104
105def test_python_isolation_procfs(require, temp_dir):
106    require({'privileged_user': True})
107
108    isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}}
109
110    client.load('ns_inspect', isolation=isolation)
111
112    assert not (
113        client.getjson(url='/?path=/proc/self')['body']['FileExists']
114    ), 'no /proc/self'
115
116    isolation['automount']['procfs'] = True
117
118    client.load('ns_inspect', isolation=isolation)
119
120    assert client.getjson(url='/?path=/proc/self')['body'][
121        'FileExists'
122    ], '/proc/self'
123
124
125def test_python_isolation_cgroup(require):
126    require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})
127
128    def set_cgroup_path(path):
129        isolation = {'cgroup': {'path': path}}
130        client.load('empty', processes=1, isolation=isolation)
131
132    set_cgroup_path('scope/python')
133
134    cgroup_rel = Path(get_cgroup('empty'))
135    assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel'
136
137    set_cgroup_path('/scope2/python')
138
139    cgroup_abs = Path(get_cgroup('empty'))
140    assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs'
141
142    assert len(cgroup_rel.parts) >= len(cgroup_abs.parts)
143
144
145def test_python_isolation_cgroup_two(require):
146    require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})
147
148    def set_two_cgroup_path(path, path2):
149        script_path = f'{option.test_dir}/python/empty'
150
151        assert 'success' in client.conf(
152            {
153                "listeners": {
154                    "*:8080": {"pass": "applications/one"},
155                    "*:8081": {"pass": "applications/two"},
156                },
157                "applications": {
158                    "one": {
159                        "type": "python",
160                        "processes": 1,
161                        "path": script_path,
162                        "working_directory": script_path,
163                        "module": "wsgi",
164                        "isolation": {
165                            'cgroup': {'path': path},
166                        },
167                    },
168                    "two": {
169                        "type": "python",
170                        "processes": 1,
171                        "path": script_path,
172                        "working_directory": script_path,
173                        "module": "wsgi",
174                        "isolation": {
175                            'cgroup': {'path': path2},
176                        },
177                    },
178                },
179            }
180        )
181
182    set_two_cgroup_path('/scope/python', '/scope/python')
183    assert get_cgroup('one') == get_cgroup('two')
184
185    set_two_cgroup_path('/scope/python', '/scope2/python')
186    assert get_cgroup('one') != get_cgroup('two')
187
188
189def test_python_isolation_cgroup_invalid(require):
190    require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})
191
192    def check_invalid(path):
193        script_path = f'{option.test_dir}/python/empty'
194        assert 'error' in client.conf(
195            {
196                "listeners": {"*:8080": {"pass": "applications/empty"}},
197                "applications": {
198                    "empty": {
199                        "type": "python",
200                        "processes": {"spare": 0},
201                        "path": script_path,
202                        "working_directory": script_path,
203                        "module": "wsgi",
204                        "isolation": {
205                            'cgroup': {'path': path},
206                        },
207                    }
208                },
209            }
210        )
211
212    check_invalid('')
213    check_invalid('../scope')
214    check_invalid('scope/../python')
215