xref: /unit/test/test_go_isolation.py (revision 1848:4bd548074e2c)
1import grp
2import os
3import pwd
4
5import pytest
6
7from unit.applications.lang.go import TestApplicationGo
8from unit.option import option
9from unit.utils import getns
10
11
12class TestGoIsolation(TestApplicationGo):
13    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}
14
15    def unpriv_creds(self):
16        nobody_uid = pwd.getpwnam('nobody').pw_uid
17
18        try:
19            nogroup_gid = grp.getgrnam('nogroup').gr_gid
20            nogroup = 'nogroup'
21        except KeyError:
22            nogroup_gid = grp.getgrnam('nobody').gr_gid
23            nogroup = 'nobody'
24
25        return (nobody_uid, nogroup_gid, nogroup)
26
27    def isolation_key(self, key):
28        return key in option.available['features']['isolation'].keys()
29
30    def test_isolation_values(self):
31        self.load('ns_inspect')
32
33        obj = self.getjson()['body']
34
35        for ns, ns_value in option.available['features']['isolation'].items():
36            if ns.upper() in obj['NS']:
37                assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns
38
39    def test_isolation_unpriv_user(self, is_su):
40        if not self.isolation_key('unprivileged_userns_clone'):
41            pytest.skip('unprivileged clone is not available')
42
43        if is_su:
44            pytest.skip('privileged tests, skip this')
45
46        self.load('ns_inspect')
47        obj = self.getjson()['body']
48
49        assert obj['UID'] == os.geteuid(), 'uid match'
50        assert obj['GID'] == os.getegid(), 'gid match'
51
52        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
53
54        obj = self.getjson()['body']
55
56        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
57
58        # unprivileged unit map itself to nobody in the container by default
59        assert obj['UID'] == nobody_uid, 'uid of nobody'
60        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup
61
62        self.load(
63            'ns_inspect',
64            user='root',
65            isolation={'namespaces': {'credential': True}},
66        )
67
68        obj = self.getjson()['body']
69
70        assert obj['UID'] == 0, 'uid match user=root'
71        assert obj['GID'] == 0, 'gid match user=root'
72
73        self.load(
74            'ns_inspect',
75            user='root',
76            group=nogroup,
77            isolation={'namespaces': {'credential': True}},
78        )
79
80        obj = self.getjson()['body']
81
82        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
83        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
84
85        self.load(
86            'ns_inspect',
87            user='root',
88            group='root',
89            isolation={
90                'namespaces': {'credential': True},
91                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
92                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
93            },
94        )
95
96        obj = self.getjson()['body']
97
98        assert obj['UID'] == 0, 'uid match uidmap'
99        assert obj['GID'] == 0, 'gid match gidmap'
100
101    def test_isolation_priv_user(self, is_su):
102        if not is_su:
103            pytest.skip('unprivileged tests, skip this')
104
105        self.load('ns_inspect')
106
107        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
108
109        obj = self.getjson()['body']
110
111        assert obj['UID'] == nobody_uid, 'uid match'
112        assert obj['GID'] == nogroup_gid, 'gid match'
113
114        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
115
116        obj = self.getjson()['body']
117
118        # privileged unit map app creds in the container by default
119        assert obj['UID'] == nobody_uid, 'uid nobody'
120        assert obj['GID'] == nogroup_gid, 'gid nobody'
121
122        self.load(
123            'ns_inspect',
124            user='root',
125            isolation={'namespaces': {'credential': True}},
126        )
127
128        obj = self.getjson()['body']
129
130        assert obj['UID'] == 0, 'uid nobody user=root'
131        assert obj['GID'] == 0, 'gid nobody user=root'
132
133        self.load(
134            'ns_inspect',
135            user='root',
136            group=nogroup,
137            isolation={'namespaces': {'credential': True}},
138        )
139
140        obj = self.getjson()['body']
141
142        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
143        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
144
145        self.load(
146            'ns_inspect',
147            user='root',
148            group='root',
149            isolation={
150                'namespaces': {'credential': True},
151                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
152                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
153            },
154        )
155
156        obj = self.getjson()['body']
157
158        assert obj['UID'] == 0, 'uid match uidmap user=root'
159        assert obj['GID'] == 0, 'gid match gidmap user=root'
160
161        # map 65535 uids
162        self.load(
163            'ns_inspect',
164            user='nobody',
165            isolation={
166                'namespaces': {'credential': True},
167                'uidmap': [
168                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
169                ],
170            },
171        )
172
173        obj = self.getjson()['body']
174
175        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
176        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
177
178    def test_isolation_mnt(self):
179        if not self.isolation_key('mnt'):
180            pytest.skip('mnt namespace is not supported')
181
182        if not self.isolation_key('unprivileged_userns_clone'):
183            pytest.skip('unprivileged clone is not available')
184
185        self.load(
186            'ns_inspect',
187            isolation={'namespaces': {'mount': True, 'credential': True}},
188        )
189
190        obj = self.getjson()['body']
191
192        # all but user and mnt
193        allns = list(option.available['features']['isolation'].keys())
194        allns.remove('user')
195        allns.remove('mnt')
196
197        for ns in allns:
198            if ns.upper() in obj['NS']:
199                assert (
200                    obj['NS'][ns.upper()]
201                    == option.available['features']['isolation'][ns]
202                ), ('%s match' % ns)
203
204        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
205        assert obj['NS']['USER'] != getns('user'), 'user set'
206
207    def test_isolation_pid(self, is_su):
208        if not self.isolation_key('pid'):
209            pytest.skip('pid namespace is not supported')
210
211        if not is_su:
212            if not self.isolation_key('unprivileged_userns_clone'):
213                pytest.skip('unprivileged clone is not available')
214
215            if not self.isolation_key('user'):
216                pytest.skip('user namespace is not supported')
217
218            if not self.isolation_key('mnt'):
219                pytest.skip('mnt namespace is not supported')
220
221        isolation = {'namespaces': {'pid': True}}
222
223        if not is_su:
224            isolation['namespaces']['mount'] = True
225            isolation['namespaces']['credential'] = True
226
227        self.load('ns_inspect', isolation=isolation)
228
229        obj = self.getjson()['body']
230
231        assert obj['PID'] == 1, 'pid of container is 1'
232
233    def test_isolation_namespace_false(self):
234        self.load('ns_inspect')
235        allns = list(option.available['features']['isolation'].keys())
236
237        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
238        allns = [ns for ns in allns if ns not in remove_list]
239
240        namespaces = {}
241        for ns in allns:
242            if ns == 'user':
243                namespaces['credential'] = False
244            elif ns == 'mnt':
245                namespaces['mount'] = False
246            elif ns == 'net':
247                namespaces['network'] = False
248            elif ns == 'uts':
249                namespaces['uname'] = False
250            else:
251                namespaces[ns] = False
252
253        self.load('ns_inspect', isolation={'namespaces': namespaces})
254
255        obj = self.getjson()['body']
256
257        for ns in allns:
258            if ns.upper() in obj['NS']:
259                assert (
260                    obj['NS'][ns.upper()]
261                    == option.available['features']['isolation'][ns]
262                ), ('%s match' % ns)
263
264    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
265        if not is_su:
266            if not self.isolation_key('unprivileged_userns_clone'):
267                pytest.skip('unprivileged clone is not available')
268
269            if not self.isolation_key('user'):
270                pytest.skip('user namespace is not supported')
271
272            if not self.isolation_key('mnt'):
273                pytest.skip('mnt namespace is not supported')
274
275            if not self.isolation_key('pid'):
276                pytest.skip('pid namespace is not supported')
277
278        isolation = {'rootfs': temp_dir}
279
280        if not is_su:
281            isolation['namespaces'] = {
282                'mount': True,
283                'credential': True,
284                'pid': True,
285            }
286
287        self.load('ns_inspect', isolation=isolation)
288
289        obj = self.getjson(url='/?file=/go/app')['body']
290
291        assert obj['FileExists'] == True, 'app relative to rootfs'
292
293        obj = self.getjson(url='/?file=/bin/sh')['body']
294        assert obj['FileExists'] == False, 'file should not exists'
295
296    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
297        if not is_su:
298            pytest.skip('requires root')
299
300        if not self.isolation_key('mnt'):
301            pytest.skip('mnt namespace is not supported')
302
303        isolation = {
304            'namespaces': {'mount': True},
305            'rootfs': temp_dir,
306        }
307
308        self.load('ns_inspect', isolation=isolation)
309
310        obj = self.getjson(url='/?file=/go/app')['body']
311
312        assert obj['FileExists'] == True, 'app relative to rootfs'
313
314        obj = self.getjson(url='/?file=/bin/sh')['body']
315        assert obj['FileExists'] == False, 'file should not exists'
316
317    def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
318        try:
319            open("/proc/self/mountinfo")
320        except:
321            pytest.skip('The system lacks /proc/self/mountinfo file')
322
323        if not is_su:
324            if not self.isolation_key('unprivileged_userns_clone'):
325                pytest.skip('unprivileged clone is not available')
326
327            if not self.isolation_key('user'):
328                pytest.skip('user namespace is not supported')
329
330            if not self.isolation_key('mnt'):
331                pytest.skip('mnt namespace is not supported')
332
333            if not self.isolation_key('pid'):
334                pytest.skip('pid namespace is not supported')
335
336        isolation = {'rootfs': temp_dir}
337
338        if not is_su:
339            isolation['namespaces'] = {
340                'mount': True,
341                'credential': True,
342                'pid': True,
343            }
344
345        isolation['automount'] = {'tmpfs': False}
346
347        self.load('ns_inspect', isolation=isolation)
348
349        obj = self.getjson(url='/?mounts=true')['body']
350
351        assert (
352            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
353        ), 'app has no /tmp mounted'
354
355        isolation['automount'] = {'tmpfs': True}
356
357        self.load('ns_inspect', isolation=isolation)
358
359        obj = self.getjson(url='/?mounts=true')['body']
360
361        assert (
362            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
363        ), 'app has /tmp mounted on /'
364