import grp
import os
import pwd

import pytest
from unit.applications.lang.go import TestApplicationGo
from unit.option import option
from unit.utils import getns


class TestGoIsolation(TestApplicationGo):
    """Isolation tests driven through the 'ns_inspect' Go application.

    Each test loads 'ns_inspect' with a particular ``isolation``
    configuration and checks the JSON body it reports (keys used here:
    'NS', 'UID', 'GID', 'PID', 'FileExists', 'Mounts').
    """

    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self, request, skip_alert):
        """Register the 'close() failed: Bad file descriptor' alert as
        skipped for every test in this class."""
        skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')

    def unpriv_creds(self):
        """Return ``(nobody_uid, nogroup_gid, nogroup_name)``.

        The group is 'nogroup' when that group exists on the host and
        falls back to 'nobody' otherwise.
        """
        nobody_uid = pwd.getpwnam('nobody').pw_uid

        try:
            nogroup_gid = grp.getgrnam('nogroup').gr_gid
            nogroup = 'nogroup'
        except KeyError:
            nogroup_gid = grp.getgrnam('nobody').gr_gid
            nogroup = 'nobody'

        return (nobody_uid, nogroup_gid, nogroup)

    def isolation_key(self, key):
        """Return True if ``key`` is among the detected isolation features."""
        return key in option.available['features']['isolation']

    def test_isolation_values(self):
        """Without isolation, reported namespaces equal the detected ones."""
        self.load('ns_inspect')

        obj = self.getjson()['body']

        for ns, ns_value in option.available['features']['isolation'].items():
            if ns.upper() in obj['NS']:
                assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns

    def test_isolation_unpriv_user(self, is_su):
        """Check reported UID/GID for several credential setups when the
        test run is not privileged."""
        if not self.isolation_key('unprivileged_userns_clone'):
            pytest.skip('unprivileged clone is not available')

        if is_su:
            pytest.skip('privileged tests, skip this')

        self.load('ns_inspect')
        obj = self.getjson()['body']

        assert obj['UID'] == os.geteuid(), 'uid match'
        assert obj['GID'] == os.getegid(), 'gid match'

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        # an unprivileged unit maps itself to nobody in the container
        # by default
        assert obj['UID'] == nobody_uid, 'uid of nobody'
        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup

        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root'
        assert obj['GID'] == 0, 'gid match user=root'

        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        # explicit maps: container id 0 -> the ids this test runs with
        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match uidmap'
        assert obj['GID'] == 0, 'gid match gidmap'

    def test_isolation_priv_user(self, is_su):
        """Check reported UID/GID for several credential setups when the
        test run is privileged."""
        if not is_su:
            pytest.skip('unprivileged tests, skip this')

        self.load('ns_inspect')

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        obj = self.getjson()['body']

        assert obj['UID'] == nobody_uid, 'uid match'
        assert obj['GID'] == nogroup_gid, 'gid match'

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        # a privileged unit maps the app creds in the container by default
        assert obj['UID'] == nobody_uid, 'uid nobody'
        assert obj['GID'] == nogroup_gid, 'gid nobody'

        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid nobody user=root'
        assert obj['GID'] == 0, 'gid nobody user=root'

        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match uidmap user=root'
        assert obj['GID'] == 0, 'gid match gidmap user=root'

        # map every uid from 0 up to and including nobody's
        self.load(
            'ns_inspect',
            user='nobody',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [
                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
                ],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'

    def test_isolation_mnt(self):
        """With mount and credential isolation, only MNT and USER differ
        from the values seen by the test process."""
        if not self.isolation_key('mnt'):
            pytest.skip('mnt namespace is not supported')

        if not self.isolation_key('unprivileged_userns_clone'):
            pytest.skip('unprivileged clone is not available')

        self.load(
            'ns_inspect',
            isolation={'namespaces': {'mount': True, 'credential': True}},
        )

        obj = self.getjson()['body']

        # all but user and mnt
        allns = list(option.available['features']['isolation'].keys())
        allns.remove('user')
        allns.remove('mnt')

        for ns in allns:
            if ns.upper() in obj['NS']:
                assert (
                    obj['NS'][ns.upper()]
                    == option.available['features']['isolation'][ns]
                ), ('%s match' % ns)

        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
        assert obj['NS']['USER'] != getns('user'), 'user set'

    def test_isolation_pid(self, is_su):
        """With pid isolation the application reports PID 2."""
        if not self.isolation_key('pid'):
            pytest.skip('pid namespace is not supported')

        if not is_su:
            if not self.isolation_key('unprivileged_userns_clone'):
                pytest.skip('unprivileged clone is not available')

            if not self.isolation_key('user'):
                pytest.skip('user namespace is not supported')

            if not self.isolation_key('mnt'):
                pytest.skip('mnt namespace is not supported')

        isolation = {'namespaces': {'pid': True}}

        # unprivileged runs additionally request mount and credential
        # isolation
        if not is_su:
            isolation['namespaces']['mount'] = True
            isolation['namespaces']['credential'] = True

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson()['body']

        assert obj['PID'] == 2, 'pid of container is 2'

    def test_isolation_namespace_false(self):
        """Explicitly setting namespaces to False keeps the detected values."""
        self.load('ns_inspect')
        allns = list(option.available['features']['isolation'].keys())

        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
        allns = [ns for ns in allns if ns not in remove_list]

        # detected feature name -> key used in the 'namespaces' option;
        # names not listed here are used unchanged
        option_names = {
            'user': 'credential',
            'mnt': 'mount',
            'net': 'network',
            'uts': 'uname',
        }
        namespaces = {option_names.get(ns, ns): False for ns in allns}

        self.load('ns_inspect', isolation={'namespaces': namespaces})

        obj = self.getjson()['body']

        for ns in allns:
            if ns.upper() in obj['NS']:
                assert (
                    obj['NS'][ns.upper()]
                    == option.available['features']['isolation'][ns]
                ), ('%s match' % ns)

    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
        """With ``rootfs`` set, '/go/app' exists and '/bin/sh' does not."""
        if not is_su:
            if not self.isolation_key('unprivileged_userns_clone'):
                pytest.skip('unprivileged clone is not available')

            if not self.isolation_key('user'):
                pytest.skip('user namespace is not supported')

            if not self.isolation_key('mnt'):
                pytest.skip('mnt namespace is not supported')

            if not self.isolation_key('pid'):
                pytest.skip('pid namespace is not supported')

        isolation = {'rootfs': temp_dir}

        if not is_su:
            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True,
            }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        assert obj['FileExists'] == True, 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert obj['FileExists'] == False, 'file should not exists'

    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
        """Privileged variant of the rootfs test using only mount isolation."""
        if not is_su:
            pytest.skip('requires root')

        if not self.isolation_key('mnt'):
            pytest.skip('mnt namespace is not supported')

        isolation = {
            'namespaces': {'mount': True},
            'rootfs': temp_dir,
        }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        assert obj['FileExists'] == True, 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert obj['FileExists'] == False, 'file should not exists'

    def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
        """Check that ``automount.tmpfs`` toggles the /tmp tmpfs mount
        reported by the application."""
        # Probe readability only; the handle is closed right away and only
        # I/O errors (missing file, no permission) lead to a skip.
        try:
            with open("/proc/self/mountinfo"):
                pass
        except OSError:
            pytest.skip('The system lacks /proc/self/mountinfo file')

        if not is_su:
            if not self.isolation_key('unprivileged_userns_clone'):
                pytest.skip('unprivileged clone is not available')

            if not self.isolation_key('user'):
                pytest.skip('user namespace is not supported')

            if not self.isolation_key('mnt'):
                pytest.skip('mnt namespace is not supported')

            if not self.isolation_key('pid'):
                pytest.skip('pid namespace is not supported')

        isolation = {'rootfs': temp_dir}

        if not is_su:
            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True,
            }

        isolation['automount'] = {'tmpfs': False}

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
        ), 'app has no /tmp mounted'

        isolation['automount'] = {'tmpfs': True}

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
        ), 'app has /tmp mounted on /'