1import grp
| 1import grp
|
| 2import os
|
2import pwd
| 3import pwd
|
3import unittest
| 4import pytest
|
4 5from unit.applications.lang.go import TestApplicationGo 6from unit.feature.isolation import TestFeatureIsolation 7
| 5 6from unit.applications.lang.go import TestApplicationGo 7from unit.feature.isolation import TestFeatureIsolation 8
|
8
| |
9class TestGoIsolation(TestApplicationGo): 10 prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']} 11 12 isolation = TestFeatureIsolation() 13 14 @classmethod
| 9class TestGoIsolation(TestApplicationGo): 10 prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']} 11 12 isolation = TestFeatureIsolation() 13 14 @classmethod
|
15 def setUpClass(cls, complete_check=True): 16 unit = super().setUpClass(complete_check=False)
| 15 def setup_class(cls, complete_check=True): 16 unit = super().setup_class(complete_check=False)
|
17
| 17
|
18 TestFeatureIsolation().check(cls.available, unit.testdir)
| 18 TestFeatureIsolation().check(cls.available, unit.temp_dir)
|
19 20 return unit if not complete_check else unit.complete() 21 22 def unpriv_creds(self): 23 nobody_uid = pwd.getpwnam('nobody').pw_uid 24 25 try: 26 nogroup_gid = grp.getgrnam('nogroup').gr_gid 27 nogroup = 'nogroup' 28 except: 29 nogroup_gid = grp.getgrnam('nobody').gr_gid 30 nogroup = 'nobody' 31 32 return (nobody_uid, nogroup_gid, nogroup) 33 34 def isolation_key(self, key): 35 return key in self.available['features']['isolation'].keys() 36 37 def test_isolation_values(self): 38 self.load('ns_inspect') 39 40 obj = self.getjson()['body'] 41 42 for ns, ns_value in self.available['features']['isolation'].items(): 43 if ns.upper() in obj['NS']:
| 19 20 return unit if not complete_check else unit.complete() 21 22 def unpriv_creds(self): 23 nobody_uid = pwd.getpwnam('nobody').pw_uid 24 25 try: 26 nogroup_gid = grp.getgrnam('nogroup').gr_gid 27 nogroup = 'nogroup' 28 except: 29 nogroup_gid = grp.getgrnam('nobody').gr_gid 30 nogroup = 'nobody' 31 32 return (nobody_uid, nogroup_gid, nogroup) 33 34 def isolation_key(self, key): 35 return key in self.available['features']['isolation'].keys() 36 37 def test_isolation_values(self): 38 self.load('ns_inspect') 39 40 obj = self.getjson()['body'] 41 42 for ns, ns_value in self.available['features']['isolation'].items(): 43 if ns.upper() in obj['NS']:
|
44 self.assertEqual( 45 obj['NS'][ns.upper()], ns_value, '%s match' % ns 46 )
| 44 assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns
|
47
| 45
|
48 def test_isolation_unpriv_user(self):
| 46 def test_isolation_unpriv_user(self, is_su):
|
49 if not self.isolation_key('unprivileged_userns_clone'):
| 47 if not self.isolation_key('unprivileged_userns_clone'):
|
50 print('unprivileged clone is not available') 51 raise unittest.SkipTest()
| 48 pytest.skip('unprivileged clone is not available')
|
52
| 49
|
53 if self.is_su: 54 print('privileged tests, skip this') 55 raise unittest.SkipTest()
| 50 if is_su: 51 pytest.skip('privileged tests, skip this')
|
56 57 self.load('ns_inspect') 58 obj = self.getjson()['body'] 59
| 52 53 self.load('ns_inspect') 54 obj = self.getjson()['body'] 55
|
60 self.assertEqual(obj['UID'], self.uid, 'uid match') 61 self.assertEqual(obj['GID'], self.gid, 'gid match')
| 56 assert obj['UID'] == os.geteuid(), 'uid match' 57 assert obj['GID'] == os.getegid(), 'gid match'
|
62 63 self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 64 65 obj = self.getjson()['body'] 66 67 nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() 68 69 # unprivileged unit map itself to nobody in the container by default
| 58 59 self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 60 61 obj = self.getjson()['body'] 62 63 nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() 64 65 # unprivileged unit map itself to nobody in the container by default
|
70 self.assertEqual(obj['UID'], nobody_uid, 'uid of nobody') 71 self.assertEqual(obj['GID'], nogroup_gid, 'gid of %s' % nogroup)
| 66 assert obj['UID'] == nobody_uid, 'uid of nobody' 67 assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup
|
72 73 self.load( 74 'ns_inspect', 75 user='root', 76 isolation={'namespaces': {'credential': True}}, 77 ) 78 79 obj = self.getjson()['body'] 80
| 68 69 self.load( 70 'ns_inspect', 71 user='root', 72 isolation={'namespaces': {'credential': True}}, 73 ) 74 75 obj = self.getjson()['body'] 76
|
81 self.assertEqual(obj['UID'], 0, 'uid match user=root') 82 self.assertEqual(obj['GID'], 0, 'gid match user=root')
| 77 assert obj['UID'] == 0, 'uid match user=root' 78 assert obj['GID'] == 0, 'gid match user=root'
|
83 84 self.load( 85 'ns_inspect', 86 user='root', 87 group=nogroup, 88 isolation={'namespaces': {'credential': True}}, 89 ) 90 91 obj = self.getjson()['body'] 92
| 79 80 self.load( 81 'ns_inspect', 82 user='root', 83 group=nogroup, 84 isolation={'namespaces': {'credential': True}}, 85 ) 86 87 obj = self.getjson()['body'] 88
|
93 self.assertEqual(obj['UID'], 0, 'uid match user=root group=nogroup') 94 self.assertEqual( 95 obj['GID'], nogroup_gid, 'gid match user=root group=nogroup' 96 )
| 89 assert obj['UID'] == 0, 'uid match user=root group=nogroup' 90 assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
|
97 98 self.load( 99 'ns_inspect', 100 user='root', 101 group='root', 102 isolation={ 103 'namespaces': {'credential': True},
| 91 92 self.load( 93 'ns_inspect', 94 user='root', 95 group='root', 96 isolation={ 97 'namespaces': {'credential': True},
|
104 'uidmap': [{'container': 0, 'host': self.uid, 'size': 1}], 105 'gidmap': [{'container': 0, 'host': self.gid, 'size': 1}],
| 98 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], 99 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
|
106 }, 107 ) 108 109 obj = self.getjson()['body'] 110
| 100 }, 101 ) 102 103 obj = self.getjson()['body'] 104
|
111 self.assertEqual(obj['UID'], 0, 'uid match uidmap') 112 self.assertEqual(obj['GID'], 0, 'gid match gidmap')
| 105 assert obj['UID'] == 0, 'uid match uidmap' 106 assert obj['GID'] == 0, 'gid match gidmap'
|
113
| 107
|
114 def test_isolation_priv_user(self): 115 if not self.is_su: 116 print('unprivileged tests, skip this') 117 raise unittest.SkipTest()
| 108 def test_isolation_priv_user(self, is_su): 109 if not is_su: 110 pytest.skip('unprivileged tests, skip this')
|
118 119 self.load('ns_inspect') 120 121 nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() 122 123 obj = self.getjson()['body'] 124
| 111 112 self.load('ns_inspect') 113 114 nobody_uid, nogroup_gid, nogroup = self.unpriv_creds() 115 116 obj = self.getjson()['body'] 117
|
125 self.assertEqual(obj['UID'], nobody_uid, 'uid match') 126 self.assertEqual(obj['GID'], nogroup_gid, 'gid match')
| 118 assert obj['UID'] == nobody_uid, 'uid match' 119 assert obj['GID'] == nogroup_gid, 'gid match'
|
127 128 self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 129 130 obj = self.getjson()['body'] 131 132 # privileged unit map app creds in the container by default
| 120 121 self.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 122 123 obj = self.getjson()['body'] 124 125 # privileged unit map app creds in the container by default
|
133 self.assertEqual(obj['UID'], nobody_uid, 'uid nobody') 134 self.assertEqual(obj['GID'], nogroup_gid, 'gid nobody')
| 126 assert obj['UID'] == nobody_uid, 'uid nobody' 127 assert obj['GID'] == nogroup_gid, 'gid nobody'
|
135 136 self.load( 137 'ns_inspect', 138 user='root', 139 isolation={'namespaces': {'credential': True}}, 140 ) 141 142 obj = self.getjson()['body'] 143
| 128 129 self.load( 130 'ns_inspect', 131 user='root', 132 isolation={'namespaces': {'credential': True}}, 133 ) 134 135 obj = self.getjson()['body'] 136
|
144 self.assertEqual(obj['UID'], 0, 'uid nobody user=root') 145 self.assertEqual(obj['GID'], 0, 'gid nobody user=root')
| 137 assert obj['UID'] == 0, 'uid nobody user=root' 138 assert obj['GID'] == 0, 'gid nobody user=root'
|
146 147 self.load( 148 'ns_inspect', 149 user='root', 150 group=nogroup, 151 isolation={'namespaces': {'credential': True}}, 152 ) 153 154 obj = self.getjson()['body'] 155
| 139 140 self.load( 141 'ns_inspect', 142 user='root', 143 group=nogroup, 144 isolation={'namespaces': {'credential': True}}, 145 ) 146 147 obj = self.getjson()['body'] 148
|
156 self.assertEqual(obj['UID'], 0, 'uid match user=root group=nogroup') 157 self.assertEqual( 158 obj['GID'], nogroup_gid, 'gid match user=root group=nogroup' 159 )
| 149 assert obj['UID'] == 0, 'uid match user=root group=nogroup' 150 assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
|
160 161 self.load( 162 'ns_inspect', 163 user='root', 164 group='root', 165 isolation={ 166 'namespaces': {'credential': True}, 167 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], 168 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], 169 }, 170 ) 171 172 obj = self.getjson()['body'] 173
| 151 152 self.load( 153 'ns_inspect', 154 user='root', 155 group='root', 156 isolation={ 157 'namespaces': {'credential': True}, 158 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], 159 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], 160 }, 161 ) 162 163 obj = self.getjson()['body'] 164
|
174 self.assertEqual(obj['UID'], 0, 'uid match uidmap user=root') 175 self.assertEqual(obj['GID'], 0, 'gid match gidmap user=root')
| 165 assert obj['UID'] == 0, 'uid match uidmap user=root' 166 assert obj['GID'] == 0, 'gid match gidmap user=root'
|
176 177 # map 65535 uids 178 self.load( 179 'ns_inspect', 180 user='nobody', 181 isolation={ 182 'namespaces': {'credential': True}, 183 'uidmap': [ 184 {'container': 0, 'host': 0, 'size': nobody_uid + 1} 185 ], 186 }, 187 ) 188 189 obj = self.getjson()['body'] 190
| 167 168 # map 65535 uids 169 self.load( 170 'ns_inspect', 171 user='nobody', 172 isolation={ 173 'namespaces': {'credential': True}, 174 'uidmap': [ 175 {'container': 0, 'host': 0, 'size': nobody_uid + 1} 176 ], 177 }, 178 ) 179 180 obj = self.getjson()['body'] 181
|
191 self.assertEqual( 192 obj['UID'], nobody_uid, 'uid match uidmap user=nobody' 193 ) 194 self.assertEqual( 195 obj['GID'], nogroup_gid, 'gid match uidmap user=nobody' 196 )
| 182 assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' 183 assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
|
197 198 def test_isolation_mnt(self): 199 if not self.isolation_key('mnt'):
| 184 185 def test_isolation_mnt(self): 186 if not self.isolation_key('mnt'):
|
200 print('mnt namespace is not supported') 201 raise unittest.SkipTest()
| 187 pytest.skip('mnt namespace is not supported')
|
202 203 if not self.isolation_key('unprivileged_userns_clone'):
| 188 189 if not self.isolation_key('unprivileged_userns_clone'):
|
204 print('unprivileged clone is not available') 205 raise unittest.SkipTest()
| 190 pytest.skip('unprivileged clone is not available')
|
206 207 self.load( 208 'ns_inspect', 209 isolation={'namespaces': {'mount': True, 'credential': True}}, 210 ) 211 212 obj = self.getjson()['body'] 213 214 # all but user and mnt 215 allns = list(self.available['features']['isolation'].keys()) 216 allns.remove('user') 217 allns.remove('mnt') 218 219 for ns in allns: 220 if ns.upper() in obj['NS']:
| 191 192 self.load( 193 'ns_inspect', 194 isolation={'namespaces': {'mount': True, 'credential': True}}, 195 ) 196 197 obj = self.getjson()['body'] 198 199 # all but user and mnt 200 allns = list(self.available['features']['isolation'].keys()) 201 allns.remove('user') 202 allns.remove('mnt') 203 204 for ns in allns: 205 if ns.upper() in obj['NS']:
|
221 self.assertEqual( 222 obj['NS'][ns.upper()], 223 self.available['features']['isolation'][ns], 224 '%s match' % ns, 225 )
| 206 assert ( 207 obj['NS'][ns.upper()] 208 == self.available['features']['isolation'][ns] 209 ), ('%s match' % ns)
|
226
| 210
|
227 self.assertNotEqual( 228 obj['NS']['MNT'], self.isolation.getns('mnt'), 'mnt set' 229 ) 230 self.assertNotEqual( 231 obj['NS']['USER'], self.isolation.getns('user'), 'user set' 232 )
| 211 assert obj['NS']['MNT'] != self.isolation.getns('mnt'), 'mnt set' 212 assert obj['NS']['USER'] != self.isolation.getns('user'), 'user set'
|
233
| 213
|
234 def test_isolation_pid(self):
| 214 def test_isolation_pid(self, is_su):
|
235 if not self.isolation_key('pid'):
| 215 if not self.isolation_key('pid'):
|
236 print('pid namespace is not supported') 237 raise unittest.SkipTest()
| 216 pytest.skip('pid namespace is not supported')
|
238
| 217
|
239 if not (self.is_su or self.isolation_key('unprivileged_userns_clone')): 240 print('requires root or unprivileged_userns_clone') 241 raise unittest.SkipTest()
| 218 if not (is_su or self.isolation_key('unprivileged_userns_clone')): 219 pytest.skip('requires root or unprivileged_userns_clone')
|
242 243 self.load( 244 'ns_inspect', 245 isolation={'namespaces': {'pid': True, 'credential': True}}, 246 ) 247 248 obj = self.getjson()['body'] 249
| 220 221 self.load( 222 'ns_inspect', 223 isolation={'namespaces': {'pid': True, 'credential': True}}, 224 ) 225 226 obj = self.getjson()['body'] 227
|
250 self.assertEqual(obj['PID'], 1, 'pid of container is 1')
| 228 assert obj['PID'] == 1, 'pid of container is 1'
|
251 252 def test_isolation_namespace_false(self): 253 self.load('ns_inspect') 254 allns = list(self.available['features']['isolation'].keys()) 255 256 remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] 257 allns = [ns for ns in allns if ns not in remove_list] 258 259 namespaces = {} 260 for ns in allns: 261 if ns == 'user': 262 namespaces['credential'] = False 263 elif ns == 'mnt': 264 namespaces['mount'] = False 265 elif ns == 'net': 266 namespaces['network'] = False 267 elif ns == 'uts': 268 namespaces['uname'] = False 269 else: 270 namespaces[ns] = False 271 272 self.load('ns_inspect', isolation={'namespaces': namespaces}) 273 274 obj = self.getjson()['body'] 275 276 for ns in allns: 277 if ns.upper() in obj['NS']:
| 229 230 def test_isolation_namespace_false(self): 231 self.load('ns_inspect') 232 allns = list(self.available['features']['isolation'].keys()) 233 234 remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] 235 allns = [ns for ns in allns if ns not in remove_list] 236 237 namespaces = {} 238 for ns in allns: 239 if ns == 'user': 240 namespaces['credential'] = False 241 elif ns == 'mnt': 242 namespaces['mount'] = False 243 elif ns == 'net': 244 namespaces['network'] = False 245 elif ns == 'uts': 246 namespaces['uname'] = False 247 else: 248 namespaces[ns] = False 249 250 self.load('ns_inspect', isolation={'namespaces': namespaces}) 251 252 obj = self.getjson()['body'] 253 254 for ns in allns: 255 if ns.upper() in obj['NS']:
|
278 self.assertEqual( 279 obj['NS'][ns.upper()], 280 self.available['features']['isolation'][ns], 281 '%s match' % ns, 282 )
| 256 assert ( 257 obj['NS'][ns.upper()] 258 == self.available['features']['isolation'][ns] 259 ), ('%s match' % ns)
|
283 284 def test_go_isolation_rootfs_container(self): 285 if not self.isolation_key('unprivileged_userns_clone'):
| 260 261 def test_go_isolation_rootfs_container(self): 262 if not self.isolation_key('unprivileged_userns_clone'):
|
286 print('unprivileged clone is not available') 287 raise unittest.SkipTest()
| 263 pytest.skip('unprivileged clone is not available')
|
288 289 if not self.isolation_key('mnt'):
| 264 265 if not self.isolation_key('mnt'):
|
290 print('mnt namespace is not supported') 291 raise unittest.SkipTest()
| 266 pytest.skip('mnt namespace is not supported')
|
292 293 isolation = { 294 'namespaces': {'mount': True, 'credential': True},
| 267 268 isolation = { 269 'namespaces': {'mount': True, 'credential': True},
|
295 'rootfs': self.testdir,
| 270 'rootfs': self.temp_dir,
|
296 } 297 298 self.load('ns_inspect', isolation=isolation) 299 300 obj = self.getjson(url='/?file=/go/app')['body'] 301
| 271 } 272 273 self.load('ns_inspect', isolation=isolation) 274 275 obj = self.getjson(url='/?file=/go/app')['body'] 276
|
302 self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
| 277 assert obj['FileExists'] == True, 'app relative to rootfs'
|
303 304 obj = self.getjson(url='/?file=/bin/sh')['body']
| 278 279 obj = self.getjson(url='/?file=/bin/sh')['body']
|
305 self.assertEqual(obj['FileExists'], False, 'file should not exists')
| 280 assert obj['FileExists'] == False, 'file should not exists'
|
306
| 281
|
307 def test_go_isolation_rootfs_container_priv(self): 308 if not self.is_su: 309 print("requires root") 310 raise unittest.SkipTest()
| 282 def test_go_isolation_rootfs_container_priv(self, is_su): 283 if not is_su: 284 pytest.skip('requires root')
|
311 312 if not self.isolation_key('mnt'):
| 285 286 if not self.isolation_key('mnt'):
|
313 print('mnt namespace is not supported') 314 raise unittest.SkipTest()
| 287 pytest.skip('mnt namespace is not supported')
|
315 316 isolation = { 317 'namespaces': {'mount': True},
| 288 289 isolation = { 290 'namespaces': {'mount': True},
|
318 'rootfs': self.testdir,
| 291 'rootfs': self.temp_dir,
|
319 } 320 321 self.load('ns_inspect', isolation=isolation) 322 323 obj = self.getjson(url='/?file=/go/app')['body'] 324
| 292 } 293 294 self.load('ns_inspect', isolation=isolation) 295 296 obj = self.getjson(url='/?file=/go/app')['body'] 297
|
325 self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
| 298 assert obj['FileExists'] == True, 'app relative to rootfs'
|
326 327 obj = self.getjson(url='/?file=/bin/sh')['body']
| 299 300 obj = self.getjson(url='/?file=/bin/sh')['body']
|
328 self.assertEqual(obj['FileExists'], False, 'file should not exists')
| 301 assert obj['FileExists'] == False, 'file should not exists'
|
329 330 def test_go_isolation_rootfs_default_tmpfs(self): 331 if not self.isolation_key('unprivileged_userns_clone'):
| 302 303 def test_go_isolation_rootfs_default_tmpfs(self): 304 if not self.isolation_key('unprivileged_userns_clone'):
|
332 print('unprivileged clone is not available') 333 raise unittest.SkipTest()
| 305 pytest.skip('unprivileged clone is not available')
|
334 335 if not self.isolation_key('mnt'):
| 306 307 if not self.isolation_key('mnt'):
|
336 print('mnt namespace is not supported') 337 raise unittest.SkipTest()
| 308 pytest.skip('mnt namespace is not supported')
|
338 339 isolation = { 340 'namespaces': {'mount': True, 'credential': True},
| 309 310 isolation = { 311 'namespaces': {'mount': True, 'credential': True},
|
341 'rootfs': self.testdir,
| 312 'rootfs': self.temp_dir,
|
342 } 343 344 self.load('ns_inspect', isolation=isolation) 345 346 obj = self.getjson(url='/?file=/tmp')['body'] 347
| 313 } 314 315 self.load('ns_inspect', isolation=isolation) 316 317 obj = self.getjson(url='/?file=/tmp')['body'] 318
|
348 self.assertEqual(obj['FileExists'], True, 'app has /tmp') 349 350 351if __name__ == '__main__': 352 TestGoIsolation.main()
| 319 assert obj['FileExists'] == True, 'app has /tmp'
|
| |