xref: /unit/test/test_go_isolation.py (revision 1673:883f2f79c2f6)
1import grp
2import os
3import pwd
4import shutil
5
6import pytest
7
8from conftest import option
9from conftest import unit_run
10from conftest import unit_stop
11from unit.applications.lang.go import TestApplicationGo
12from unit.feature.isolation import TestFeatureIsolation
13
class TestGoIsolation(TestApplicationGo):
    """Process isolation tests for Go applications.

    The tests load the 'ns_inspect' application with various 'isolation'
    settings (namespaces, uid/gid maps, rootfs) and verify the JSON body the
    application reports about itself.
    """

    # Requirements declared for this suite: a Go module of any version and
    # the 'isolation' feature.
    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}

    # Shared helper instance; tests call its getns() to obtain reference
    # namespace values to compare against what the application reports.
    isolation = TestFeatureIsolation()
18
19    @classmethod
20    def setup_class(cls, complete_check=True):
21        check = super().setup_class(complete_check=False)
22
23        unit = unit_run()
24        option.temp_dir = unit['temp_dir']
25
26        TestFeatureIsolation().check(option.available, unit['temp_dir'])
27
28        assert unit_stop() is None
29        shutil.rmtree(unit['temp_dir'])
30
31        return check if not complete_check else check()
32
33    def unpriv_creds(self):
34        nobody_uid = pwd.getpwnam('nobody').pw_uid
35
36        try:
37            nogroup_gid = grp.getgrnam('nogroup').gr_gid
38            nogroup = 'nogroup'
39        except:
40            nogroup_gid = grp.getgrnam('nobody').gr_gid
41            nogroup = 'nobody'
42
43        return (nobody_uid, nogroup_gid, nogroup)
44
45    def isolation_key(self, key):
46        return key in option.available['features']['isolation'].keys()
47
48    def test_isolation_values(self):
49        self.load('ns_inspect')
50
51        obj = self.getjson()['body']
52
53        for ns, ns_value in option.available['features']['isolation'].items():
54            if ns.upper() in obj['NS']:
55                assert obj['NS'][ns.upper()] == ns_value, '%s match' % ns
56
57    def test_isolation_unpriv_user(self, is_su):
58        if not self.isolation_key('unprivileged_userns_clone'):
59            pytest.skip('unprivileged clone is not available')
60
61        if is_su:
62            pytest.skip('privileged tests, skip this')
63
64        self.load('ns_inspect')
65        obj = self.getjson()['body']
66
67        assert obj['UID'] == os.geteuid(), 'uid match'
68        assert obj['GID'] == os.getegid(), 'gid match'
69
70        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
71
72        obj = self.getjson()['body']
73
74        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
75
76        # unprivileged unit map itself to nobody in the container by default
77        assert obj['UID'] == nobody_uid, 'uid of nobody'
78        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup
79
80        self.load(
81            'ns_inspect',
82            user='root',
83            isolation={'namespaces': {'credential': True}},
84        )
85
86        obj = self.getjson()['body']
87
88        assert obj['UID'] == 0, 'uid match user=root'
89        assert obj['GID'] == 0, 'gid match user=root'
90
91        self.load(
92            'ns_inspect',
93            user='root',
94            group=nogroup,
95            isolation={'namespaces': {'credential': True}},
96        )
97
98        obj = self.getjson()['body']
99
100        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
101        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
102
103        self.load(
104            'ns_inspect',
105            user='root',
106            group='root',
107            isolation={
108                'namespaces': {'credential': True},
109                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
110                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
111            },
112        )
113
114        obj = self.getjson()['body']
115
116        assert obj['UID'] == 0, 'uid match uidmap'
117        assert obj['GID'] == 0, 'gid match gidmap'
118
119    def test_isolation_priv_user(self, is_su):
120        if not is_su:
121            pytest.skip('unprivileged tests, skip this')
122
123        self.load('ns_inspect')
124
125        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
126
127        obj = self.getjson()['body']
128
129        assert obj['UID'] == nobody_uid, 'uid match'
130        assert obj['GID'] == nogroup_gid, 'gid match'
131
132        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
133
134        obj = self.getjson()['body']
135
136        # privileged unit map app creds in the container by default
137        assert obj['UID'] == nobody_uid, 'uid nobody'
138        assert obj['GID'] == nogroup_gid, 'gid nobody'
139
140        self.load(
141            'ns_inspect',
142            user='root',
143            isolation={'namespaces': {'credential': True}},
144        )
145
146        obj = self.getjson()['body']
147
148        assert obj['UID'] == 0, 'uid nobody user=root'
149        assert obj['GID'] == 0, 'gid nobody user=root'
150
151        self.load(
152            'ns_inspect',
153            user='root',
154            group=nogroup,
155            isolation={'namespaces': {'credential': True}},
156        )
157
158        obj = self.getjson()['body']
159
160        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
161        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
162
163        self.load(
164            'ns_inspect',
165            user='root',
166            group='root',
167            isolation={
168                'namespaces': {'credential': True},
169                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
170                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
171            },
172        )
173
174        obj = self.getjson()['body']
175
176        assert obj['UID'] == 0, 'uid match uidmap user=root'
177        assert obj['GID'] == 0, 'gid match gidmap user=root'
178
179        # map 65535 uids
180        self.load(
181            'ns_inspect',
182            user='nobody',
183            isolation={
184                'namespaces': {'credential': True},
185                'uidmap': [
186                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
187                ],
188            },
189        )
190
191        obj = self.getjson()['body']
192
193        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
194        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
195
196    def test_isolation_mnt(self):
197        if not self.isolation_key('mnt'):
198            pytest.skip('mnt namespace is not supported')
199
200        if not self.isolation_key('unprivileged_userns_clone'):
201            pytest.skip('unprivileged clone is not available')
202
203        self.load(
204            'ns_inspect',
205            isolation={'namespaces': {'mount': True, 'credential': True}},
206        )
207
208        obj = self.getjson()['body']
209
210        # all but user and mnt
211        allns = list(option.available['features']['isolation'].keys())
212        allns.remove('user')
213        allns.remove('mnt')
214
215        for ns in allns:
216            if ns.upper() in obj['NS']:
217                assert (
218                    obj['NS'][ns.upper()]
219                    == option.available['features']['isolation'][ns]
220                ), ('%s match' % ns)
221
222        assert obj['NS']['MNT'] != self.isolation.getns('mnt'), 'mnt set'
223        assert obj['NS']['USER'] != self.isolation.getns('user'), 'user set'
224
225    def test_isolation_pid(self, is_su):
226        if not self.isolation_key('pid'):
227            pytest.skip('pid namespace is not supported')
228
229        if not is_su:
230            if not self.isolation_key('unprivileged_userns_clone'):
231                pytest.skip('unprivileged clone is not available')
232
233            if not self.isolation_key('user'):
234                pytest.skip('user namespace is not supported')
235
236            if not self.isolation_key('mnt'):
237                pytest.skip('mnt namespace is not supported')
238
239        isolation = {'namespaces': {'pid': True}}
240
241        if not is_su:
242            isolation['namespaces']['mount'] = True
243            isolation['namespaces']['credential'] = True
244
245        self.load('ns_inspect', isolation=isolation)
246
247        obj = self.getjson()['body']
248
249        assert obj['PID'] == 1, 'pid of container is 1'
250
251    def test_isolation_namespace_false(self):
252        self.load('ns_inspect')
253        allns = list(option.available['features']['isolation'].keys())
254
255        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
256        allns = [ns for ns in allns if ns not in remove_list]
257
258        namespaces = {}
259        for ns in allns:
260            if ns == 'user':
261                namespaces['credential'] = False
262            elif ns == 'mnt':
263                namespaces['mount'] = False
264            elif ns == 'net':
265                namespaces['network'] = False
266            elif ns == 'uts':
267                namespaces['uname'] = False
268            else:
269                namespaces[ns] = False
270
271        self.load('ns_inspect', isolation={'namespaces': namespaces})
272
273        obj = self.getjson()['body']
274
275        for ns in allns:
276            if ns.upper() in obj['NS']:
277                assert (
278                    obj['NS'][ns.upper()]
279                    == option.available['features']['isolation'][ns]
280                ), ('%s match' % ns)
281
282    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
283        if not is_su:
284            if not self.isolation_key('unprivileged_userns_clone'):
285                pytest.skip('unprivileged clone is not available')
286
287            if not self.isolation_key('user'):
288                pytest.skip('user namespace is not supported')
289
290            if not self.isolation_key('mnt'):
291                pytest.skip('mnt namespace is not supported')
292
293            if not self.isolation_key('pid'):
294                pytest.skip('pid namespace is not supported')
295
296        isolation = {'rootfs': temp_dir}
297
298        if not is_su:
299            isolation['namespaces'] = {
300                'mount': True,
301                'credential': True,
302                'pid': True
303            }
304
305        self.load('ns_inspect', isolation=isolation)
306
307        obj = self.getjson(url='/?file=/go/app')['body']
308
309        assert obj['FileExists'] == True, 'app relative to rootfs'
310
311        obj = self.getjson(url='/?file=/bin/sh')['body']
312        assert obj['FileExists'] == False, 'file should not exists'
313
314    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
315        if not is_su:
316            pytest.skip('requires root')
317
318        if not self.isolation_key('mnt'):
319            pytest.skip('mnt namespace is not supported')
320
321        isolation = {
322            'namespaces': {'mount': True},
323            'rootfs': temp_dir,
324        }
325
326        self.load('ns_inspect', isolation=isolation)
327
328        obj = self.getjson(url='/?file=/go/app')['body']
329
330        assert obj['FileExists'] == True, 'app relative to rootfs'
331
332        obj = self.getjson(url='/?file=/bin/sh')['body']
333        assert obj['FileExists'] == False, 'file should not exists'
334
335    def test_go_isolation_rootfs_default_tmpfs(self, is_su, temp_dir):
336        if not is_su:
337            if not self.isolation_key('unprivileged_userns_clone'):
338                pytest.skip('unprivileged clone is not available')
339
340            if not self.isolation_key('user'):
341                pytest.skip('user namespace is not supported')
342
343            if not self.isolation_key('mnt'):
344                pytest.skip('mnt namespace is not supported')
345
346            if not self.isolation_key('pid'):
347                pytest.skip('pid namespace is not supported')
348
349        isolation = {'rootfs': temp_dir}
350
351        if not is_su:
352            isolation['namespaces'] = {
353                'mount': True,
354                'credential': True,
355                'pid': True
356            }
357
358        self.load('ns_inspect', isolation=isolation)
359
360        obj = self.getjson(url='/?file=/tmp')['body']
361
362        assert obj['FileExists'] == True, 'app has /tmp'
363