xref: /unit/test/test_go_isolation.py (revision 1490:cecf6b11a1e3)
1import grp
2import pwd
3import unittest
4
5from unit.applications.lang.go import TestApplicationGo
6from unit.feature.isolation import TestFeatureIsolation
7
8
9class TestGoIsolation(TestApplicationGo):
10    prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}
11
12    isolation = TestFeatureIsolation()
13
14    @classmethod
15    def setUpClass(cls, complete_check=True):
16        unit = super().setUpClass(complete_check=False)
17
18        TestFeatureIsolation().check(cls.available, unit.testdir)
19
20        return unit if not complete_check else unit.complete()
21
22    def unpriv_creds(self):
23        nobody_uid = pwd.getpwnam('nobody').pw_uid
24
25        try:
26            nogroup_gid = grp.getgrnam('nogroup').gr_gid
27            nogroup = 'nogroup'
28        except:
29            nogroup_gid = grp.getgrnam('nobody').gr_gid
30            nogroup = 'nobody'
31
32        return (nobody_uid, nogroup_gid, nogroup)
33
34    def isolation_key(self, key):
35        return key in self.available['features']['isolation'].keys()
36
37    def test_isolation_values(self):
38        self.load('ns_inspect')
39
40        obj = self.getjson()['body']
41
42        for ns, ns_value in self.available['features']['isolation'].items():
43            if ns.upper() in obj['NS']:
44                self.assertEqual(
45                    obj['NS'][ns.upper()], ns_value, '%s match' % ns
46                )
47
48    def test_isolation_unpriv_user(self):
49        if not self.isolation_key('unprivileged_userns_clone'):
50            print('unprivileged clone is not available')
51            raise unittest.SkipTest()
52
53        if self.is_su:
54            print('privileged tests, skip this')
55            raise unittest.SkipTest()
56
57        self.load('ns_inspect')
58        obj = self.getjson()['body']
59
60        self.assertEqual(obj['UID'], self.uid, 'uid match')
61        self.assertEqual(obj['GID'], self.gid, 'gid match')
62
63        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
64
65        obj = self.getjson()['body']
66
67        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
68
69        # unprivileged unit map itself to nobody in the container by default
70        self.assertEqual(obj['UID'], nobody_uid, 'uid of nobody')
71        self.assertEqual(obj['GID'], nogroup_gid, 'gid of %s' % nogroup)
72
73        self.load(
74            'ns_inspect',
75            user='root',
76            isolation={'namespaces': {'credential': True}},
77        )
78
79        obj = self.getjson()['body']
80
81        self.assertEqual(obj['UID'], 0, 'uid match user=root')
82        self.assertEqual(obj['GID'], 0, 'gid match user=root')
83
84        self.load(
85            'ns_inspect',
86            user='root',
87            group=nogroup,
88            isolation={'namespaces': {'credential': True}},
89        )
90
91        obj = self.getjson()['body']
92
93        self.assertEqual(obj['UID'], 0, 'uid match user=root group=nogroup')
94        self.assertEqual(
95            obj['GID'], nogroup_gid, 'gid match user=root group=nogroup'
96        )
97
98        self.load(
99            'ns_inspect',
100            user='root',
101            group='root',
102            isolation={
103                'namespaces': {'credential': True},
104                'uidmap': [{'container': 0, 'host': self.uid, 'size': 1}],
105                'gidmap': [{'container': 0, 'host': self.gid, 'size': 1}],
106            },
107        )
108
109        obj = self.getjson()['body']
110
111        self.assertEqual(obj['UID'], 0, 'uid match uidmap')
112        self.assertEqual(obj['GID'], 0, 'gid match gidmap')
113
114    def test_isolation_priv_user(self):
115        if not self.is_su:
116            print('unprivileged tests, skip this')
117            raise unittest.SkipTest()
118
119        self.load('ns_inspect')
120
121        nobody_uid, nogroup_gid, nogroup = self.unpriv_creds()
122
123        obj = self.getjson()['body']
124
125        self.assertEqual(obj['UID'], nobody_uid, 'uid match')
126        self.assertEqual(obj['GID'], nogroup_gid, 'gid match')
127
128        self.load('ns_inspect', isolation={'namespaces': {'credential': True}})
129
130        obj = self.getjson()['body']
131
132        # privileged unit map app creds in the container by default
133        self.assertEqual(obj['UID'], nobody_uid, 'uid nobody')
134        self.assertEqual(obj['GID'], nogroup_gid, 'gid nobody')
135
136        self.load(
137            'ns_inspect',
138            user='root',
139            isolation={'namespaces': {'credential': True}},
140        )
141
142        obj = self.getjson()['body']
143
144        self.assertEqual(obj['UID'], 0, 'uid nobody user=root')
145        self.assertEqual(obj['GID'], 0, 'gid nobody user=root')
146
147        self.load(
148            'ns_inspect',
149            user='root',
150            group=nogroup,
151            isolation={'namespaces': {'credential': True}},
152        )
153
154        obj = self.getjson()['body']
155
156        self.assertEqual(obj['UID'], 0, 'uid match user=root group=nogroup')
157        self.assertEqual(
158            obj['GID'], nogroup_gid, 'gid match user=root group=nogroup'
159        )
160
161        self.load(
162            'ns_inspect',
163            user='root',
164            group='root',
165            isolation={
166                'namespaces': {'credential': True},
167                'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
168                'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
169            },
170        )
171
172        obj = self.getjson()['body']
173
174        self.assertEqual(obj['UID'], 0, 'uid match uidmap user=root')
175        self.assertEqual(obj['GID'], 0, 'gid match gidmap user=root')
176
177        # map 65535 uids
178        self.load(
179            'ns_inspect',
180            user='nobody',
181            isolation={
182                'namespaces': {'credential': True},
183                'uidmap': [
184                    {'container': 0, 'host': 0, 'size': nobody_uid + 1}
185                ],
186            },
187        )
188
189        obj = self.getjson()['body']
190
191        self.assertEqual(
192            obj['UID'], nobody_uid, 'uid match uidmap user=nobody'
193        )
194        self.assertEqual(
195            obj['GID'], nogroup_gid, 'gid match uidmap user=nobody'
196        )
197
198    def test_isolation_mnt(self):
199        if not self.isolation_key('mnt'):
200            print('mnt namespace is not supported')
201            raise unittest.SkipTest()
202
203        if not self.isolation_key('unprivileged_userns_clone'):
204            print('unprivileged clone is not available')
205            raise unittest.SkipTest()
206
207        self.load(
208            'ns_inspect',
209            isolation={'namespaces': {'mount': True, 'credential': True}},
210        )
211
212        obj = self.getjson()['body']
213
214        # all but user and mnt
215        allns = list(self.available['features']['isolation'].keys())
216        allns.remove('user')
217        allns.remove('mnt')
218
219        for ns in allns:
220            if ns.upper() in obj['NS']:
221                self.assertEqual(
222                    obj['NS'][ns.upper()],
223                    self.available['features']['isolation'][ns],
224                    '%s match' % ns,
225                )
226
227        self.assertNotEqual(
228            obj['NS']['MNT'], self.isolation.getns('mnt'), 'mnt set'
229        )
230        self.assertNotEqual(
231            obj['NS']['USER'], self.isolation.getns('user'), 'user set'
232        )
233
234    def test_isolation_pid(self):
235        if not self.isolation_key('pid'):
236            print('pid namespace is not supported')
237            raise unittest.SkipTest()
238
239        if not (self.is_su or self.isolation_key('unprivileged_userns_clone')):
240            print('requires root or unprivileged_userns_clone')
241            raise unittest.SkipTest()
242
243        self.load(
244            'ns_inspect',
245            isolation={'namespaces': {'pid': True, 'credential': True}},
246        )
247
248        obj = self.getjson()['body']
249
250        self.assertEqual(obj['PID'], 1, 'pid of container is 1')
251
252    def test_isolation_namespace_false(self):
253        self.load('ns_inspect')
254        allns = list(self.available['features']['isolation'].keys())
255
256        remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
257        allns = [ns for ns in allns if ns not in remove_list]
258
259        namespaces = {}
260        for ns in allns:
261            if ns == 'user':
262                namespaces['credential'] = False
263            elif ns == 'mnt':
264                namespaces['mount'] = False
265            elif ns == 'net':
266                namespaces['network'] = False
267            elif ns == 'uts':
268                namespaces['uname'] = False
269            else:
270                namespaces[ns] = False
271
272        self.load('ns_inspect', isolation={'namespaces': namespaces})
273
274        obj = self.getjson()['body']
275
276        for ns in allns:
277            if ns.upper() in obj['NS']:
278                self.assertEqual(
279                    obj['NS'][ns.upper()],
280                    self.available['features']['isolation'][ns],
281                    '%s match' % ns,
282                )
283
284    def test_go_isolation_rootfs_container(self):
285        if not self.isolation_key('unprivileged_userns_clone'):
286            print('unprivileged clone is not available')
287            raise unittest.SkipTest()
288
289        if not self.isolation_key('mnt'):
290            print('mnt namespace is not supported')
291            raise unittest.SkipTest()
292
293        isolation = {
294            'namespaces': {'mount': True, 'credential': True},
295            'rootfs': self.testdir,
296        }
297
298        self.load('ns_inspect', isolation=isolation)
299
300        obj = self.getjson(url='/?file=/go/app')['body']
301
302        self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
303
304        obj = self.getjson(url='/?file=/bin/sh')['body']
305        self.assertEqual(obj['FileExists'], False, 'file should not exists')
306
307    def test_go_isolation_rootfs_container_priv(self):
308        if not self.is_su:
309            print("requires root")
310            raise unittest.SkipTest()
311
312        if not self.isolation_key('mnt'):
313            print('mnt namespace is not supported')
314            raise unittest.SkipTest()
315
316        isolation = {
317            'namespaces': {'mount': True},
318            'rootfs': self.testdir,
319        }
320
321        self.load('ns_inspect', isolation=isolation)
322
323        obj = self.getjson(url='/?file=/go/app')['body']
324
325        self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
326
327        obj = self.getjson(url='/?file=/bin/sh')['body']
328        self.assertEqual(obj['FileExists'], False, 'file should not exists')
329
330
# When the file is executed as a script, hand control to the test class's
# main() entry point.
if __name__ == '__main__':
    TestGoIsolation.main()
333