xref: /unit/test/test_go_isolation.py (revision 1771:94cf6c5fafbd)
1import grp
2import os
3import pwd
4
5import pytest
6from unit.applications.lang.go import TestApplicationGo
7from unit.option import option
8from unit.utils import getns
9
class TestGoIsolation(TestApplicationGo):
    """Process-isolation tests for Go applications.

    Every test loads the 'ns_inspect' application with a particular
    ``isolation`` configuration and checks the JSON body it returns.  The
    keys read from that body are 'NS', 'UID', 'GID', 'PID', 'FileExists'
    and 'Mounts'.
    """

    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}

    def unpriv_creds(self):
        """Return ``(uid, gid, group_name)`` of the unprivileged account.

        The uid is the one of user 'nobody'.  The group is 'nogroup' when
        such a group exists and 'nobody' otherwise.

        Raises KeyError if user 'nobody' or both groups are missing.
        """
        nobody_uid = pwd.getpwnam('nobody').pw_uid

        try:
            nogroup_gid = grp.getgrnam('nogroup').gr_gid
            nogroup = 'nogroup'
        except KeyError:
            # no 'nogroup' on this system, fall back to group 'nobody'
            nogroup_gid = grp.getgrnam('nobody').gr_gid
            nogroup = 'nobody'

        return (nobody_uid, nogroup_gid, nogroup)

    def isolation_key(self, key):
        """Return True if *key* is listed among the available isolation
        features."""
        return key in option.available['features']['isolation']

    def _require_ns(self, *namespaces):
        """Skip the running test at the first of *namespaces* that is not
        available."""
        for ns in namespaces:
            if not self.isolation_key(ns):
                pytest.skip('%s namespace is not supported' % ns)

    def _require_unprivileged_clone(self):
        """Skip the running test if 'unprivileged_userns_clone' is not
        available."""
        if not self.isolation_key('unprivileged_userns_clone'):
            pytest.skip('unprivileged clone is not available')

    def _load_and_inspect(self, **load_args):
        """Load 'ns_inspect' with *load_args* and return its JSON body."""
        self.load('ns_inspect', **load_args)

        return self.getjson()['body']

    def _assert_ns_match(self, obj, namespaces):
        """Assert that every namespace of *namespaces* reported in
        ``obj['NS']`` equals the value stored in the available isolation
        features.  Namespaces the app does not report are ignored."""
        available = option.available['features']['isolation']

        for ns in namespaces:
            if ns.upper() in obj['NS']:
                assert obj['NS'][ns.upper()] == available[ns], '%s match' % ns

    def _rootfs_isolation(self, is_su, temp_dir):
        """Return an isolation object with 'rootfs' set to *temp_dir*.

        When not running as superuser the mount, credential and pid
        namespaces are requested as well; the test is skipped if any
        feature needed for that is unavailable.
        """
        isolation = {'rootfs': temp_dir}

        if not is_su:
            self._require_unprivileged_clone()
            self._require_ns('user', 'mnt', 'pid')

            isolation['namespaces'] = {
                'mount': True,
                'credential': True,
                'pid': True,
            }

        return isolation

    def _assert_rootfs_confined(self):
        """Assert that the loaded app sees '/go/app' but not '/bin/sh'."""
        obj = self.getjson(url='/?file=/go/app')['body']
        assert obj['FileExists'], 'app relative to rootfs'

        obj = self.getjson(url='/?file=/bin/sh')['body']
        assert not obj['FileExists'], 'file should not exists'

    def test_isolation_values(self):
        """Without isolation the app reports the recorded namespace values."""
        obj = self._load_and_inspect()

        self._assert_ns_match(obj, option.available['features']['isolation'])

    def test_isolation_unpriv_user(self, is_su):
        """Check uid/gid seen by the app when the tests run unprivileged."""
        self._require_unprivileged_clone()

        if is_su:
            pytest.skip('privileged tests, skip this')

        obj = self._load_and_inspect()

        assert obj['UID'] == os.geteuid(), 'uid match'
        assert obj['GID'] == os.getegid(), 'gid match'

        obj = self._load_and_inspect(
            isolation={'namespaces': {'credential': True}}
        )

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        # an unprivileged unit maps itself to nobody in the container by
        # default
        assert obj['UID'] == nobody_uid, 'uid of nobody'
        assert obj['GID'] == nogroup_gid, 'gid of %s' % nogroup

        obj = self._load_and_inspect(
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        assert obj['UID'] == 0, 'uid match user=root'
        assert obj['GID'] == 0, 'gid match user=root'

        obj = self._load_and_inspect(
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        # explicit maps: container id 0 is backed by the ids of this process
        obj = self._load_and_inspect(
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
                'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
            },
        )

        assert obj['UID'] == 0, 'uid match uidmap'
        assert obj['GID'] == 0, 'gid match gidmap'

    def test_isolation_priv_user(self, is_su):
        """Check uid/gid seen by the app when the tests run as superuser."""
        if not is_su:
            pytest.skip('unprivileged tests, skip this')

        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()

        obj = self._load_and_inspect()

        assert obj['UID'] == nobody_uid, 'uid match'
        assert obj['GID'] == nogroup_gid, 'gid match'

        obj = self._load_and_inspect(
            isolation={'namespaces': {'credential': True}}
        )

        # a privileged unit maps the app credentials into the container by
        # default
        assert obj['UID'] == nobody_uid, 'uid nobody'
        assert obj['GID'] == nogroup_gid, 'gid nobody'

        obj = self._load_and_inspect(
            user='root',
            isolation={'namespaces': {'credential': True}},
        )

        assert obj['UID'] == 0, 'uid nobody user=root'
        assert obj['GID'] == 0, 'gid nobody user=root'

        obj = self._load_and_inspect(
            user='root',
            group=nogroup,
            isolation={'namespaces': {'credential': True}},
        )

        assert obj['UID'] == 0, 'uid match user=root group=nogroup'
        assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'

        obj = self._load_and_inspect(
            user='root',
            group='root',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
            },
        )

        assert obj['UID'] == 0, 'uid match uidmap user=root'
        assert obj['GID'] == 0, 'gid match gidmap user=root'

        # map uids 0..nobody_uid one-to-one so that the uid of 'nobody' is
        # covered by the map (65535 ids when nobody's uid is 65534)
        obj = self._load_and_inspect(
            user='nobody',
            isolation={
                'namespaces': {'credential': True},
                'uidmap': [
                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
                ],
            },
        )

        assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
        assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'

    def test_isolation_mnt(self):
        """Mount + credential isolation changes only the MNT and USER
        namespaces."""
        self._require_ns('mnt')
        self._require_unprivileged_clone()

        obj = self._load_and_inspect(
            isolation={'namespaces': {'mount': True, 'credential': True}},
        )

        # all but user and mnt; filtering (rather than list.remove) does not
        # raise when one of the two is absent from the feature list
        untouched = [
            ns
            for ns in option.available['features']['isolation']
            if ns not in ('user', 'mnt')
        ]

        self._assert_ns_match(obj, untouched)

        assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
        assert obj['NS']['USER'] != getns('user'), 'user set'

    def test_isolation_pid(self, is_su):
        """With pid isolation the app reports PID 1."""
        self._require_ns('pid')

        isolation = {'namespaces': {'pid': True}}

        if not is_su:
            self._require_unprivileged_clone()
            self._require_ns('user', 'mnt')

            # unprivileged runs additionally request mount and credential
            # namespaces
            isolation['namespaces']['mount'] = True
            isolation['namespaces']['credential'] = True

        obj = self._load_and_inspect(isolation=isolation)

        assert obj['PID'] == 1, 'pid of container is 1'

    def test_isolation_namespace_false(self):
        """Namespaces explicitly set to False keep the recorded values."""
        self.load('ns_inspect')

        skipped = ('unprivileged_userns_clone', 'ipc', 'cgroup')
        allns = [
            ns
            for ns in option.available['features']['isolation']
            if ns not in skipped
        ]

        # feature names that are spelled differently in the "namespaces"
        # configuration object
        config_names = {
            'user': 'credential',
            'mnt': 'mount',
            'net': 'network',
            'uts': 'uname',
        }
        namespaces = {config_names.get(ns, ns): False for ns in allns}

        obj = self._load_and_inspect(isolation={'namespaces': namespaces})

        self._assert_ns_match(obj, allns)

    def test_go_isolation_rootfs_container(self, is_su, temp_dir):
        """With 'rootfs' set the app sees its own binary but not /bin/sh."""
        isolation = self._rootfs_isolation(is_su, temp_dir)

        self.load('ns_inspect', isolation=isolation)

        self._assert_rootfs_confined()

    def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
        """Same rootfs check as superuser with only the mount namespace."""
        if not is_su:
            pytest.skip('requires root')

        self._require_ns('mnt')

        isolation = {
            'namespaces': {'mount': True},
            'rootfs': temp_dir,
        }

        self.load('ns_inspect', isolation=isolation)

        self._assert_rootfs_confined()

    def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
        """The 'automount.tmpfs' option controls the /tmp mount reported by
        the app."""
        try:
            # only probing that the file can be opened; close it right away
            with open('/proc/self/mountinfo'):
                pass
        except OSError:
            pytest.skip('The system lacks /proc/self/mountinfo file')

        isolation = self._rootfs_isolation(is_su, temp_dir)

        isolation['automount'] = {'tmpfs': False}

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
        ), 'app has no /tmp mounted'

        isolation['automount'] = {'tmpfs': True}

        self.load('ns_inspect', isolation=isolation)

        obj = self.getjson(url='/?mounts=true')['body']

        assert (
            "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
        ), 'app has /tmp mounted on /'
366