"""Isolation tests (rootfs, automount, cgroup) for Python applications."""

import re
import subprocess
from pathlib import Path

import pytest

from unit.applications.lang.python import ApplicationPython
from unit.option import option
from unit.utils import findmnt
from unit.utils import waitformount
from unit.utils import waitforunmount

prerequisites = {'modules': {'python': 'any'}, 'features': {'isolation': True}}

client = ApplicationPython()


def get_cgroup(app_name):
    """Return the contents of ``/proc/<pid>/cgroup`` for an application.

    The PID is taken from the first ``ps`` output line that matches
    ``unit: "<app_name>" application``.

    Skips the calling test if the process has no ``cgroup`` file under
    ``/proc``; fails the calling test if no matching process is listed.
    """
    output = subprocess.check_output(
        ['ps', 'ax', '-o', 'pid', '-o', 'cmd']
    ).decode()

    # Escape the name so that regex metacharacters in it match literally.
    match = re.search(
        fr'(\d+)\s*unit: "{re.escape(app_name)}" application', output
    )

    if match is None:
        # Report a readable failure instead of AttributeError on None.
        pytest.fail(f'no process found for application "{app_name}"')

    pid = match.group(1)

    cgroup = f'/proc/{pid}/cgroup'

    if not Path(cgroup).is_file():
        pytest.skip(f'no cgroup at {cgroup}')

    with open(cgroup, 'r', encoding='utf-8') as f:
        return f.read().rstrip()


def test_python_isolation_rootfs(is_su, require, temp_dir):
    """Check which paths the application sees with ``rootfs`` set.

    The ``ns_inspect`` application is queried with ``?path=...`` and its
    JSON body's ``FileExists`` field is asserted for each path.
    """
    isolation = {'rootfs': temp_dir}

    if not is_su:
        # Without superuser rights the test additionally requires these
        # isolation features and requests the matching namespaces.
        require(
            {
                'features': {
                    'isolation': [
                        'unprivileged_userns_clone',
                        'user',
                        'mnt',
                        'pid',
                    ]
                }
            }
        )

        isolation['namespaces'] = {
            'mount': True,
            'credential': True,
            'pid': True,
        }

    client.load('ns_inspect', isolation=isolation)

    assert not (
        client.getjson(url=f'/?path={temp_dir}')['body']['FileExists']
    ), 'temp_dir does not exists in rootfs'

    assert client.getjson(url='/?path=/proc/self')['body'][
        'FileExists'
    ], 'no /proc/self'

    assert not (
        client.getjson(url='/?path=/dev/pts')['body']['FileExists']
    ), 'no /dev/pts'

    assert not (
        client.getjson(url='/?path=/sys/kernel')['body']['FileExists']
    ), 'no /sys/kernel'

    ret = client.getjson(url='/?path=/app/python/ns_inspect')

    assert ret['body']['FileExists'], 'application exists in rootfs'


def test_python_isolation_rootfs_no_language_deps(require, temp_dir):
    """Toggle ``automount.language_deps`` and watch the ``usr`` mount.

    With the option disabled the request must not return 200 and nothing
    is mounted under ``<temp_dir>/usr``; with it enabled the request
    returns 200, the mount appears, and it goes away once the
    configuration is emptied.
    """
    require({'privileged_user': True})

    isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}}

    client.load('empty', isolation=isolation)

    python_path = f'{temp_dir}/usr'

    assert findmnt().find(python_path) == -1
    assert client.get()['status'] != 200, 'disabled language_deps'
    assert findmnt().find(python_path) == -1

    isolation['automount']['language_deps'] = True

    client.load('empty', isolation=isolation)

    # The mount is expected only after the first request, not after load.
    assert findmnt().find(python_path) == -1
    assert client.get()['status'] == 200, 'enabled language_deps'
    assert waitformount(python_path), 'language_deps mount'

    client.conf({"listeners": {}, "applications": {}})

    assert waitforunmount(python_path), 'language_deps unmount'


def test_python_isolation_procfs(require, temp_dir):
    """Check that ``automount.procfs`` controls ``/proc/self`` visibility."""
    require({'privileged_user': True})

    isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}}

    client.load('ns_inspect', isolation=isolation)

    assert not (
        client.getjson(url='/?path=/proc/self')['body']['FileExists']
    ), 'no /proc/self'

    isolation['automount']['procfs'] = True

    client.load('ns_inspect', isolation=isolation)

    assert client.getjson(url='/?path=/proc/self')['body'][
        'FileExists'
    ], '/proc/self'


def test_python_isolation_cgroup(require):
    """Check relative and absolute ``cgroup.path`` values for one app."""
    require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})

    def set_cgroup_path(path):
        """Load the ``empty`` app with one process in cgroup *path*."""
        isolation = {'cgroup': {'path': path}}
        client.load('empty', processes=1, isolation=isolation)

    set_cgroup_path('scope/python')

    cgroup_rel = Path(get_cgroup('empty'))
    assert cgroup_rel.parts[-2:] == ('scope', 'python'), 'cgroup rel'

    set_cgroup_path('/scope2/python')

    cgroup_abs = Path(get_cgroup('empty'))
    assert cgroup_abs.parts[-2:] == ('scope2', 'python'), 'cgroup abs'

    # NOTE(review): presumably a relative path is resolved below an
    # existing cgroup and so cannot be shallower than the absolute one;
    # confirm against the Unit cgroup documentation.
    assert len(cgroup_rel.parts) >= len(cgroup_abs.parts)


def test_python_isolation_cgroup_two(require):
    """Check that two apps share or differ in cgroup as configured."""
    require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})

    def set_two_cgroup_path(path, path2):
        """Configure apps ``one`` and ``two`` with the given cgroup paths."""
        script_path = f'{option.test_dir}/python/empty'

        assert 'success' in client.conf(
            {
                "listeners": {
                    "*:8080": {"pass": "applications/one"},
                    "*:8081": {"pass": "applications/two"},
                },
                "applications": {
                    "one": {
                        "type": "python",
                        "processes": 1,
                        "path": script_path,
                        "working_directory": script_path,
                        "module": "wsgi",
                        "isolation": {
                            'cgroup': {'path': path},
                        },
                    },
                    "two": {
                        "type": "python",
                        "processes": 1,
                        "path": script_path,
                        "working_directory": script_path,
                        "module": "wsgi",
                        "isolation": {
                            'cgroup': {'path': path2},
                        },
                    },
                },
            }
        )

    set_two_cgroup_path('/scope/python', '/scope/python')
    assert get_cgroup('one') == get_cgroup('two')

    set_two_cgroup_path('/scope/python', '/scope2/python')
    assert get_cgroup('one') != get_cgroup('two')


def test_python_isolation_cgroup_invalid(require):
    """Check that invalid ``cgroup.path`` values are rejected."""
    require({'privileged_user': True, 'features': {'isolation': ['cgroup']}})

    def check_invalid(path):
        """Assert that configuring cgroup *path* returns an error."""
        script_path = f'{option.test_dir}/python/empty'
        assert 'error' in client.conf(
            {
                "listeners": {"*:8080": {"pass": "applications/empty"}},
                "applications": {
                    "empty": {
                        "type": "python",
                        "processes": {"spare": 0},
                        "path": script_path,
                        "working_directory": script_path,
                        "module": "wsgi",
                        "isolation": {
                            'cgroup': {'path': path},
                        },
                    }
                },
            }
        )

    check_invalid('')
    check_invalid('../scope')
    check_invalid('scope/../python')