xref: /unit/test/test_client_ip.py (revision 2171:20f712007059)
1from unit.applications.lang.python import TestApplicationPython
2from unit.option import option
3
4
5class TestClientIP(TestApplicationPython):
6    prerequisites = {'modules': {'python': 'any'}}
7
8    def client_ip(self, options):
9        assert 'success' in self.conf(
10            {
11                "127.0.0.1:7081": {
12                    "client_ip": options,
13                    "pass": "applications/client_ip",
14                },
15                "[::1]:7082": {
16                    "client_ip": options,
17                    "pass": "applications/client_ip",
18                },
19                "unix:"
20                + option.temp_dir
21                + "/sock": {
22                    "client_ip": options,
23                    "pass": "applications/client_ip",
24                },
25            },
26            'listeners',
27        ), 'listeners configure'
28
29    def get_xff(self, xff, sock_type='ipv4'):
30        address = {
31            'ipv4': ('127.0.0.1', 7081),
32            'ipv6': ('::1', 7082),
33            'unix': (option.temp_dir + '/sock', None),
34        }
35        (addr, port) = address[sock_type]
36
37        return self.get(
38            sock_type=sock_type,
39            addr=addr,
40            port=port,
41            headers={'Connection': 'close', 'X-Forwarded-For': xff},
42        )['body']
43
44    def setup_method(self):
45        self.load('client_ip')
46
47    def test_client_ip_single_ip(self):
48        self.client_ip(
49            {'header': 'X-Forwarded-For', 'source': '123.123.123.123'}
50        )
51
52        assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default'
53        assert (
54            self.get(sock_type='ipv6', port=7082)['body'] == '::1'
55        ), 'ipv6 default'
56        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source'
57        assert self.get_xff('blah') == '127.0.0.1', 'bad header'
58        assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6'
59
60        self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})
61
62        assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default 2'
63        assert (
64            self.get(sock_type='ipv6', port=7082)['body'] == '::1'
65        ), 'ipv6 default 2'
66        assert self.get_xff('1.1.1.1') == '1.1.1.1', 'replace'
67        assert self.get_xff('blah') == '127.0.0.1', 'bad header 2'
68        assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6 2'
69
70        self.client_ip({'header': 'X-Forwarded-For', 'source': '!127.0.0.1'})
71
72        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source 3'
73        assert self.get_xff('1.1.1.1', 'ipv6') == '1.1.1.1', 'replace 2'
74
75    def test_client_ip_ipv4(self):
76        self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})
77
78        assert (
79            self.get_xff('8.8.8.8, 84.23.23.11') == '84.23.23.11'
80        ), 'xff replace'
81        assert (
82            self.get_xff('8.8.8.8, 84.23.23.11, 127.0.0.1') == '127.0.0.1'
83        ), 'xff replace 2'
84        assert (
85            self.get_xff(['8.8.8.8', '127.0.0.1, 10.0.1.1']) == '10.0.1.1'
86        ), 'xff replace multi'
87
88    def test_client_ip_ipv6(self):
89        self.client_ip({'header': 'X-Forwarded-For', 'source': '::1'})
90
91        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4'
92
93        for ip in [
94            'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
95            '2001:db8:3c4d:15::1a2f:1a2b',
96            '2001::3c4d:15:1a2f:1a2b',
97            '::11.22.33.44',
98        ]:
99            assert self.get_xff(ip, 'ipv6') == ip, 'replace'
100
101    def test_client_ip_unix(self, temp_dir):
102        self.client_ip({'header': 'X-Forwarded-For', 'source': 'unix'})
103
104        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4'
105        assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6'
106
107        for ip in [
108            '1.1.1.1',
109            '::11.22.33.44',
110        ]:
111            assert self.get_xff(ip, 'unix') == ip, 'replace'
112
113    def test_client_ip_recursive(self):
114        self.client_ip(
115            {
116                'header': 'X-Forwarded-For',
117                'recursive': True,
118                'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
119            }
120        )
121
122        assert self.get_xff('1.1.1.1') == '1.1.1.1', 'xff chain'
123        assert self.get_xff('1.1.1.1, 10.5.2.1') == '1.1.1.1', 'xff chain 2'
124        assert (
125            self.get_xff('8.8.8.8, 1.1.1.1, 10.5.2.1') == '1.1.1.1'
126        ), 'xff chain 3'
127        assert (
128            self.get_xff('10.50.0.17, 10.5.2.1, 10.5.2.1') == '10.50.0.17'
129        ), 'xff chain 4'
130        assert (
131            self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1']) == '1.1.1.1'
132        ), 'xff replace multi'
133        assert (
134            self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'])
135            == '1.1.1.1'
136        ), 'xff replace multi 2'
137        assert (
138            self.get_xff(['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'])
139            == '1.1.1.1'
140        ), 'xff replace multi 3'
141        assert (
142            self.get_xff('8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1')
143            == '2001:db8:3c4d:15::1a2f:1a2b'
144        ), 'xff chain ipv6'
145
146    def test_client_ip_case_insensitive(self):
147        self.client_ip({'header': 'x-forwarded-for', 'source': '127.0.0.1'})
148
149        assert self.get_xff('1.1.1.1') == '1.1.1.1', 'case insensitive'
150
151    def test_client_ip_empty_source(self):
152        self.client_ip({'header': 'X-Forwarded-For', 'source': []})
153
154        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'empty source'
155
156    def test_client_ip_invalid(self):
157        assert 'error' in self.conf(
158            {
159                "127.0.0.1:7081": {
160                    "client_ip": {"source": '127.0.0.1'},
161                    "pass": "applications/client_ip",
162                }
163            },
164            'listeners',
165        ), 'invalid header'
166
167        def check_invalid_source(source):
168            assert 'error' in self.conf(
169                {
170                    "127.0.0.1:7081": {
171                        "client_ip": {
172                            "header": "X-Forwarded-For",
173                            "source": source,
174                        },
175                        "pass": "applications/client_ip",
176                    }
177                },
178                'listeners',
179            ), 'invalid source'
180
181        check_invalid_source(None)
182        check_invalid_source('a')
183        check_invalid_source(['a'])
184