1import grp 2import os 3import pwd 4 5import pytest 6from unit.applications.lang.go import TestApplicationGo 7from unit.option import option 8from unit.utils import getns 9 10 11class TestGoIsolation(TestApplicationGo): 12 prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']} 13 14 @pytest.fixture(autouse=True) 15 def setup_method_fixture(self, skip_alert): 16 skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor') 17 18 def unpriv_creds(self): 19 nobody_uid = pwd.getpwnam('nobody').pw_uid 20 21 try: 22 nogroup_gid = grp.getgrnam('nogroup').gr_gid 23 nogroup = 'nogroup' 24 except KeyError: 25 nogroup_gid = grp.getgrnam('nobody').gr_gid 26 nogroup = 'nobody' 27 28 return (nobody_uid, nogroup_gid, nogroup) 29 30 def isolation_key(self, key): 31 return key in option.available['features']['isolation'].keys() 32 33 def test_isolation_values(self): 34 self.load('ns_inspect') 35 36 obj = self.getjson()['body'] 37 38 for ns, ns_value in option.available['features']['isolation'].items(): 39 if ns.upper() in obj['NS']: 40 assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' 41 42 def test_isolation_unpriv_user(self, is_su): 43 if not self.isolation_key('unprivileged_userns_clone'): 44 pytest.skip('unprivileged clone is not available') 45 46 if is_su: 47 pytest.skip('privileged tests, skip this') 48 49 self.load('ns_inspect') 50 obj = self.getjson()['body'] 51 52 assert obj['UID'] == os.geteuid(), 'uid match' 53 assert obj['GID'] == os.getegid(), 'gid match' 54 55 self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 56 57 obj = self.getjson()['body'] 58 59 nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() 60 61 # unprivileged unit map itself to nobody in the container by default 62 assert obj['UID'] == nobody_uid, 'uid of nobody' 63 assert obj['GID'] == nogroup_gid, f'gid of {nogroup}' 64 65 self.load( 66 'ns_inspect', 67 user='root', 68 isolation={'namespaces': {'credential': True}}, 69 ) 70 71 obj = self.getjson()['body'] 72 73 assert obj['UID'] == 0, 'uid match user=root' 74 assert obj['GID'] == 0, 'gid match user=root' 75 76 self.load( 77 'ns_inspect', 78 user='root', 79 group=nogroup, 80 isolation={'namespaces': {'credential': True}}, 81 ) 82 83 obj = self.getjson()['body'] 84 85 assert obj['UID'] == 0, 'uid match user=root group=nogroup' 86 assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' 87 88 self.load( 89 'ns_inspect', 90 user='root', 91 group='root', 92 isolation={ 93 'namespaces': {'credential': True}, 94 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], 95 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}], 96 }, 97 ) 98 99 obj = self.getjson()['body'] 100 101 assert obj['UID'] == 0, 'uid match uidmap' 102 assert obj['GID'] == 0, 'gid match gidmap' 103 104 def test_isolation_priv_user(self, is_su): 105 if not is_su: 106 pytest.skip('unprivileged tests, skip this') 107 108 self.load('ns_inspect') 109 110 nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() 111 112 obj = self.getjson()['body'] 113 114 assert obj['UID'] == nobody_uid, 'uid match' 115 assert obj['GID'] == nogroup_gid, 'gid match' 116 117 self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 118 119 obj = self.getjson()['body'] 120 121 # privileged unit map app creds in the container by default 122 assert obj['UID'] == nobody_uid, 'uid nobody' 123 assert obj['GID'] == nogroup_gid, 'gid nobody' 124 125 self.load( 126 'ns_inspect', 127 user='root', 128 isolation={'namespaces': {'credential': True}}, 129 ) 130 131 obj = self.getjson()['body'] 132 133 assert obj['UID'] == 0, 'uid nobody user=root' 134 assert obj['GID'] == 0, 'gid nobody user=root' 135 136 self.load( 137 'ns_inspect', 138 user='root', 139 group=nogroup, 140 isolation={'namespaces': {'credential': True}}, 141 ) 142 143 obj = self.getjson()['body'] 144 145 assert obj['UID'] == 0, 'uid match user=root group=nogroup' 146 assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' 147 148 self.load( 149 'ns_inspect', 150 user='root', 151 group='root', 152 isolation={ 153 'namespaces': {'credential': True}, 154 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], 155 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], 156 }, 157 ) 158 159 obj = self.getjson()['body'] 160 161 assert obj['UID'] == 0, 'uid match uidmap user=root' 162 assert obj['GID'] == 0, 'gid match gidmap user=root' 163 164 # map 65535 uids 165 self.load( 166 'ns_inspect', 167 user='nobody', 168 isolation={ 169 'namespaces': {'credential': True}, 170 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}], 171 }, 172 ) 173 174 obj = self.getjson()['body'] 175 176 assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' 177 assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' 178 179 def test_isolation_mnt(self): 180 if not self.isolation_key('mnt'): 181 pytest.skip('mnt namespace is not supported') 182 183 if not self.isolation_key('unprivileged_userns_clone'): 184 pytest.skip('unprivileged clone is not available') 185 186 self.load( 187 'ns_inspect', 188 isolation={'namespaces': {'mount': True, 'credential': True}}, 189 ) 190 191 obj = self.getjson()['body'] 192 193 # all but user and mnt 194 allns = list(option.available['features']['isolation'].keys()) 195 allns.remove('user') 196 allns.remove('mnt') 197 198 for ns in allns: 199 if ns.upper() in obj['NS']: 200 assert ( 201 obj['NS'][ns.upper()] 202 == option.available['features']['isolation'][ns] 203 ), f'{ns} match' 204 205 assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' 206 assert obj['NS']['USER'] != getns('user'), 'user set' 207 208 def test_isolation_pid(self, is_su): 209 if not self.isolation_key('pid'): 210 pytest.skip('pid namespace is not supported') 211 212 if not is_su: 213 if not self.isolation_key('unprivileged_userns_clone'): 214 pytest.skip('unprivileged clone is not available') 215 216 if not self.isolation_key('user'): 217 pytest.skip('user namespace is not supported') 218 219 if not self.isolation_key('mnt'): 220 pytest.skip('mnt namespace is not supported') 221 222 isolation = {'namespaces': {'pid': True}} 223 224 if not is_su: 225 isolation['namespaces']['mount'] = True 226 isolation['namespaces']['credential'] = True 227 228 self.load('ns_inspect', isolation=isolation) 229 230 obj = self.getjson()['body'] 231 232 assert obj['PID'] == 2, 'pid of container is 2' 233 234 def test_isolation_namespace_false(self): 235 self.load('ns_inspect') 236 allns = list(option.available['features']['isolation'].keys()) 237 238 remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] 239 allns = [ns for ns in allns if ns not in remove_list] 240 241 namespaces = {} 242 for ns in allns: 243 if ns == 'user': 244 namespaces['credential'] = False 245 elif ns == 'mnt': 246 namespaces['mount'] = False 247 elif ns == 'net': 248 namespaces['network'] = False 249 elif ns == 'uts': 250 namespaces['uname'] = False 251 else: 252 namespaces[ns] = False 253 254 self.load('ns_inspect', isolation={'namespaces': namespaces}) 255 256 obj = self.getjson()['body'] 257 258 for ns in allns: 259 if ns.upper() in obj['NS']: 260 assert ( 261 obj['NS'][ns.upper()] 262 == option.available['features']['isolation'][ns] 263 ), f'{ns} match' 264 265 def test_go_isolation_rootfs_container(self, is_su, temp_dir): 266 if not is_su: 267 if not self.isolation_key('unprivileged_userns_clone'): 268 pytest.skip('unprivileged clone is not available') 269 270 if not self.isolation_key('user'): 271 pytest.skip('user namespace is not supported') 272 273 if not self.isolation_key('mnt'): 274 pytest.skip('mnt namespace is not supported') 275 276 if not self.isolation_key('pid'): 277 pytest.skip('pid namespace is not supported') 278 279 isolation = {'rootfs': temp_dir} 280 281 if not is_su: 282 isolation['namespaces'] = { 283 'mount': True, 284 'credential': True, 285 'pid': True, 286 } 287 288 self.load('ns_inspect', isolation=isolation) 289 290 obj = self.getjson(url='/?file=/go/app')['body'] 291 292 assert obj['FileExists'], 'app relative to rootfs' 293 294 obj = self.getjson(url='/?file=/bin/sh')['body'] 295 assert not obj['FileExists'], 'file should not exists' 296 297 def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir): 298 if not is_su: 299 pytest.skip('requires root') 300 301 if not self.isolation_key('mnt'): 302 pytest.skip('mnt namespace is not supported') 303 304 isolation = { 305 'namespaces': {'mount': True}, 306 'rootfs': temp_dir, 307 } 308 309 self.load('ns_inspect', isolation=isolation) 310 311 obj = self.getjson(url='/?file=/go/app')['body'] 312 313 assert obj['FileExists'], 'app relative to rootfs' 314 315 obj = self.getjson(url='/?file=/bin/sh')['body'] 316 assert not obj['FileExists'], 'file should not exists' 317 318 def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir): 319 try: 320 open("/proc/self/mountinfo") 321 except: 322 pytest.skip('The system lacks /proc/self/mountinfo file') 323 324 if not is_su: 325 if not self.isolation_key('unprivileged_userns_clone'): 326 pytest.skip('unprivileged clone is not available') 327 328 if not self.isolation_key('user'): 329 pytest.skip('user namespace is not supported') 330 331 if not self.isolation_key('mnt'): 332 pytest.skip('mnt namespace is not supported') 333 334 if not self.isolation_key('pid'): 335 pytest.skip('pid namespace is not supported') 336 337 isolation = {'rootfs': temp_dir} 338 339 if not is_su: 340 isolation['namespaces'] = { 341 'mount': True, 342 'credential': True, 343 'pid': True, 344 } 345 346 isolation['automount'] = {'tmpfs': False} 347 348 self.load('ns_inspect', isolation=isolation) 349 350 obj = self.getjson(url='/?mounts=true')['body'] 351 352 assert ( 353 "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts'] 354 ), 'app has no /tmp mounted' 355 356 isolation['automount'] = {'tmpfs': True} 357 358 self.load('ns_inspect', isolation=isolation) 359 360 obj = self.getjson(url='/?mounts=true')['body'] 361 362 assert ( 363 "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts'] 364 ), 'app has /tmp mounted on /' 365