xref: /unit/test/test_go_isolation.py (revision 2616:ab2896c980ab)
1import grp
2import os
3import pwd
4
5import pytest
6
7from unit.applications.lang.go import ApplicationGo
8from unit.option import option
9from unit.utils import getns
10
11prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}}
12
13client = ApplicationGo()
14
15
16def unpriv_creds():
17    nobody_uid = pwd.getpwnam('nobody').pw_uid
18
19    try:
20        nogroup_gid = grp.getgrnam('nogroup').gr_gid
21        nogroup = 'nogroup'
22    except KeyError:
23        nogroup_gid = grp.getgrnam('nobody').gr_gid
24        nogroup = 'nobody'
25
26    return (nobody_uid, nogroup_gid, nogroup)
27
28
29def test_isolation_values():
30    client.load('ns_inspect')
31
32    obj = client.getjson()['body']
33
34    for ns, ns_value in option.available['features']['isolation'].items():
35        if ns.upper() in obj['NS']:
36            assert obj['NS'][ns.upper()] == ns_value, f'{ns} match'
37
38
39def test_isolation_unpriv_user(require):
40    require(
41        {
42            'privileged_user': False,
43            'features': {'isolation': ['unprivileged_userns_clone']},
44        }
45    )
46
47    client.load('ns_inspect')
48    obj = client.getjson()['body']
49
50    assert obj['UID'] == os.geteuid(), 'uid match'
51    assert obj['GID'] == os.getegid(), 'gid match'
52
53    client.load('ns_inspect', isolation={'namespaces': {'credential': True}})
54
55    obj = client.getjson()['body']
56
57    nobody_uid, nogroup_gid, nogroup = unpriv_creds()
58
59    # unprivileged unit map itself to nobody in the container by default
60    assert obj['UID'] == nobody_uid, 'uid of nobody'
61    assert obj['GID'] == nogroup_gid, f'gid of {nogroup}'
62
63    client.load(
64        'ns_inspect',
65        user='root',
66        isolation={'namespaces': {'credential': True}},
67    )
68
69    obj = client.getjson()['body']
70
71    assert obj['UID'] == 0, 'uid match user=root'
72    assert obj['GID'] == 0, 'gid match user=root'
73
74    client.load(
75        'ns_inspect',
76        user='root',
77        group=nogroup,
78        isolation={'namespaces': {'credential': True}},
79    )
80
81    obj = client.getjson()['body']
82
83    assert obj['UID'] == 0, 'uid match user=root group=nogroup'
84    assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
85
86    client.load(
87        'ns_inspect',
88        user='root',
89        group='root',
90        isolation={
91            'namespaces': {'credential': True},
92            'uidmap': [{'container': 0, 'host': os.geteuid(), 'size': 1}],
93            'gidmap': [{'container': 0, 'host': os.getegid(), 'size': 1}],
94        },
95    )
96
97    obj = client.getjson()['body']
98
99    assert obj['UID'] == 0, 'uid match uidmap'
100    assert obj['GID'] == 0, 'gid match gidmap'
101
102
103def test_isolation_priv_user(require):
104    require({'privileged_user': True})
105
106    client.load('ns_inspect')
107
108    nobody_uid, nogroup_gid, nogroup = unpriv_creds()
109
110    obj = client.getjson()['body']
111
112    assert obj['UID'] == nobody_uid, 'uid match'
113    assert obj['GID'] == nogroup_gid, 'gid match'
114
115    client.load('ns_inspect', isolation={'namespaces': {'credential': True}})
116
117    obj = client.getjson()['body']
118
119    # privileged unit map app creds in the container by default
120    assert obj['UID'] == nobody_uid, 'uid nobody'
121    assert obj['GID'] == nogroup_gid, 'gid nobody'
122
123    client.load(
124        'ns_inspect',
125        user='root',
126        isolation={'namespaces': {'credential': True}},
127    )
128
129    obj = client.getjson()['body']
130
131    assert obj['UID'] == 0, 'uid nobody user=root'
132    assert obj['GID'] == 0, 'gid nobody user=root'
133
134    client.load(
135        'ns_inspect',
136        user='root',
137        group=nogroup,
138        isolation={'namespaces': {'credential': True}},
139    )
140
141    obj = client.getjson()['body']
142
143    assert obj['UID'] == 0, 'uid match user=root group=nogroup'
144    assert obj['GID'] == nogroup_gid, 'gid match user=root group=nogroup'
145
146    client.load(
147        'ns_inspect',
148        user='root',
149        group='root',
150        isolation={
151            'namespaces': {'credential': True},
152            'uidmap': [{'container': 0, 'host': 0, 'size': 1}],
153            'gidmap': [{'container': 0, 'host': 0, 'size': 1}],
154        },
155    )
156
157    obj = client.getjson()['body']
158
159    assert obj['UID'] == 0, 'uid match uidmap user=root'
160    assert obj['GID'] == 0, 'gid match gidmap user=root'
161
162    # map 65535 uids
163    client.load(
164        'ns_inspect',
165        user='nobody',
166        isolation={
167            'namespaces': {'credential': True},
168            'uidmap': [{'container': 0, 'host': 0, 'size': nobody_uid + 1}],
169        },
170    )
171
172    obj = client.getjson()['body']
173
174    assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
175    assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
176
177
178def test_isolation_mnt(require):
179    require(
180        {
181            'features': {'isolation': ['unprivileged_userns_clone', 'mnt']},
182        }
183    )
184
185    client.load(
186        'ns_inspect',
187        isolation={'namespaces': {'mount': True, 'credential': True}},
188    )
189
190    obj = client.getjson()['body']
191
192    # all but user and mnt
193    allns = list(option.available['features']['isolation'].keys())
194    allns.remove('user')
195    allns.remove('mnt')
196
197    for ns in allns:
198        if ns.upper() in obj['NS']:
199            assert (
200                obj['NS'][ns.upper()]
201                == option.available['features']['isolation'][ns]
202            ), f'{ns} match'
203
204    assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
205    assert obj['NS']['USER'] != getns('user'), 'user set'
206
207
208def test_isolation_pid(is_su, require):
209    require({'features': {'isolation': ['pid']}})
210
211    if not is_su:
212        require(
213            {
214                'features': {
215                    'isolation': [
216                        'unprivileged_userns_clone',
217                        'user',
218                        'mnt',
219                    ]
220                }
221            }
222        )
223
224    isolation = {'namespaces': {'pid': True}}
225
226    if not is_su:
227        isolation['namespaces']['mount'] = True
228        isolation['namespaces']['credential'] = True
229
230    client.load('ns_inspect', isolation=isolation)
231
232    obj = client.getjson()['body']
233
234    assert obj['PID'] == 2, 'pid of container is 2'
235
236
237def test_isolation_namespace_false():
238    client.load('ns_inspect')
239    allns = list(option.available['features']['isolation'].keys())
240
241    remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
242    allns = [ns for ns in allns if ns not in remove_list]
243
244    namespaces = {}
245    for ns in allns:
246        if ns == 'user':
247            namespaces['credential'] = False
248        elif ns == 'mnt':
249            namespaces['mount'] = False
250        elif ns == 'net':
251            namespaces['network'] = False
252        elif ns == 'uts':
253            namespaces['uname'] = False
254        else:
255            namespaces[ns] = False
256
257    client.load('ns_inspect', isolation={'namespaces': namespaces})
258
259    obj = client.getjson()['body']
260
261    for ns in allns:
262        if ns.upper() in obj['NS']:
263            assert (
264                obj['NS'][ns.upper()]
265                == option.available['features']['isolation'][ns]
266            ), f'{ns} match'
267
268
269def test_go_isolation_rootfs_container(is_su, require, temp_dir):
270    if not is_su:
271        require(
272            {
273                'features': {
274                    'isolation': [
275                        'unprivileged_userns_clone',
276                        'user',
277                        'mnt',
278                        'pid',
279                    ]
280                }
281            }
282        )
283
284    isolation = {'rootfs': temp_dir}
285
286    if not is_su:
287        isolation['namespaces'] = {
288            'mount': True,
289            'credential': True,
290            'pid': True,
291        }
292
293    client.load('ns_inspect', isolation=isolation)
294
295    obj = client.getjson(url='/?file=/go/app')['body']
296
297    assert obj['FileExists'], 'app relative to rootfs'
298
299    obj = client.getjson(url='/?file=/bin/sh')['body']
300    assert not obj['FileExists'], 'file should not exists'
301
302
303def test_go_isolation_rootfs_container_priv(require, temp_dir):
304    require({'privileged_user': True, 'features': {'isolation': ['mnt']}})
305
306    isolation = {
307        'namespaces': {'mount': True},
308        'rootfs': temp_dir,
309    }
310
311    client.load('ns_inspect', isolation=isolation)
312
313    obj = client.getjson(url='/?file=/go/app')['body']
314
315    assert obj['FileExists'], 'app relative to rootfs'
316
317    obj = client.getjson(url='/?file=/bin/sh')['body']
318    assert not obj['FileExists'], 'file should not exists'
319
320
321def test_go_isolation_rootfs_automount_tmpfs(is_su, require, temp_dir):
322    try:
323        open("/proc/self/mountinfo", encoding='utf-8')
324    except:
325        pytest.skip('The system lacks /proc/self/mountinfo file')
326
327    if not is_su:
328        require(
329            {
330                'features': {
331                    'isolation': [
332                        'unprivileged_userns_clone',
333                        'user',
334                        'mnt',
335                        'pid',
336                    ]
337                }
338            }
339        )
340
341    isolation = {'rootfs': temp_dir}
342
343    if not is_su:
344        isolation['namespaces'] = {
345            'mount': True,
346            'credential': True,
347            'pid': True,
348        }
349
350    isolation['automount'] = {'tmpfs': False}
351
352    client.load('ns_inspect', isolation=isolation)
353
354    obj = client.getjson(url='/?mounts=true')['body']
355
356    assert (
357        "/ /tmp" not in obj['Mounts'] and "tmpfs" not in obj['Mounts']
358    ), 'app has no /tmp mounted'
359
360    isolation['automount'] = {'tmpfs': True}
361
362    client.load('ns_inspect', isolation=isolation)
363
364    obj = client.getjson(url='/?mounts=true')['body']
365
366    assert (
367        "/ /tmp" in obj['Mounts'] and "tmpfs" in obj['Mounts']
368    ), 'app has /tmp mounted on /'
369