xref: /unit/test/test_go_isolation.py (revision 1999:00d43b03d82f)
1import grp
2import os
3import pwd
4
5import pytest
6from unit.applications.lang.go import TestApplicationGo
7from unit.option import option
8from unit.utils import getns
9
10
11class TestGoIsolation(TestApplicationGo):
12    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}
13
14    @pytest.fixture(autouse=True)
15    def setup_method_fixture(self, request, skip_alert):
16        skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')
17
18    def unpriv_creds(self):
19        nobody_uid = pwd.getpwnam('nobody').pw_uid
20
21        try:
22            nogroup_gid = grp.getgrnam('nogroup').gr_gid
23            nogroup = 'nogroup'
24        except KeyError:
25            nogroup_gid = grp.getgrnam('nobody').gr_gid
26            nogroup = 'nobody'
27
28        return (nobody_uid, nogroup_gid, nogroup)
29
30    def isolation_key(self, key):
31        return key in option.available['features']['isolation'].keys()
32
33    def test_isolation_values(self):
34        self.load('ns_inspect')
35
36        obj = self.getjson()['body']
37
38        for ns, ns_value in option.available['features']['isolation'].items():
39            if ns.upper() in obj['NS']:
40                assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns
41
42    def test_isolation_unpriv_user(self, is_su):
43        if not self.isolation_key('unprivileged_userns_clone'):
44            pytest.skip('unprivileged clone is not available')
45
46        if is_su:
47            pytest.skip('privileged tests, skip this')
48
49        self.load('ns_inspect')
50        obj = self.getjson()['body']
51
52        assert obj['UID'] == os.geteuid(), 'uid match'
53        assert obj['GID'] == os.getegid(), 'gid match'
54
55        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
56
57        obj = self.getjson()['body']
58
59        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
60
61        # unprivileged unit map itself to nobody in the container by default
62        assert obj['UID'] == nobody_uid, 'uid of nobody'
63        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup
64
65        self.load(
66            'ns_inspect',
67            user='root',
68            isolation={'namespaces': {'credential': True}},
69        )
70
71        obj = self.getjson()['body']
72
73        assert obj['UID'] == 0, 'uid match user=root'
74        assert obj['GID'] == 0, 'gid match user=root'
75
76        self.load(
77            'ns_inspect',
78            user='root',
79            group=nogroup,
80            isolation={'namespaces': {'credential': True}},
81        )
82
83        obj = self.getjson()['body']
84
85        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
86        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
87
88        self.load(
89            'ns_inspect',
90            user='root',
91            group='root',
92            isolation={
93                'namespaces': {'credential': True},
94                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
95                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
96            },
97        )
98
99        obj = self.getjson()['body']
100
101        assert obj['UID'] == 0, 'uid match uidmap'
102        assert obj['GID'] == 0, 'gid match gidmap'
103
104    def test_isolation_priv_user(self, is_su):
105        if not is_su:
106            pytest.skip('unprivileged tests, skip this')
107
108        self.load('ns_inspect')
109
110        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
111
112        obj = self.getjson()['body']
113
114        assert obj['UID'] == nobody_uid, 'uid match'
115        assert obj['GID'] == nogroup_gid, 'gid match'
116
117        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
118
119        obj = self.getjson()['body']
120
121        # privileged unit map app creds in the container by default
122        assert obj['UID'] == nobody_uid, 'uid nobody'
123        assert obj['GID'] == nogroup_gid, 'gid nobody'
124
125        self.load(
126            'ns_inspect',
127            user='root',
128            isolation={'namespaces': {'credential': True}},
129        )
130
131        obj = self.getjson()['body']
132
133        assert obj['UID'] == 0, 'uid nobody user=root'
134        assert obj['GID'] == 0, 'gid nobody user=root'
135
136        self.load(
137            'ns_inspect',
138            user='root',
139            group=nogroup,
140            isolation={'namespaces': {'credential': True}},
141        )
142
143        obj = self.getjson()['body']
144
145        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
146        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
147
148        self.load(
149            'ns_inspect',
150            user='root',
151            group='root',
152            isolation={
153                'namespaces': {'credential': True},
154                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
155                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
156            },
157        )
158
159        obj = self.getjson()['body']
160
161        assert obj['UID'] == 0, 'uid match uidmap user=root'
162        assert obj['GID'] == 0, 'gid match gidmap user=root'
163
164        # map 65535 uids
165        self.load(
166            'ns_inspect',
167            user='nobody',
168            isolation={
169                'namespaces': {'credential': True},
170                'uidmap': [
171                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
172                ],
173            },
174        )
175
176        obj = self.getjson()['body']
177
178        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
179        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
180
181    def test_isolation_mnt(self):
182        if not self.isolation_key('mnt'):
183            pytest.skip('mnt namespace is not supported')
184
185        if not self.isolation_key('unprivileged_userns_clone'):
186            pytest.skip('unprivileged clone is not available')
187
188        self.load(
189            'ns_inspect',
190            isolation={'namespaces': {'mount': True, 'credential': True}},
191        )
192
193        obj = self.getjson()['body']
194
195        # all but user and mnt
196        allns = list(option.available['features']['isolation'].keys())
197        allns.remove('user')
198        allns.remove('mnt')
199
200        for ns in allns:
201            if ns.upper() in obj['NS']:
202                assert (
203                    obj['NS'][ns.upper()]
204                    == option.available['features']['isolation'][ns]
205                ), ('%s match' % ns)
206
207        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
208        assert obj['NS']['USER'] != getns('user'), 'user set'
209
210    def test_isolation_pid(self, is_su):
211        if not self.isolation_key('pid'):
212            pytest.skip('pid namespace is not supported')
213
214        if not is_su:
215            if not self.isolation_key('unprivileged_userns_clone'):
216                pytest.skip('unprivileged clone is not available')
217
218            if not self.isolation_key('user'):
219                pytest.skip('user namespace is not supported')
220
221            if not self.isolation_key('mnt'):
222                pytest.skip('mnt namespace is not supported')
223
224        isolation = {'namespaces': {'pid': True}}
225
226        if not is_su:
227            isolation['namespaces']['mount'] = True
228            isolation['namespaces']['credential'] = True
229
230        self.load('ns_inspect', isolation=isolation)
231
232        obj = self.getjson()['body']
233
234        assert obj['PID'] == 2, 'pid of container is 2'
235
236    def test_isolation_namespace_false(self):
237        self.load('ns_inspect')
238        allns = list(option.available['features']['isolation'].keys())
239
240        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
241        allns = [ns for ns in allns if ns not in remove_list]
242
243        namespaces = {}
244        for ns in allns:
245            if ns == 'user':
246                namespaces['credential'] = False
247            elif ns == 'mnt':
248                namespaces['mount'] = False
249            elif ns == 'net':
250                namespaces['network'] = False
251            elif ns == 'uts':
252                namespaces['uname'] = False
253            else:
254                namespaces[ns] = False
255
256        self.load('ns_inspect', isolation={'namespaces': namespaces})
257
258        obj = self.getjson()['body']
259
260        for ns in allns:
261            if ns.upper() in obj['NS']:
262                assert (
263                    obj['NS'][ns.upper()]
264                    == option.available['features']['isolation'][ns]
265                ), ('%s match' % ns)
266
267    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
268        if not is_su:
269            if not self.isolation_key('unprivileged_userns_clone'):
270                pytest.skip('unprivileged clone is not available')
271
272            if not self.isolation_key('user'):
273                pytest.skip('user namespace is not supported')
274
275            if not self.isolation_key('mnt'):
276                pytest.skip('mnt namespace is not supported')
277
278            if not self.isolation_key('pid'):
279                pytest.skip('pid namespace is not supported')
280
281        isolation = {'rootfs': temp_dir}
282
283        if not is_su:
284            isolation['namespaces'] = {
285                'mount': True,
286                'credential': True,
287                'pid': True,
288            }
289
290        self.load('ns_inspect', isolation=isolation)
291
292        obj = self.getjson(url='/?file=/go/app')['body']
293
294        assert obj['FileExists'] == True, 'app relative to rootfs'
295
296        obj = self.getjson(url='/?file=/bin/sh')['body']
297        assert obj['FileExists'] == False, 'file should not exists'
298
299    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
300        if not is_su:
301            pytest.skip('requires root')
302
303        if not self.isolation_key('mnt'):
304            pytest.skip('mnt namespace is not supported')
305
306        isolation = {
307            'namespaces': {'mount': True},
308            'rootfs': temp_dir,
309        }
310
311        self.load('ns_inspect', isolation=isolation)
312
313        obj = self.getjson(url='/?file=/go/app')['body']
314
315        assert obj['FileExists'] == True, 'app relative to rootfs'
316
317        obj = self.getjson(url='/?file=/bin/sh')['body']
318        assert obj['FileExists'] == False, 'file should not exists'
319
320    def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
321        try:
322            open("/proc/self/mountinfo")
323        except:
324            pytest.skip('The system lacks /proc/self/mountinfo file')
325
326        if not is_su:
327            if not self.isolation_key('unprivileged_userns_clone'):
328                pytest.skip('unprivileged clone is not available')
329
330            if not self.isolation_key('user'):
331                pytest.skip('user namespace is not supported')
332
333            if not self.isolation_key('mnt'):
334                pytest.skip('mnt namespace is not supported')
335
336            if not self.isolation_key('pid'):
337                pytest.skip('pid namespace is not supported')
338
339        isolation = {'rootfs': temp_dir}
340
341        if not is_su:
342            isolation['namespaces'] = {
343                'mount': True,
344                'credential': True,
345                'pid': True,
346            }
347
348        isolation['automount'] = {'tmpfs': False}
349
350        self.load('ns_inspect', isolation=isolation)
351
352        obj = self.getjson(url='/?mounts=true')['body']
353
354        assert (
355            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
356        ), 'app has no /tmp mounted'
357
358        isolation['automount'] = {'tmpfs': True}
359
360        self.load('ns_inspect', isolation=isolation)
361
362        obj = self.getjson(url='/?mounts=true')['body']
363
364        assert (
365            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
366        ), 'app has /tmp mounted on /'
367