xref: /unit/test/test_go_isolation.py (revision 1971:3410f9d2a662)
1import grp
2import os
3import pwd
4
5import pytest
6from unit.applications.lang.go import TestApplicationGo
7from unit.option import option
8from unit.utils import getns
9
10
11class TestGoIsolation(TestApplicationGo):
12    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}
13
14    def unpriv_creds(self):
15        nobody_uid = pwd.getpwnam('nobody').pw_uid
16
17        try:
18            nogroup_gid = grp.getgrnam('nogroup').gr_gid
19            nogroup = 'nogroup'
20        except KeyError:
21            nogroup_gid = grp.getgrnam('nobody').gr_gid
22            nogroup = 'nobody'
23
24        return (nobody_uid, nogroup_gid, nogroup)
25
26    def isolation_key(self, key):
27        return key in option.available['features']['isolation'].keys()
28
29    def test_isolation_values(self):
30        self.load('ns_inspect')
31
32        obj = self.getjson()['body']
33
34        for ns, ns_value in option.available['features']['isolation'].items():
35            if ns.upper() in obj['NS']:
36                assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns
37
38    def test_isolation_unpriv_user(self, is_su):
39        if not self.isolation_key('unprivileged_userns_clone'):
40            pytest.skip('unprivileged clone is not available')
41
42        if is_su:
43            pytest.skip('privileged tests, skip this')
44
45        self.load('ns_inspect')
46        obj = self.getjson()['body']
47
48        assert obj['UID'] == os.geteuid(), 'uid match'
49        assert obj['GID'] == os.getegid(), 'gid match'
50
51        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
52
53        obj = self.getjson()['body']
54
55        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
56
57        # unprivileged unit map itself to nobody in the container by default
58        assert obj['UID'] == nobody_uid, 'uid of nobody'
59        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup
60
61        self.load(
62            'ns_inspect',
63            user='root',
64            isolation={'namespaces': {'credential': True}},
65        )
66
67        obj = self.getjson()['body']
68
69        assert obj['UID'] == 0, 'uid match user=root'
70        assert obj['GID'] == 0, 'gid match user=root'
71
72        self.load(
73            'ns_inspect',
74            user='root',
75            group=nogroup,
76            isolation={'namespaces': {'credential': True}},
77        )
78
79        obj = self.getjson()['body']
80
81        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
82        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
83
84        self.load(
85            'ns_inspect',
86            user='root',
87            group='root',
88            isolation={
89                'namespaces': {'credential': True},
90                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
91                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
92            },
93        )
94
95        obj = self.getjson()['body']
96
97        assert obj['UID'] == 0, 'uid match uidmap'
98        assert obj['GID'] == 0, 'gid match gidmap'
99
100    def test_isolation_priv_user(self, is_su):
101        if not is_su:
102            pytest.skip('unprivileged tests, skip this')
103
104        self.load('ns_inspect')
105
106        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
107
108        obj = self.getjson()['body']
109
110        assert obj['UID'] == nobody_uid, 'uid match'
111        assert obj['GID'] == nogroup_gid, 'gid match'
112
113        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
114
115        obj = self.getjson()['body']
116
117        # privileged unit map app creds in the container by default
118        assert obj['UID'] == nobody_uid, 'uid nobody'
119        assert obj['GID'] == nogroup_gid, 'gid nobody'
120
121        self.load(
122            'ns_inspect',
123            user='root',
124            isolation={'namespaces': {'credential': True}},
125        )
126
127        obj = self.getjson()['body']
128
129        assert obj['UID'] == 0, 'uid nobody user=root'
130        assert obj['GID'] == 0, 'gid nobody user=root'
131
132        self.load(
133            'ns_inspect',
134            user='root',
135            group=nogroup,
136            isolation={'namespaces': {'credential': True}},
137        )
138
139        obj = self.getjson()['body']
140
141        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
142        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
143
144        self.load(
145            'ns_inspect',
146            user='root',
147            group='root',
148            isolation={
149                'namespaces': {'credential': True},
150                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
151                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
152            },
153        )
154
155        obj = self.getjson()['body']
156
157        assert obj['UID'] == 0, 'uid match uidmap user=root'
158        assert obj['GID'] == 0, 'gid match gidmap user=root'
159
160        # map 65535 uids
161        self.load(
162            'ns_inspect',
163            user='nobody',
164            isolation={
165                'namespaces': {'credential': True},
166                'uidmap': [
167                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
168                ],
169            },
170        )
171
172        obj = self.getjson()['body']
173
174        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
175        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
176
177    def test_isolation_mnt(self):
178        if not self.isolation_key('mnt'):
179            pytest.skip('mnt namespace is not supported')
180
181        if not self.isolation_key('unprivileged_userns_clone'):
182            pytest.skip('unprivileged clone is not available')
183
184        self.load(
185            'ns_inspect',
186            isolation={'namespaces': {'mount': True, 'credential': True}},
187        )
188
189        obj = self.getjson()['body']
190
191        # all but user and mnt
192        allns = list(option.available['features']['isolation'].keys())
193        allns.remove('user')
194        allns.remove('mnt')
195
196        for ns in allns:
197            if ns.upper() in obj['NS']:
198                assert (
199                    obj['NS'][ns.upper()]
200                    == option.available['features']['isolation'][ns]
201                ), ('%s match' % ns)
202
203        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
204        assert obj['NS']['USER'] != getns('user'), 'user set'
205
206    def test_isolation_pid(self, is_su):
207        if not self.isolation_key('pid'):
208            pytest.skip('pid namespace is not supported')
209
210        if not is_su:
211            if not self.isolation_key('unprivileged_userns_clone'):
212                pytest.skip('unprivileged clone is not available')
213
214            if not self.isolation_key('user'):
215                pytest.skip('user namespace is not supported')
216
217            if not self.isolation_key('mnt'):
218                pytest.skip('mnt namespace is not supported')
219
220        isolation = {'namespaces': {'pid': True}}
221
222        if not is_su:
223            isolation['namespaces']['mount'] = True
224            isolation['namespaces']['credential'] = True
225
226        self.load('ns_inspect', isolation=isolation)
227
228        obj = self.getjson()['body']
229
230        assert obj['PID'] == 1, 'pid of container is 1'
231
232    def test_isolation_namespace_false(self):
233        self.load('ns_inspect')
234        allns = list(option.available['features']['isolation'].keys())
235
236        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
237        allns = [ns for ns in allns if ns not in remove_list]
238
239        namespaces = {}
240        for ns in allns:
241            if ns == 'user':
242                namespaces['credential'] = False
243            elif ns == 'mnt':
244                namespaces['mount'] = False
245            elif ns == 'net':
246                namespaces['network'] = False
247            elif ns == 'uts':
248                namespaces['uname'] = False
249            else:
250                namespaces[ns] = False
251
252        self.load('ns_inspect', isolation={'namespaces': namespaces})
253
254        obj = self.getjson()['body']
255
256        for ns in allns:
257            if ns.upper() in obj['NS']:
258                assert (
259                    obj['NS'][ns.upper()]
260                    == option.available['features']['isolation'][ns]
261                ), ('%s match' % ns)
262
263    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
264        if not is_su:
265            if not self.isolation_key('unprivileged_userns_clone'):
266                pytest.skip('unprivileged clone is not available')
267
268            if not self.isolation_key('user'):
269                pytest.skip('user namespace is not supported')
270
271            if not self.isolation_key('mnt'):
272                pytest.skip('mnt namespace is not supported')
273
274            if not self.isolation_key('pid'):
275                pytest.skip('pid namespace is not supported')
276
277        isolation = {'rootfs': temp_dir}
278
279        if not is_su:
280            isolation['namespaces'] = {
281                'mount': True,
282                'credential': True,
283                'pid': True,
284            }
285
286        self.load('ns_inspect', isolation=isolation)
287
288        obj = self.getjson(url='/?file=/go/app')['body']
289
290        assert obj['FileExists'] == True, 'app relative to rootfs'
291
292        obj = self.getjson(url='/?file=/bin/sh')['body']
293        assert obj['FileExists'] == False, 'file should not exists'
294
295    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
296        if not is_su:
297            pytest.skip('requires root')
298
299        if not self.isolation_key('mnt'):
300            pytest.skip('mnt namespace is not supported')
301
302        isolation = {
303            'namespaces': {'mount': True},
304            'rootfs': temp_dir,
305        }
306
307        self.load('ns_inspect', isolation=isolation)
308
309        obj = self.getjson(url='/?file=/go/app')['body']
310
311        assert obj['FileExists'] == True, 'app relative to rootfs'
312
313        obj = self.getjson(url='/?file=/bin/sh')['body']
314        assert obj['FileExists'] == False, 'file should not exists'
315
316    def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
317        try:
318            open("/proc/self/mountinfo")
319        except:
320            pytest.skip('The system lacks /proc/self/mountinfo file')
321
322        if not is_su:
323            if not self.isolation_key('unprivileged_userns_clone'):
324                pytest.skip('unprivileged clone is not available')
325
326            if not self.isolation_key('user'):
327                pytest.skip('user namespace is not supported')
328
329            if not self.isolation_key('mnt'):
330                pytest.skip('mnt namespace is not supported')
331
332            if not self.isolation_key('pid'):
333                pytest.skip('pid namespace is not supported')
334
335        isolation = {'rootfs': temp_dir}
336
337        if not is_su:
338            isolation['namespaces'] = {
339                'mount': True,
340                'credential': True,
341                'pid': True,
342            }
343
344        isolation['automount'] = {'tmpfs': False}
345
346        self.load('ns_inspect', isolation=isolation)
347
348        obj = self.getjson(url='/?mounts=true')['body']
349
350        assert (
351            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
352        ), 'app has no /tmp mounted'
353
354        isolation['automount'] = {'tmpfs': True}
355
356        self.load('ns_inspect', isolation=isolation)
357
358        obj = self.getjson(url='/?mounts=true')['body']
359
360        assert (
361            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
362        ), 'app has /tmp mounted on /'
363