xref: /unit/test/test_client_ip.py (revision 2616:ab2896c980ab)
1import pytest
2
3from unit.applications.lang.python import ApplicationPython
4from unit.option import option
5
6prerequisites = {'modules': {'python': 'any'}}
7
8client = ApplicationPython()
9
10
@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Load the ``client_ip`` application through the shared ``client``.

    Declared ``autouse`` so pytest runs it for every test in this module
    without the tests having to request it.
    """
    app_name = 'client_ip'
    client.load(app_name)
14
15
def client_ip(options):
    """Configure the three test listeners with *options* as ``client_ip``.

    The same ``options`` object is attached to an IPv4 listener
    (``127.0.0.1:8081``), an IPv6 listener (``[::1]:8082``) and a Unix
    socket listener under ``option.temp_dir``; each one passes requests to
    ``applications/client_ip``.  The resulting map is submitted via
    ``client.conf`` at the ``'listeners'`` path, and the call asserts that
    the reply contains ``'success'``.
    """
    listener_addresses = (
        "127.0.0.1:8081",
        "[::1]:8082",
        f"unix:{option.temp_dir}/sock",
    )

    # Every listener gets an identical body; only the address key differs.
    listeners = {
        address: {
            "client_ip": options,
            "pass": "applications/client_ip",
        }
        for address in listener_addresses
    }

    reply = client.conf(listeners, 'listeners')
    assert 'success' in reply, 'listeners configure'
34
35
def get_xff(xff, sock_type='ipv4'):
    """Send a GET carrying ``X-Forwarded-For: xff`` and return the body.

    ``sock_type`` picks the endpoint to connect to: ``'ipv4'``
    (127.0.0.1:8081), ``'ipv6'`` (::1, port 8082) or ``'unix'`` (the socket
    file under ``option.temp_dir``, no port).  Any other value raises
    ``KeyError`` before a request is made.
    """
    endpoints = {
        'ipv4': {'addr': '127.0.0.1', 'port': 8081},
        'ipv6': {'addr': '::1', 'port': 8082},
        'unix': {'addr': f'{option.temp_dir}/sock', 'port': None},
    }
    endpoint = endpoints[sock_type]

    response = client.get(
        sock_type=sock_type,
        headers={'Connection': 'close', 'X-Forwarded-For': xff},
        **endpoint,
    )
    return response['body']
50
51
def test_client_ip_single_ip():
    """Exercise ``client_ip`` with a single string ``source``.

    Three configurations are applied in turn: ``123.123.123.123``,
    ``127.0.0.1`` and the ``!``-prefixed ``!127.0.0.1``.  For each one the
    response bodies are compared against the expected address, both for
    requests without the header and for requests carrying
    ``X-Forwarded-For``.
    """

    def configure(source):
        client_ip({'header': 'X-Forwarded-For', 'source': source})

    def check_defaults(ipv4_label, ipv6_label):
        # Requests without X-Forwarded-For on the IPv4 and IPv6 listeners.
        ipv4_body = client.get(port=8081)['body']
        assert ipv4_body == '127.0.0.1', ipv4_label

        ipv6_body = client.get(sock_type='ipv6', port=8082)['body']
        assert ipv6_body == '::1', ipv6_label

    def check_xff(expectations):
        for sock_type, xff, expected, label in expectations:
            assert get_xff(xff, sock_type) == expected, label

    configure('123.123.123.123')
    check_defaults('ipv4 default', 'ipv6 default')
    check_xff(
        [
            ('ipv4', '1.1.1.1', '127.0.0.1', 'bad source'),
            ('ipv4', 'blah', '127.0.0.1', 'bad header'),
            ('ipv6', '1.1.1.1', '::1', 'bad source ipv6'),
        ]
    )

    configure('127.0.0.1')
    check_defaults('ipv4 default 2', 'ipv6 default 2')
    check_xff(
        [
            ('ipv4', '1.1.1.1', '1.1.1.1', 'replace'),
            ('ipv4', 'blah', '127.0.0.1', 'bad header 2'),
            ('ipv6', '1.1.1.1', '::1', 'bad source ipv6 2'),
        ]
    )

    # With the '!' prefix the expectations flip: the IPv4 request keeps its
    # address while the IPv6 request gets the header value.
    configure('!127.0.0.1')
    check_xff(
        [
            ('ipv4', '1.1.1.1', '127.0.0.1', 'bad source 3'),
            ('ipv6', '1.1.1.1', '1.1.1.1', 'replace 2'),
        ]
    )
77
78
def test_client_ip_ipv4():
    """Check which ``X-Forwarded-For`` entry is returned for IPv4 requests.

    With ``source`` set to ``127.0.0.1``, each case sends either one
    comma-separated header value or a list of header values and expects the
    last listed address back in the response body.
    """
    client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})

    expectations = [
        ('8.8.8.8, 84.23.23.11', '84.23.23.11', 'xff replace'),
        ('8.8.8.8, 84.23.23.11, 127.0.0.1', '127.0.0.1', 'xff replace 2'),
        (['8.8.8.8', '127.0.0.1, 10.0.1.1'], '10.0.1.1', 'xff replace multi'),
    ]

    for xff, expected, label in expectations:
        body = get_xff(xff)
        assert body == expected, label
89
90
def test_client_ip_ipv6():
    """Check ``client_ip`` with ``::1`` as the only ``source``.

    A request over the IPv4 listener must still report ``127.0.0.1``, while
    requests over the IPv6 listener must report the address sent in
    ``X-Forwarded-For`` for several IPv6 notations.
    """
    client_ip({'header': 'X-Forwarded-For', 'source': '::1'})

    ipv4_body = get_xff('1.1.1.1')
    assert ipv4_body == '127.0.0.1', 'bad source ipv4'

    forwarded_addresses = (
        'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
        '2001:db8:3c4d:15::1a2f:1a2b',
        '2001::3c4d:15:1a2f:1a2b',
        '::11.22.33.44',
    )
    for address in forwarded_addresses:
        body = get_xff(address, 'ipv6')
        assert body == address, 'replace'
103
104
def test_client_ip_unix():
    """Check ``client_ip`` with ``source`` set to ``'unix'``.

    Requests over the IPv4 and IPv6 listeners must keep reporting the
    loopback addresses; requests over the Unix socket listener must report
    the address sent in ``X-Forwarded-For``.
    """
    client_ip({'header': 'X-Forwarded-For', 'source': 'unix'})

    ipv4_body = get_xff('1.1.1.1')
    assert ipv4_body == '127.0.0.1', 'bad source ipv4'

    ipv6_body = get_xff('1.1.1.1', 'ipv6')
    assert ipv6_body == '::1', 'bad source ipv6'

    for address in ('1.1.1.1', '::11.22.33.44'):
        body = get_xff(address, 'unix')
        assert body == address, 'replace'
116
117
def test_client_ip_recursive():
    """Check ``client_ip`` with ``recursive`` enabled and several sources.

    Each case sends an ``X-Forwarded-For`` chain (a single comma-separated
    value or a list of header values) over the IPv4 listener and compares
    the response body with the expected address from that chain.
    """
    client_ip(
        {
            'header': 'X-Forwarded-For',
            'recursive': True,
            'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
        }
    )

    # (header value(s), expected body, assertion label)
    expectations = [
        ('1.1.1.1', '1.1.1.1', 'xff chain'),
        ('1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 2'),
        ('8.8.8.8, 1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 3'),
        ('10.50.0.17, 10.5.2.1, 10.5.2.1', '10.50.0.17', 'xff chain 4'),
        (
            ['8.8.8.8', '1.1.1.1, 127.0.0.1'],
            '1.1.1.1',
            'xff replace multi',
        ),
        (
            ['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'],
            '1.1.1.1',
            'xff replace multi 2',
        ),
        (
            ['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'],
            '1.1.1.1',
            'xff replace multi 3',
        ),
        (
            '8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1',
            '2001:db8:3c4d:15::1a2f:1a2b',
            'xff chain ipv6',
        ),
    ]

    for xff, expected, label in expectations:
        body = get_xff(xff)
        assert body == expected, label
146
147
def test_client_ip_case_insensitive():
    """Check that a lower-case ``header`` name still matches the request.

    The listener is configured with ``x-forwarded-for`` while ``get_xff``
    sends the header as ``X-Forwarded-For``; the forwarded address is
    expected in the response body.
    """
    options = {'header': 'x-forwarded-for', 'source': '127.0.0.1'}
    client_ip(options)

    body = get_xff('1.1.1.1')
    assert body == '1.1.1.1', 'case insensitive'
152
153
def test_client_ip_empty_source():
    """Check that an empty ``source`` list leaves the address unchanged.

    The configuration is accepted, and a request carrying
    ``X-Forwarded-For: 1.1.1.1`` still gets ``127.0.0.1`` in the body.
    """
    options = {'header': 'X-Forwarded-For', 'source': []}
    client_ip(options)

    body = get_xff('1.1.1.1')
    assert body == '127.0.0.1', 'empty source'
158
159
def test_client_ip_invalid():
    """Check that malformed ``client_ip`` objects are rejected.

    First a ``client_ip`` object without a ``header`` key is submitted, then
    objects whose ``source`` is ``None``, ``'a'`` and ``['a']``.  Every
    ``client.conf`` reply is expected to contain ``'error'``.
    """

    def is_rejected(client_ip_options):
        # Submit a single IPv4 listener carrying the options under test.
        listeners = {
            "127.0.0.1:8081": {
                "client_ip": client_ip_options,
                "pass": "applications/client_ip",
            }
        }
        return 'error' in client.conf(listeners, 'listeners')

    assert is_rejected({"source": '127.0.0.1'}), 'invalid header'

    for bad_source in (None, 'a', ['a']):
        candidate = {"header": "X-Forwarded-For", "source": bad_source}
        assert is_rejected(candidate), 'invalid source'
188