xref: /unit/test/test_client_ip.py (revision 2130:638b03fe73f0)
1from unit.applications.lang.python import TestApplicationPython
2
3
class TestClientIP(TestApplicationPython):
    """Tests for the listener ``client_ip`` option using a Python app.

    Every test configures two listeners (``127.0.0.1:7081`` and
    ``[::1]:7082``) that pass to the ``client_ip`` application, sends
    requests with an ``X-Forwarded-For`` header and compares the response
    body with an expected address string.
    """

    prerequisites = {'modules': {'python': 'any'}}

    def client_ip(self, options):
        """Set ``options`` as the ``client_ip`` object of both listeners.

        Asserts that the configuration request reports success.
        """
        addresses = ("127.0.0.1:7081", "[::1]:7082")
        listeners = {
            address: {
                "client_ip": options,
                "pass": "applications/client_ip",
            }
            for address in addresses
        }

        result = self.conf(listeners, 'listeners')
        assert 'success' in result, 'listeners configure'

    def get_xff(self, xff, sock_type='ipv4'):
        """Send a GET with ``X-Forwarded-For: xff`` and return the body.

        ``sock_type`` picks the listener: ``'ipv4'`` targets port 7081,
        any other value targets port 7082.
        """
        if sock_type == 'ipv4':
            port = 7081
        else:
            port = 7082

        request_headers = {'Connection': 'close', 'X-Forwarded-For': xff}
        response = self.get(
            sock_type=sock_type, port=port, headers=request_headers
        )
        return response['body']

    def setup_method(self):
        """Load the ``client_ip`` application before each test."""
        self.load('client_ip')

    def test_client_ip_single_ip(self):
        """Check ``source`` given as one address, then as ``!address``."""
        header = 'X-Forwarded-For'

        # Source that never matches the local peers used by the tests.
        self.client_ip({'header': header, 'source': '123.123.123.123'})

        body = self.get(port=7081)['body']
        assert body == '127.0.0.1', 'ipv4 default'
        body = self.get(sock_type='ipv6', port=7082)['body']
        assert body == '::1', 'ipv6 default'
        for args, expected, message in (
            (('1.1.1.1',), '127.0.0.1', 'bad source'),
            (('blah',), '127.0.0.1', 'bad header'),
            (('1.1.1.1', 'ipv6'), '::1', 'bad source ipv6'),
        ):
            assert self.get_xff(*args) == expected, message

        # Source equal to the IPv4 listener's peer address.
        self.client_ip({'header': header, 'source': '127.0.0.1'})

        body = self.get(port=7081)['body']
        assert body == '127.0.0.1', 'ipv4 default 2'
        body = self.get(sock_type='ipv6', port=7082)['body']
        assert body == '::1', 'ipv6 default 2'
        for args, expected, message in (
            (('1.1.1.1',), '1.1.1.1', 'replace'),
            (('blah',), '127.0.0.1', 'bad header 2'),
            (('1.1.1.1', 'ipv6'), '::1', 'bad source ipv6 2'),
        ):
            assert self.get_xff(*args) == expected, message

        # Same address prefixed with '!'.
        self.client_ip({'header': header, 'source': '!127.0.0.1'})

        ipv4_body = self.get_xff('1.1.1.1')
        assert ipv4_body == '127.0.0.1', 'bad source 3'
        ipv6_body = self.get_xff('1.1.1.1', 'ipv6')
        assert ipv6_body == '1.1.1.1', 'replace 2'

    def test_client_ip_ipv4(self):
        """Check which entry of a multi-address header is reported."""
        self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})

        cases = (
            ('8.8.8.8, 84.23.23.11', '84.23.23.11', 'xff replace'),
            ('8.8.8.8, 84.23.23.11, 127.0.0.1', '127.0.0.1', 'xff replace 2'),
            (
                ['8.8.8.8', '127.0.0.1, 10.0.1.1'],
                '10.0.1.1',
                'xff replace multi',
            ),
        )
        for xff, expected, message in cases:
            assert self.get_xff(xff) == expected, message

    def test_client_ip_ipv6(self):
        """Check IPv6 header values when ``source`` is ``::1``."""
        self.client_ip({'header': 'X-Forwarded-For', 'source': '::1'})

        ipv4_body = self.get_xff('1.1.1.1')
        assert ipv4_body == '127.0.0.1', 'bad source ipv4'

        addresses = (
            'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
            '2001:db8:3c4d:15::1a2f:1a2b',
            '2001::3c4d:15:1a2f:1a2b',
            '::11.22.33.44',
        )
        for address in addresses:
            assert self.get_xff(address, 'ipv6') == address, 'replace'

    def test_client_ip_recursive(self):
        """Check header chains with ``recursive`` enabled."""
        options = {
            'header': 'X-Forwarded-For',
            'recursive': True,
            'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
        }
        self.client_ip(options)

        cases = (
            ('1.1.1.1', '1.1.1.1', 'xff chain'),
            ('1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 2'),
            ('8.8.8.8, 1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 3'),
            (
                '10.50.0.17, 10.5.2.1, 10.5.2.1',
                '10.50.0.17',
                'xff chain 4',
            ),
            (
                ['8.8.8.8', '1.1.1.1, 127.0.0.1'],
                '1.1.1.1',
                'xff replace multi',
            ),
            (
                ['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'],
                '1.1.1.1',
                'xff replace multi 2',
            ),
            (
                ['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'],
                '1.1.1.1',
                'xff replace multi 3',
            ),
            (
                '8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1',
                '2001:db8:3c4d:15::1a2f:1a2b',
                'xff chain ipv6',
            ),
        )
        for xff, expected, message in cases:
            assert self.get_xff(xff) == expected, message

    def test_client_ip_case_insensitive(self):
        """Check a lower-case ``header`` name in the configuration."""
        self.client_ip({'header': 'x-forwarded-for', 'source': '127.0.0.1'})

        body = self.get_xff('1.1.1.1')
        assert body == '1.1.1.1', 'case insensitive'

    def test_client_ip_empty_source(self):
        """Check an empty ``source`` list."""
        self.client_ip({'header': 'X-Forwarded-For', 'source': []})

        body = self.get_xff('1.1.1.1')
        assert body == '127.0.0.1', 'empty source'

    def test_client_ip_invalid(self):
        """Check that malformed ``client_ip`` objects are rejected."""

        def rejected(client_ip):
            # Submit a single-listener config and report whether the
            # response contains an 'error' entry.
            listeners = {
                "127.0.0.1:7081": {
                    "client_ip": client_ip,
                    "pass": "applications/client_ip",
                }
            }
            return 'error' in self.conf(listeners, 'listeners')

        # No "header" member at all.
        assert rejected({"source": '127.0.0.1'}), 'invalid header'

        for source in (None, 'a', ['a']):
            assert rejected(
                {"header": "X-Forwarded-For", "source": source}
            ), 'invalid source'
159