import grp
import pwd
import unittest

from unit.applications.lang.go import TestApplicationGo
from unit.feature.isolation import TestFeatureIsolation


class TestGoIsolation(TestApplicationGo):
    """Process isolation tests driven through the Go ``ns_inspect`` app.

    Each test loads ``ns_inspect`` with a particular ``isolation``
    configuration and checks the JSON body the application returns
    (``NS``, ``UID``, ``GID``, ``PID`` and ``FileExists`` fields) against
    the values detected for the test environment.

    Test methods are documented with ``#`` comments rather than docstrings
    so that the descriptions unittest reports for them stay unchanged.
    """

    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}

    # Shared helper used by the tests to query namespace values via getns().
    isolation = TestFeatureIsolation()

    @classmethod
    def setUpClass(cls, complete_check=True):
        """Set up the class and run the isolation feature check.

        The parent set-up is always invoked with ``complete_check=False``
        so the isolation check can run before ``unit.complete()`` is called.

        Returns the object produced by the parent ``setUpClass`` when
        ``complete_check`` is false, otherwise the result of calling
        ``complete()`` on it.
        """
        unit = super().setUpClass(complete_check=False)

        # presumably this fills cls.available['features']['isolation'],
        # which every test below reads -- confirm in TestFeatureIsolation
        TestFeatureIsolation().check(cls.available, unit.testdir)

        return unit if not complete_check else unit.complete()

    def unpriv_creds(self):
        """Return ``(nobody_uid, nogroup_gid, nogroup)`` for this system.

        ``nogroup`` is the name of the group that was resolved: 'nogroup'
        when that group exists, otherwise 'nobody'.

        Raises KeyError if the 'nobody' user is missing, or if neither
        group exists.
        """
        nobody_uid = pwd.getpwnam('nobody').pw_uid

        try:
            nogroup_gid = grp.getgrnam('nogroup').gr_gid
            nogroup = 'nogroup'
        except KeyError:
            # grp.getgrnam() raises KeyError when the group does not exist;
            # fall back to a group named 'nobody' in that case only, and
            # let any other error propagate.
            nogroup_gid = grp.getgrnam('nobody').gr_gid
            nogroup = 'nobody'

        return (nobody_uid, nogroup_gid, nogroup)

    def isolation_key(self, key):
        """Return True if ``key`` is among the detected isolation features."""
        return key in self.available['features']['isolation']

    def test_isolation_values(self):
        # With no isolation configured, every namespace reported by the app
        # must equal the value recorded for the test environment.
        self.load('ns_inspect')

        obj = self.getjson()['body']

        for ns, ns_value in self.available['features']['isolation'].items():
            if ns.upper() in obj['NS']:
                self.assertEqual(
                    obj['NS'][ns.upper()], ns_value, '%s match' % ns
                )

    def test_isolation_unpriv_user(self):
        # Credential handling when the tests do NOT run as superuser.
        if not self.isolation_key('unprivileged_userns_clone'):
            print('unprivileged clone is not available')
            raise unittest.SkipTest()

        if self.is_su:
            print('privileged tests, skip this')
            raise unittest.SkipTest()

        # no isolation: the app reports the uid/gid stored on the test case
        self.load('ns_inspect')
        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], self.uid, 'uid match')
        self.assertEqual(obj['GID'], self.gid, 'gid match')

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        # an unprivileged unit maps itself to nobody in the container
        # by default
        self.assertEqual(obj['UID'], nobody_uid, 'uid of nobody')
        self.assertEqual(obj['GID'], nogroup_gid, 'gid of %s' % nogroup)

        # explicit user=root: both ids are expected to be 0
        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], 0, 'uid match user=root')
        self.assertEqual(obj['GID'], 0, 'gid match user=root')

        # explicit user=root with the unprivileged group
        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], 0, 'uid match user=root group=nogroup')
        self.assertEqual(
            obj['GID'], nogroup_gid, 'gid match user=root group=nogroup'
        )

        # explicit maps: container id 0 -> the test case's own uid/gid
        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': self.uid, 'size': 1}],
                'gidmap': [{'container': 0, 'host': self.gid, 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], 0, 'uid match uidmap')
        self.assertEqual(obj['GID'], 0, 'gid match gidmap')

    def test_isolation_priv_user(self):
        # Credential handling when the tests run as superuser.
        if not self.is_su:
            print('unprivileged tests, skip this')
            raise unittest.SkipTest()

        # no isolation: the app is expected to report nobody's credentials
        self.load('ns_inspect')

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], nobody_uid, 'uid match')
        self.assertEqual(obj['GID'], nogroup_gid, 'gid match')

        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})

        obj = self.getjson()['body']

        # a privileged unit maps the app creds in the container by default
        self.assertEqual(obj['UID'], nobody_uid, 'uid nobody')
        self.assertEqual(obj['GID'], nogroup_gid, 'gid nobody')

        # explicit user=root: both ids are expected to be 0
        self.load(
            'ns_inspect',
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], 0, 'uid nobody user=root')
        self.assertEqual(obj['GID'], 0, 'gid nobody user=root')

        # explicit user=root with the unprivileged group
        self.load(
            'ns_inspect',
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], 0, 'uid match user=root group=nogroup')
        self.assertEqual(
            obj['GID'], nogroup_gid, 'gid match user=root group=nogroup'
        )

        # explicit one-entry maps: container id 0 -> host id 0
        self.load(
            'ns_inspect',
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
            },
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['UID'], 0, 'uid match uidmap user=root')
        self.assertEqual(obj['GID'], 0, 'gid match gidmap user=root')

        # map uids 0..nobody_uid inclusive (65535 uids when nobody's uid
        # is 65534)
        self.load(
            'ns_inspect',
            user='nobody',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [
                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
                ],
            },
        )

        obj = self.getjson()['body']

        self.assertEqual(
            obj['UID'], nobody_uid, 'uid match uidmap user=nobody'
        )
        self.assertEqual(
            obj['GID'], nogroup_gid, 'gid match uidmap user=nobody'
        )

    def test_isolation_mnt(self):
        # Enabling the mount and credential namespaces must change only
        # the MNT and USER values reported by the app.
        if not self.isolation_key('mnt'):
            print('mnt namespace is not supported')
            raise unittest.SkipTest()

        if not self.isolation_key('unprivileged_userns_clone'):
            print('unprivileged clone is not available')
            raise unittest.SkipTest()

        self.load(
            'ns_inspect',
            isolation={'namespaces': {'mount': True, 'credential': True}},
        )

        obj = self.getjson()['body']

        # all but user and mnt
        allns = list(self.available['features']['isolation'].keys())
        allns.remove('user')
        allns.remove('mnt')

        for ns in allns:
            if ns.upper() in obj['NS']:
                self.assertEqual(
                    obj['NS'][ns.upper()],
                    self.available['features']['isolation'][ns],
                    '%s match' % ns,
                )

        self.assertNotEqual(
            obj['NS']['MNT'], self.isolation.getns('mnt'), 'mnt set'
        )
        self.assertNotEqual(
            obj['NS']['USER'], self.isolation.getns('user'), 'user set'
        )

    def test_isolation_pid(self):
        # With a pid namespace enabled the app must report itself as pid 1.
        if not self.isolation_key('pid'):
            print('pid namespace is not supported')
            raise unittest.SkipTest()

        if not (self.is_su or self.isolation_key('unprivileged_userns_clone')):
            print('requires root or unprivileged_userns_clone')
            raise unittest.SkipTest()

        self.load(
            'ns_inspect',
            isolation={'namespaces': {'pid': True, 'credential': True}},
        )

        obj = self.getjson()['body']

        self.assertEqual(obj['PID'], 1, 'pid of container is 1')

    def test_isolation_namespace_false(self):
        # Explicitly setting every namespace option to False must leave
        # the reported namespace values equal to the detected ones.
        self.load('ns_inspect')
        allns = list(self.available['features']['isolation'].keys())

        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
        allns = [ns for ns in allns if ns not in remove_list]

        # Detected feature keys whose configuration option name differs;
        # any other key is used as the option name unchanged.
        option_names = {
            'user': 'credential',
            'mnt': 'mount',
            'net': 'network',
            'uts': 'uname',
        }
        namespaces = {option_names.get(ns, ns): False for ns in allns}

        self.load('ns_inspect', isolation={'namespaces': namespaces})

        obj = self.getjson()['body']

        for ns in allns:
            if ns.upper() in obj['NS']:
                self.assertEqual(
                    obj['NS'][ns.upper()],
                    self.available['features']['isolation'][ns],
                    '%s match' % ns,
                )

    def test_go_isolation_rootfs_container(self):
        # rootfs combined with mount + credential namespaces: the app binary
        # must be visible relative to rootfs, while /bin/sh must not be.
        if not self.isolation_key('unprivileged_userns_clone'):
            print('unprivileged clone is not available')
            raise unittest.SkipTest()

        if not self.isolation_key('mnt'):
            print('mnt namespace is not supported')
            raise unittest.SkipTest()

        isolation = {
            'namespaces': {'mount': True, 'credential': True},
            'rootfs': self.testdir,
        }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')

        obj = self.getjson(url='/?file=/bin/sh')['body']
        self.assertEqual(obj['FileExists'], False, 'file should not exists')

    def test_go_isolation_rootfs_container_priv(self):
        # Same rootfs check as above, but run as superuser with only the
        # mount namespace enabled (no credential namespace).
        if not self.is_su:
            print("requires root")
            raise unittest.SkipTest()

        if not self.isolation_key('mnt'):
            print('mnt namespace is not supported')
            raise unittest.SkipTest()

        isolation = {
            'namespaces': {'mount': True},
            'rootfs': self.testdir,
        }

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?file=/go/app')['body']

        self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')

        obj = self.getjson(url='/?file=/bin/sh')['body']
        self.assertEqual(obj['FileExists'], False, 'file should not exists')


if __name__ == '__main__':
    TestGoIsolation.main()