xref: /unit/test/test_forwarded_header.py (revision 2616:ab2896c980ab)
1import pytest
2
3from unit.applications.lang.python import ApplicationPython
4
5prerequisites = {'modules': {'python': 'any'}}
6
7client = ApplicationPython()
8
9
10@pytest.fixture(autouse=True)
11def setup_method_fixture():
12    client.load('forwarded_header')
13
14
15def forwarded_header(forwarded):
16    assert 'success' in client.conf(
17        {
18            "127.0.0.1:8081": {
19                "forwarded": forwarded,
20                "pass": "applications/forwarded_header",
21            },
22            "[::1]:8082": {
23                "forwarded": forwarded,
24                "pass": "applications/forwarded_header",
25            },
26        },
27        'listeners',
28    ), 'listeners configure'
29
30
31def get_fwd(sock_type='ipv4', xff=None, xfp=None):
32    port = 8081 if sock_type == 'ipv4' else 8082
33
34    headers = {'Connection': 'close'}
35
36    if xff is not None:
37        headers['X-Forwarded-For'] = xff
38
39    if xfp is not None:
40        headers['X-Forwarded-Proto'] = xfp
41
42    return client.get(sock_type=sock_type, port=port, headers=headers)[
43        'headers'
44    ]
45
46
47def get_addr(*args, **kwargs):
48    return get_fwd(*args, **kwargs)['Remote-Addr']
49
50
51def get_scheme(*args, **kwargs):
52    return get_fwd(*args, **kwargs)['Url-Scheme']
53
54
55def test_forwarded_header_single_ip():
56    forwarded_header(
57        {
58            'client_ip': 'X-Forwarded-For',
59            'protocol': 'X-Forwarded-Proto',
60            'source': '123.123.123.123',
61        }
62    )
63
64    resp = get_fwd(xff='1.1.1.1', xfp='https')
65    assert resp['Remote-Addr'] == '127.0.0.1', 'both headers addr'
66    assert resp['Url-Scheme'] == 'http', 'both headers proto'
67
68    assert get_addr() == '127.0.0.1', 'ipv4 default addr'
69    assert get_addr('ipv6') == '::1', 'ipv6 default addr'
70    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source'
71    assert get_addr(xff='blah') == '127.0.0.1', 'bad xff'
72    assert get_addr('ipv6', '1.1.1.1') == '::1', 'bad source ipv6'
73
74    assert get_scheme() == 'http', 'ipv4 default proto'
75    assert get_scheme('ipv6') == 'http', 'ipv6 default proto'
76    assert get_scheme(xfp='https') == 'http', 'bad proto'
77    assert get_scheme(xfp='blah') == 'http', 'bad xfp'
78    assert get_scheme('ipv6', xfp='https') == 'http', 'bad proto ipv6'
79
80    forwarded_header(
81        {
82            'client_ip': 'X-Forwarded-For',
83            'protocol': 'X-Forwarded-Proto',
84            'source': '127.0.0.1',
85        }
86    )
87
88    resp = get_fwd(xff='1.1.1.1', xfp='https')
89    assert resp['Remote-Addr'] == '1.1.1.1', 'both headers addr 2'
90    assert resp['Url-Scheme'] == 'https', 'both headers proto 2'
91
92    assert get_addr() == '127.0.0.1', 'ipv4 default addr 2'
93    assert get_addr('ipv6') == '::1', 'ipv6 default addr 2'
94    assert get_addr(xff='1.1.1.1') == '1.1.1.1', 'xff replace'
95    assert get_addr('ipv6', '1.1.1.1') == '::1', 'bad source ipv6 2'
96
97    assert get_scheme() == 'http', 'ipv4 default proto 2'
98    assert get_scheme('ipv6') == 'http', 'ipv6 default proto 2'
99    assert get_scheme(xfp='https') == 'https', 'xfp replace'
100    assert get_scheme(xfp='on') == 'https', 'xfp replace 2'
101    assert get_scheme('ipv6', xfp='https') == 'http', 'bad proto ipv6 2'
102
103    forwarded_header(
104        {
105            'client_ip': 'X-Forwarded-For',
106            'protocol': 'X-Forwarded-Proto',
107            'source': '!127.0.0.1',
108        }
109    )
110
111    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source 3'
112    assert get_addr('ipv6', '1.1.1.1') == '1.1.1.1', 'xff replace 2'
113    assert get_scheme(xfp='https') == 'http', 'bad proto 2'
114    assert get_scheme('ipv6', xfp='https') == 'https', 'xfp replace 3'
115
116
117def test_forwarded_header_ipv4():
118    forwarded_header(
119        {
120            'client_ip': 'X-Forwarded-For',
121            'protocol': 'X-Forwarded-Proto',
122            'source': '127.0.0.1',
123        }
124    )
125
126    assert get_addr(xff='8.8.8.8, 84.23.23.11') == '84.23.23.11', 'xff replace'
127    assert (
128        get_addr(xff='8.8.8.8, 84.23.23.11, 127.0.0.1') == '127.0.0.1'
129    ), 'xff replace 2'
130    assert (
131        get_addr(xff=['8.8.8.8', '127.0.0.1, 10.0.1.1']) == '10.0.1.1'
132    ), 'xff replace multi'
133
134    assert get_scheme(xfp='http, https') == 'http', 'xfp replace'
135    assert get_scheme(xfp='http, https, http') == 'http', 'xfp replace 2'
136    assert (
137        get_scheme(xfp=['http, https', 'http', 'https']) == 'http'
138    ), 'xfp replace multi'
139
140
141def test_forwarded_header_ipv6():
142    forwarded_header(
143        {
144            'client_ip': 'X-Forwarded-For',
145            'protocol': 'X-Forwarded-Proto',
146            'source': '::1',
147        }
148    )
149
150    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source ipv4'
151
152    for ip in [
153        'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
154        '2001:db8:3c4d:15::1a2f:1a2b',
155        '2001::3c4d:15:1a2f:1a2b',
156        '::11.22.33.44',
157    ]:
158        assert get_addr('ipv6', ip) == ip, 'replace'
159
160    assert get_scheme(xfp='https') == 'http', 'bad source ipv4'
161
162    for proto in ['http', 'https']:
163        assert get_scheme('ipv6', xfp=proto) == proto, 'replace'
164
165
166def test_forwarded_header_recursive():
167    forwarded_header(
168        {
169            'client_ip': 'X-Forwarded-For',
170            'recursive': True,
171            'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
172        }
173    )
174
175    assert get_addr(xff='1.1.1.1') == '1.1.1.1', 'xff chain'
176    assert get_addr(xff='1.1.1.1, 10.5.2.1') == '1.1.1.1', 'xff chain 2'
177    assert (
178        get_addr(xff='8.8.8.8, 1.1.1.1, 10.5.2.1') == '1.1.1.1'
179    ), 'xff chain 3'
180    assert (
181        get_addr(xff='10.50.0.17, 10.5.2.1, 10.5.2.1') == '10.50.0.17'
182    ), 'xff chain 4'
183    assert (
184        get_addr(xff=['8.8.8.8', '1.1.1.1, 127.0.0.1']) == '1.1.1.1'
185    ), 'xff replace multi'
186    assert (
187        get_addr(xff=['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1']) == '1.1.1.1'
188    ), 'xff replace multi 2'
189    assert (
190        get_addr(xff=['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'])
191        == '1.1.1.1'
192    ), 'xff replace multi 3'
193    assert (
194        get_addr(xff='8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1')
195        == '2001:db8:3c4d:15::1a2f:1a2b'
196    ), 'xff chain ipv6'
197
198
199def test_forwarded_header_case_insensitive():
200    forwarded_header(
201        {
202            'client_ip': 'x-forwarded-for',
203            'protocol': 'x-forwarded-proto',
204            'source': '127.0.0.1',
205        }
206    )
207
208    assert get_addr() == '127.0.0.1', 'ipv4 default addr'
209    assert get_addr('ipv6') == '::1', 'ipv6 default addr'
210    assert get_addr(xff='1.1.1.1') == '1.1.1.1', 'replace'
211
212    assert get_scheme() == 'http', 'ipv4 default proto'
213    assert get_scheme('ipv6') == 'http', 'ipv6 default proto'
214    assert get_scheme(xfp='https') == 'https', 'replace 1'
215    assert get_scheme(xfp='oN') == 'https', 'replace 2'
216
217
218def test_forwarded_header_source_empty():
219    forwarded_header(
220        {
221            'client_ip': 'X-Forwarded-For',
222            'protocol': 'X-Forwarded-Proto',
223            'source': [],
224        }
225    )
226
227    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'empty source xff'
228    assert get_scheme(xfp='https') == 'http', 'empty source xfp'
229
230
231def test_forwarded_header_source_range():
232    forwarded_header(
233        {
234            'client_ip': 'X-Forwarded-For',
235            'protocol': 'X-Forwarded-Proto',
236            'source': '127.0.0.0-127.0.0.1',
237        }
238    )
239
240    assert get_addr(xff='1.1.1.1') == '1.1.1.1', 'source range'
241    assert get_addr('ipv6', '1.1.1.1') == '::1', 'source range 2'
242
243
244def test_forwarded_header_invalid():
245    assert 'error' in client.conf(
246        {
247            "127.0.0.1:8081": {
248                "forwarded": {"source": '127.0.0.1'},
249                "pass": "applications/forwarded_header",
250            }
251        },
252        'listeners',
253    ), 'invalid forward'
254
255    def check_invalid_source(source):
256        assert 'error' in client.conf(
257            {
258                "127.0.0.1:8081": {
259                    "forwarded": {
260                        "client_ip": "X-Forwarded-For",
261                        "source": source,
262                    },
263                    "pass": "applications/forwarded_header",
264                }
265            },
266            'listeners',
267        ), 'invalid source'
268
269    check_invalid_source(None)
270    check_invalid_source('a')
271    check_invalid_source(['a'])
272