import grp
import os
import pwd
import shutil

import pytest

from unit.applications.lang.go import TestApplicationGo
from unit.option import option
from unit.utils import getns


class TestGoIsolation(TestApplicationGo):
    """Isolation tests (namespaces, uid/gid maps, rootfs) for Go apps.

    Every test loads the 'ns_inspect' application with a particular
    ``isolation`` configuration and checks the JSON body it responds with.
    The keys read from that body here are 'NS', 'UID', 'GID', 'PID',
    'FileExists' and 'Mounts'.
    """

    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}

    def unpriv_creds(self):
        """Return ``(nobody_uid, nogroup_gid, nogroup)`` for this host.

        The group is 'nogroup' when the group database has such an entry;
        otherwise the 'nobody' group is used instead.
        """
        nobody_uid = pwd.getpwnam('nobody').pw_uid

        try:
            nogroup_gid = grp.getgrnam('nogroup').gr_gid
            nogroup = 'nogroup'
        except KeyError:
            # No 'nogroup' entry on this host: fall back to 'nobody'.
            nogroup_gid = grp.getgrnam('nobody').gr_gid
            nogroup = 'nobody'

        return (nobody_uid, nogroup_gid, nogroup)

    def isolation_key(self, key):
        """Return True if *key* is among the detected isolation features."""
        return key in option.available['features']['isolation']

    def _require_isolation(self, *keys):
        """Skip the running test at the first unavailable feature in *keys*.

        Keys are checked in the order given, so the reported skip reason is
        the one for the first missing feature.
        """
        for key in keys:
            if self.isolation_key(key):
                continue

            if key == 'unprivileged_userns_clone':
                pytest.skip('unprivileged clone is not available')

            pytest.skip('%s namespace is not supported' % key)

    def test_isolation_values(self):
        """Without isolation, reported namespaces equal the detected ones."""
        self.load('ns_inspect')

        obj = self.getjson()['body']

        for ns, ns_value in option.available['features']['isolation'].items():
            if ns.upper() in obj['NS']:
                assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns

    def test_isolation_unpriv_user(self, is_su):
        """Check app uid/gid for credential isolation when not run as root."""
        self._require_isolation('unprivileged_userns_clone')

        if is_su:
            pytest.skip('privileged tests, skip this')

        # No isolation: the app reports the test runner's own credentials.
        self.load('ns_inspect')
        obj = self.getjson()['body']

        assert obj['UID'] == os.geteuid(), 'uid match'
        assert obj['GID'] == os.getegid(), 'gid match'

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        # unprivileged unit map itself to nobody in the container by default
        assert obj['UID'] == nobody_uid, 'uid of nobody'
        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup

        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root'
        assert obj['GID'] == 0, 'gid match user=root'

        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        # Explicit maps: container id 0 is mapped to the runner's own ids.
        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match uidmap'
        assert obj['GID'] == 0, 'gid match gidmap'

    def test_isolation_priv_user(self, is_su):
        """Check app uid/gid for credential isolation when run as root."""
        if not is_su:
            pytest.skip('unprivileged tests, skip this')

        self.load('ns_inspect')

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        obj = self.getjson()['body']

        assert obj['UID'] == nobody_uid, 'uid match'
        assert obj['GID'] == nogroup_gid, 'gid match'

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        # privileged unit map app creds in the container by default
        assert obj['UID'] == nobody_uid, 'uid nobody'
        assert obj['GID'] == nogroup_gid, 'gid nobody'

        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid nobody user=root'
        assert obj['GID'] == 0, 'gid nobody user=root'

        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == 0, 'uid match uidmap user=root'
        assert obj['GID'] == 0, 'gid match gidmap user=root'

        # Map the uid range 0..nobody_uid (nobody_uid + 1 ids) one-to-one so
        # that 'nobody' keeps its uid inside the container. No gidmap is
        # given for this case.
        self.load(
            'ns_inspect',
            user='nobody',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [
                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
                ],
            },
        )

        obj = self.getjson()['body']

        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'

    def test_isolation_mnt(self):
        """With mount+credential isolation only MNT and USER must differ."""
        self._require_isolation('mnt', 'unprivileged_userns_clone')

        self.load(
            'ns_inspect',
            isolation={'namespaces': {'mount': True, 'credential': True}},
        )

        obj = self.getjson()['body']

        # all but user and mnt
        allns = list(option.available['features']['isolation'])
        allns.remove('user')
        allns.remove('mnt')

        for ns in allns:
            if ns.upper() in obj['NS']:
                assert (
                    obj['NS'][ns.upper()]
                    == option.available['features']['isolation'][ns]
                ), ('%s match' % ns)

        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
        assert obj['NS']['USER'] != getns('user'), 'user set'

    def test_isolation_pid(self, is_su):
        """With pid isolation the application reports PID 1."""
        self._require_isolation('pid')

        if not is_su:
            self._require_isolation('unprivileged_userns_clone', 'user', 'mnt')

        isolation = {'namespaces': {'pid': True}}

        if not is_su:
            # Unprivileged runs additionally request mount and credential
            # namespaces alongside pid.
            isolation['namespaces']['mount'] = True
            isolation['namespaces']['credential'] = True

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson()['body']

        assert obj['PID'] == 1, 'pid of container is 1'

    def test_isolation_namespace_false(self):
        """Namespaces explicitly set to False stay equal to the detected ones."""
        self.load('ns_inspect')
        allns = list(option.available['features']['isolation'])

        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
        allns = [ns for ns in allns if ns not in remove_list]

        # Feature names that are spelled differently in the 'namespaces'
        # configuration object; any other name is used unchanged.
        option_names = {
            'user': 'credential',
            'mnt': 'mount',
            'net': 'network',
            'uts': 'uname',
        }
        namespaces = {option_names.get(ns, ns): False for ns in allns}

        self.load('ns_inspect', isolation={'namespaces': namespaces})

        obj = self.getjson()['body']

        for ns in allns:
            if ns.upper() in obj['NS']:
                assert (
                    obj['NS'][ns.upper()]
                    == option.available['features']['isolation'][ns]
                ), ('%s match' % ns)

    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
        """With rootfs set, the app sees /go/app but not /bin/sh."""
        if not is_su:
            self._require_isolation(
                'unprivileged_userns_clone', 'user', 'mnt', 'pid'
            )

        isolation = {'rootfs': temp_dir}

        if not is_su:
            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True
            }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        assert obj['FileExists'] == True, 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert obj['FileExists'] == False, 'file should not exists'

    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
        """As root, rootfs with only a mount namespace hides /bin/sh."""
        if not is_su:
            pytest.skip('requires root')

        self._require_isolation('mnt')

        isolation = {
            'namespaces': {'mount': True},
            'rootfs': temp_dir,
        }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        assert obj['FileExists'] == True, 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert obj['FileExists'] == False, 'file should not exists'

    def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
        """A tmpfs /tmp mount is reported by default and absent when disabled."""
        # Probe that the mount table can be opened. The handle is closed
        # right away, and only OS-level failures are turned into a skip.
        try:
            with open("/proc/self/mountinfo"):
                pass
        except OSError:
            pytest.skip('The system lacks /proc/self/mountinfo file')

        if not is_su:
            self._require_isolation(
                'unprivileged_userns_clone', 'user', 'mnt', 'pid'
            )

        isolation = {'rootfs': temp_dir}

        if not is_su:
            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True
            }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
        ), 'app has /tmp mounted on /'

        # Same configuration with the automatic tmpfs mount switched off.
        isolation['automount'] = {
            'tmpfs': False
        }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
        ), 'app has no /tmp mounted'