xref: /unit/test/test_forwarded_header.py (revision 2134:f576f11a58b4)
1from unit.applications.lang.python import TestApplicationPython
2
3
4class TestForwardedHeader(TestApplicationPython):
5    prerequisites = {'modules': {'python': 'any'}}
6
7    def forwarded_header(self, forwarded):
8        assert 'success' in self.conf(
9            {
10                "127.0.0.1:7081": {
11                    "forwarded": forwarded,
12                    "pass": "applications/forwarded_header",
13                },
14                "[::1]:7082": {
15                    "forwarded": forwarded,
16                    "pass": "applications/forwarded_header",
17                },
18            },
19            'listeners',
20        ), 'listeners configure'
21
22    def get_fwd(self, sock_type='ipv4', xff=None, xfp=None):
23        port = 7081 if sock_type == 'ipv4' else 7082
24
25        headers = {'Connection': 'close'}
26
27        if xff is not None:
28            headers['X-Forwarded-For'] = xff
29
30        if xfp is not None:
31            headers['X-Forwarded-Proto'] = xfp
32
33        return self.get(sock_type=sock_type, port=port, headers=headers)[
34            'headers'
35        ]
36
37    def get_addr(self, *args, **kwargs):
38        return self.get_fwd(*args, **kwargs)['Remote-Addr']
39
40    def get_scheme(self, *args, **kwargs):
41        return self.get_fwd(*args, **kwargs)['Url-Scheme']
42
43    def setup_method(self):
44        self.load('forwarded_header')
45
46    def test_forwarded_header_single_ip(self):
47        self.forwarded_header(
48            {
49                'client_ip': 'X-Forwarded-For',
50                'protocol': 'X-Forwarded-Proto',
51                'source': '123.123.123.123',
52            }
53        )
54
55        resp = self.get_fwd(xff='1.1.1.1', xfp='https')
56        assert resp['Remote-Addr'] == '127.0.0.1', 'both headers addr'
57        assert resp['Url-Scheme'] == 'http', 'both headers proto'
58
59        assert self.get_addr() == '127.0.0.1', 'ipv4 default addr'
60        assert self.get_addr('ipv6') == '::1', 'ipv6 default addr'
61        assert self.get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source'
62        assert self.get_addr(xff='blah') == '127.0.0.1', 'bad xff'
63        assert self.get_addr('ipv6', '1.1.1.1') == '::1', 'bad source ipv6'
64
65        assert self.get_scheme() == 'http', 'ipv4 default proto'
66        assert self.get_scheme('ipv6') == 'http', 'ipv6 default proto'
67        assert self.get_scheme(xfp='https') == 'http', 'bad proto'
68        assert self.get_scheme(xfp='blah') == 'http', 'bad xfp'
69        assert self.get_scheme('ipv6', xfp='https') == 'http', 'bad proto ipv6'
70
71        self.forwarded_header(
72            {
73                'client_ip': 'X-Forwarded-For',
74                'protocol': 'X-Forwarded-Proto',
75                'source': '127.0.0.1',
76            }
77        )
78
79        resp = self.get_fwd(xff='1.1.1.1', xfp='https')
80        assert resp['Remote-Addr'] == '1.1.1.1', 'both headers addr 2'
81        assert resp['Url-Scheme'] == 'https', 'both headers proto 2'
82
83        assert self.get_addr() == '127.0.0.1', 'ipv4 default addr 2'
84        assert self.get_addr('ipv6') == '::1', 'ipv6 default addr 2'
85        assert self.get_addr(xff='1.1.1.1') == '1.1.1.1', 'xff replace'
86        assert self.get_addr('ipv6', '1.1.1.1') == '::1', 'bad source ipv6 2'
87
88        assert self.get_scheme() == 'http', 'ipv4 default proto 2'
89        assert self.get_scheme('ipv6') == 'http', 'ipv6 default proto 2'
90        assert self.get_scheme(xfp='https') == 'https', 'xfp replace'
91        assert self.get_scheme(xfp='on') == 'https', 'xfp replace 2'
92        assert (
93            self.get_scheme('ipv6', xfp='https') == 'http'
94        ), 'bad proto ipv6 2'
95
96        self.forwarded_header(
97            {
98                'client_ip': 'X-Forwarded-For',
99                'protocol': 'X-Forwarded-Proto',
100                'source': '!127.0.0.1',
101            }
102        )
103
104        assert self.get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source 3'
105        assert self.get_addr('ipv6', '1.1.1.1') == '1.1.1.1', 'xff replace 2'
106        assert self.get_scheme(xfp='https') == 'http', 'bad proto 2'
107        assert self.get_scheme('ipv6', xfp='https') == 'https', 'xfp replace 3'
108
109    def test_forwarded_header_ipv4(self):
110        self.forwarded_header(
111            {
112                'client_ip': 'X-Forwarded-For',
113                'protocol': 'X-Forwarded-Proto',
114                'source': '127.0.0.1',
115            }
116        )
117
118        assert (
119            self.get_addr(xff='8.8.8.8, 84.23.23.11') == '84.23.23.11'
120        ), 'xff replace'
121        assert (
122            self.get_addr(xff='8.8.8.8, 84.23.23.11, 127.0.0.1') == '127.0.0.1'
123        ), 'xff replace 2'
124        assert (
125            self.get_addr(xff=['8.8.8.8', '127.0.0.1, 10.0.1.1']) == '10.0.1.1'
126        ), 'xff replace multi'
127
128        assert self.get_scheme(xfp='http, https') == 'http', 'xfp replace'
129        assert (
130            self.get_scheme(xfp='http, https, http') == 'http'
131        ), 'xfp replace 2'
132        assert (
133            self.get_scheme(xfp=['http, https', 'http', 'https']) == 'http'
134        ), 'xfp replace multi'
135
136    def test_forwarded_header_ipv6(self):
137        self.forwarded_header(
138            {
139                'client_ip': 'X-Forwarded-For',
140                'protocol': 'X-Forwarded-Proto',
141                'source': '::1',
142            }
143        )
144
145        assert self.get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source ipv4'
146
147        for ip in [
148            'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
149            '2001:db8:3c4d:15::1a2f:1a2b',
150            '2001::3c4d:15:1a2f:1a2b',
151            '::11.22.33.44',
152        ]:
153            assert self.get_addr('ipv6', ip) == ip, 'replace'
154
155        assert self.get_scheme(xfp='https') == 'http', 'bad source ipv4'
156
157        for proto in ['http', 'https']:
158            assert self.get_scheme('ipv6', xfp=proto) == proto, 'replace'
159
160    def test_forwarded_header_recursive(self):
161        self.forwarded_header(
162            {
163                'client_ip': 'X-Forwarded-For',
164                'recursive': True,
165                'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
166            }
167        )
168
169        assert self.get_addr(xff='1.1.1.1') == '1.1.1.1', 'xff chain'
170        assert (
171            self.get_addr(xff='1.1.1.1, 10.5.2.1') == '1.1.1.1'
172        ), 'xff chain 2'
173        assert (
174            self.get_addr(xff='8.8.8.8, 1.1.1.1, 10.5.2.1') == '1.1.1.1'
175        ), 'xff chain 3'
176        assert (
177            self.get_addr(xff='10.50.0.17, 10.5.2.1, 10.5.2.1') == '10.50.0.17'
178        ), 'xff chain 4'
179        assert (
180            self.get_addr(xff=['8.8.8.8', '1.1.1.1, 127.0.0.1']) == '1.1.1.1'
181        ), 'xff replace multi'
182        assert (
183            self.get_addr(xff=['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'])
184            == '1.1.1.1'
185        ), 'xff replace multi 2'
186        assert (
187            self.get_addr(xff=['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'])
188            == '1.1.1.1'
189        ), 'xff replace multi 3'
190        assert (
191            self.get_addr(
192                xff='8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1'
193            )
194            == '2001:db8:3c4d:15::1a2f:1a2b'
195        ), 'xff chain ipv6'
196
197    def test_forwarded_header_case_insensitive(self):
198        self.forwarded_header(
199            {
200                'client_ip': 'x-forwarded-for',
201                'protocol': 'x-forwarded-proto',
202                'source': '127.0.0.1',
203            }
204        )
205
206        assert self.get_addr() == '127.0.0.1', 'ipv4 default addr'
207        assert self.get_addr('ipv6') == '::1', 'ipv6 default addr'
208        assert self.get_addr(xff='1.1.1.1') == '1.1.1.1', 'replace'
209
210        assert self.get_scheme() == 'http', 'ipv4 default proto'
211        assert self.get_scheme('ipv6') == 'http', 'ipv6 default proto'
212        assert self.get_scheme(xfp='https') == 'https', 'replace 1'
213        assert self.get_scheme(xfp='oN') == 'https', 'replace 2'
214
215    def test_forwarded_header_source_empty(self):
216        self.forwarded_header(
217            {
218                'client_ip': 'X-Forwarded-For',
219                'protocol': 'X-Forwarded-Proto',
220                'source': [],
221            }
222        )
223
224        assert self.get_addr(xff='1.1.1.1') == '127.0.0.1', 'empty source xff'
225        assert self.get_scheme(xfp='https') == 'http', 'empty source xfp'
226
227    def test_forwarded_header_source_range(self):
228        self.forwarded_header(
229            {
230                'client_ip': 'X-Forwarded-For',
231                'protocol': 'X-Forwarded-Proto',
232                'source': '127.0.0.0-127.0.0.1',
233            }
234        )
235
236        assert self.get_addr(xff='1.1.1.1') == '1.1.1.1', 'source range'
237        assert self.get_addr('ipv6', '1.1.1.1') == '::1', 'source range 2'
238
239    def test_forwarded_header_invalid(self):
240        assert 'error' in self.conf(
241            {
242                "127.0.0.1:7081": {
243                    "forwarded": {"source": '127.0.0.1'},
244                    "pass": "applications/forwarded_header",
245                }
246            },
247            'listeners',
248        ), 'invalid forward'
249
250        def check_invalid_source(source):
251            assert 'error' in self.conf(
252                {
253                    "127.0.0.1:7081": {
254                        "forwarded": {
255                            "client_ip": "X-Forwarded-For",
256                            "source": source,
257                        },
258                        "pass": "applications/forwarded_header",
259                    }
260                },
261                'listeners',
262            ), 'invalid source'
263
264        check_invalid_source(None)
265        check_invalid_source('a')
266        check_invalid_source(['a'])
267