1import grp 2import os 3import pwd 4 5import pytest 6 7from unit.applications.lang.go import ApplicationGo 8from unit.option import option 9from unit.utils import getns 10 11prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}} 12 13client = ApplicationGo() 14 15 16def unpriv_creds(): 17 nobody_uid = pwd.getpwnam('nobody').pw_uid 18 19 try: 20 nogroup_gid = grp.getgrnam('nogroup').gr_gid 21 nogroup = 'nogroup' 22 except KeyError: 23 nogroup_gid = grp.getgrnam('nobody').gr_gid 24 nogroup = 'nobody' 25 26 return (nobody_uid, nogroup_gid, nogroup) 27 28 29def test_isolation_values(): 30 client.load('ns_inspect') 31 32 obj = client.getjson()['body'] 33 34 for ns, ns_value in option.available['features']['isolation'].items(): 35 if ns.upper() in obj['NS']: 36 assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' 37 38 39def test_isolation_unpriv_user(require): 40 require( 41 { 42 'privileged_user': False, 43 'features': {'isolation': ['unprivileged_userns_clone']}, 44 } 45 ) 46 47 client.load('ns_inspect') 48 obj = client.getjson()['body'] 49 50 assert obj['UID'] == os.geteuid(), 'uid match' 51 assert obj['GID'] == os.getegid(), 'gid match' 52 53 client.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 54 55 obj = client.getjson()['body'] 56 57 nobody_uid, nogroup_gid, nogroup = unpriv_creds() 58 59 # unprivileged unit map itself to nobody in the container by default 60 assert obj['UID'] == nobody_uid, 'uid of nobody' 61 assert obj['GID'] == nogroup_gid, f'gid of {nogroup}' 62 63 client.load( 64 'ns_inspect', 65 user='root', 66 isolation={'namespaces': {'credential': True}}, 67 ) 68 69 obj = client.getjson()['body'] 70 71 assert obj['UID'] == 0, 'uid match user=root' 72 assert obj['GID'] == 0, 'gid match user=root' 73 74 client.load( 75 'ns_inspect', 76 user='root', 77 group=nogroup, 78 isolation={'namespaces': {'credential': True}}, 79 ) 80 81 obj = client.getjson()['body'] 82 83 assert obj['UID'] == 0, 'uid match user=root group=nogroup' 84 assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' 85 86 client.load( 87 'ns_inspect', 88 user='root', 89 group='root', 90 isolation={ 91 'namespaces': {'credential': True}, 92 'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}], 93 'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}], 94 }, 95 ) 96 97 obj = client.getjson()['body'] 98 99 assert obj['UID'] == 0, 'uid match uidmap' 100 assert obj['GID'] == 0, 'gid match gidmap' 101 102 103def test_isolation_priv_user(require): 104 require({'privileged_user': True}) 105 106 client.load('ns_inspect') 107 108 nobody_uid, nogroup_gid, nogroup = unpriv_creds() 109 110 obj = client.getjson()['body'] 111 112 assert obj['UID'] == nobody_uid, 'uid match' 113 assert obj['GID'] == nogroup_gid, 'gid match' 114 115 client.load('ns_inspect', isolation={'namespaces': {'credential': True}}) 116 117 obj = client.getjson()['body'] 118 119 # privileged unit map app creds in the container by default 120 assert obj['UID'] == nobody_uid, 'uid nobody' 121 assert obj['GID'] == nogroup_gid, 'gid nobody' 122 123 client.load( 124 'ns_inspect', 125 user='root', 126 isolation={'namespaces': {'credential': True}}, 127 ) 128 129 obj = client.getjson()['body'] 130 131 assert obj['UID'] == 0, 'uid nobody user=root' 132 assert obj['GID'] == 0, 'gid nobody user=root' 133 134 client.load( 135 'ns_inspect', 136 user='root', 137 group=nogroup, 138 isolation={'namespaces': {'credential': True}}, 139 ) 140 141 obj = client.getjson()['body'] 142 143 assert obj['UID'] == 0, 'uid match user=root group=nogroup' 144 assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup' 145 146 client.load( 147 'ns_inspect', 148 user='root', 149 group='root', 150 isolation={ 151 'namespaces': {'credential': True}, 152 'uidmap': [{'container': 0, 'host': 0, 'size': 1}], 153 'gidmap': [{'container': 0, 'host': 0, 'size': 1}], 154 }, 155 ) 156 157 obj = client.getjson()['body'] 158 159 assert obj['UID'] == 0, 'uid match uidmap user=root' 160 assert obj['GID'] == 0, 'gid match gidmap user=root' 161 162 # map 65535 uids 163 client.load( 164 'ns_inspect', 165 user='nobody', 166 isolation={ 167 'namespaces': {'credential': True}, 168 'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}], 169 }, 170 ) 171 172 obj = client.getjson()['body'] 173 174 assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' 175 assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' 176 177 178def test_isolation_mnt(require): 179 require( 180 { 181 'features': {'isolation': ['unprivileged_userns_clone', 'mnt']}, 182 } 183 ) 184 185 client.load( 186 'ns_inspect', 187 isolation={'namespaces': {'mount': True, 'credential': True}}, 188 ) 189 190 obj = client.getjson()['body'] 191 192 # all but user and mnt 193 allns = list(option.available['features']['isolation'].keys()) 194 allns.remove('user') 195 allns.remove('mnt') 196 197 for ns in allns: 198 if ns.upper() in obj['NS']: 199 assert ( 200 obj['NS'][ns.upper()] 201 == option.available['features']['isolation'][ns] 202 ), f'{ns} match' 203 204 assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' 205 assert obj['NS']['USER'] != getns('user'), 'user set' 206 207 208def test_isolation_pid(is_su, require): 209 require({'features': {'isolation': ['pid']}}) 210 211 if not is_su: 212 require( 213 { 214 'features': { 215 'isolation': [ 216 'unprivileged_userns_clone', 217 'user', 218 'mnt', 219 ] 220 } 221 } 222 ) 223 224 isolation = {'namespaces': {'pid': True}} 225 226 if not is_su: 227 isolation['namespaces']['mount'] = True 228 isolation['namespaces']['credential'] = True 229 230 client.load('ns_inspect', isolation=isolation) 231 232 obj = client.getjson()['body'] 233 234 assert obj['PID'] == 2, 'pid of container is 2' 235 236 237def test_isolation_namespace_false(): 238 client.load('ns_inspect') 239 allns = list(option.available['features']['isolation'].keys()) 240 241 remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] 242 allns = [ns for ns in allns if ns not in remove_list] 243 244 namespaces = {} 245 for ns in allns: 246 if ns == 'user': 247 namespaces['credential'] = False 248 elif ns == 'mnt': 249 namespaces['mount'] = False 250 elif ns == 'net': 251 namespaces['network'] = False 252 elif ns == 'uts': 253 namespaces['uname'] = False 254 else: 255 namespaces[ns] = False 256 257 client.load('ns_inspect', isolation={'namespaces': namespaces}) 258 259 obj = client.getjson()['body'] 260 261 for ns in allns: 262 if ns.upper() in obj['NS']: 263 assert ( 264 obj['NS'][ns.upper()] 265 == option.available['features']['isolation'][ns] 266 ), f'{ns} match' 267 268 269def test_go_isolation_rootfs_container(is_su, require, temp_dir): 270 if not is_su: 271 require( 272 { 273 'features': { 274 'isolation': [ 275 'unprivileged_userns_clone', 276 'user', 277 'mnt', 278 'pid', 279 ] 280 } 281 } 282 ) 283 284 isolation = {'rootfs': temp_dir} 285 286 if not is_su: 287 isolation['namespaces'] = { 288 'mount': True, 289 'credential': True, 290 'pid': True, 291 } 292 293 client.load('ns_inspect', isolation=isolation) 294 295 obj = client.getjson(url='/?file=/go/app')['body'] 296 297 assert obj['FileExists'], 'app relative to rootfs' 298 299 obj = client.getjson(url='/?file=/bin/sh')['body'] 300 assert not obj['FileExists'], 'file should not exists' 301 302 303def test_go_isolation_rootfs_container_priv(require, temp_dir): 304 require({'privileged_user': True, 'features': {'isolation': ['mnt']}}) 305 306 isolation = { 307 'namespaces': {'mount': True}, 308 'rootfs': temp_dir, 309 } 310 311 client.load('ns_inspect', isolation=isolation) 312 313 obj = client.getjson(url='/?file=/go/app')['body'] 314 315 assert obj['FileExists'], 'app relative to rootfs' 316 317 obj = client.getjson(url='/?file=/bin/sh')['body'] 318 assert not obj['FileExists'], 'file should not exists' 319 320 321def test_go_isolation_rootfs_automount_tmpfs(is_su, require, temp_dir): 322 try: 323 open("/proc/self/mountinfo", encoding='utf-8') 324 except: 325 pytest.skip('The system lacks /proc/self/mountinfo file') 326 327 if not is_su: 328 require( 329 { 330 'features': { 331 'isolation': [ 332 'unprivileged_userns_clone', 333 'user', 334 'mnt', 335 'pid', 336 ] 337 } 338 } 339 ) 340 341 isolation = {'rootfs': temp_dir} 342 343 if not is_su: 344 isolation['namespaces'] = { 345 'mount': True, 346 'credential': True, 347 'pid': True, 348 } 349 350 isolation['automount'] = {'tmpfs': False} 351 352 client.load('ns_inspect', isolation=isolation) 353 354 obj = client.getjson(url='/?mounts=true')['body'] 355 356 assert ( 357 "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts'] 358 ), 'app has no /tmp mounted' 359 360 isolation['automount'] = {'tmpfs': True} 361 362 client.load('ns_inspect', isolation=isolation) 363 364 obj = client.getjson(url='/?mounts=true')['body'] 365 366 assert ( 367 "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts'] 368 ), 'app has /tmp mounted on /' 369