"""Tests for the listener "client_ip" option driven by X-Forwarded-For."""

import pytest

from unit.applications.lang.python import ApplicationPython
from unit.option import option

prerequisites = {'modules': {'python': 'any'}}

client = ApplicationPython()


@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Load the 'client_ip' application before each test in this module."""
    client.load('client_ip')


def client_ip(options):
    """Apply *options* as the "client_ip" object of all three listeners.

    The IPv4, IPv6 and unix-socket listeners receive the same settings and
    all pass to "applications/client_ip". The configuration request is
    asserted to report success.
    """
    listener_names = (
        "127.0.0.1:8081",
        "[::1]:8082",
        f"unix:{option.temp_dir}/sock",
    )
    listeners = {
        name: {"client_ip": options, "pass": "applications/client_ip"}
        for name in listener_names
    }

    result = client.conf(listeners, 'listeners')
    assert 'success' in result, 'listeners configure'


def get_xff(xff, sock_type='ipv4'):
    """Return the response body of a GET sent with X-Forwarded-For *xff*.

    *sock_type* picks the listener to connect to: 'ipv4', 'ipv6' or 'unix'.
    Any other value raises KeyError.
    """
    endpoints = {
        'ipv4': ('127.0.0.1', 8081),
        'ipv6': ('::1', 8082),
        'unix': (f'{option.temp_dir}/sock', None),
    }
    host, port_number = endpoints[sock_type]

    response = client.get(
        sock_type=sock_type,
        addr=host,
        port=port_number,
        headers={'Connection': 'close', 'X-Forwarded-For': xff},
    )
    return response['body']


def test_client_ip_single_ip():
    """Check a single 'source' address, then its negated form."""
    # Source that matches neither loopback connection.
    client_ip({'header': 'X-Forwarded-For', 'source': '123.123.123.123'})

    ipv4_body = client.get(port=8081)['body']
    assert ipv4_body == '127.0.0.1', 'ipv4 default'
    ipv6_body = client.get(sock_type='ipv6', port=8082)['body']
    assert ipv6_body == '::1', 'ipv6 default'

    for xff, sock_type, expected, label in (
        ('1.1.1.1', 'ipv4', '127.0.0.1', 'bad source'),
        ('blah', 'ipv4', '127.0.0.1', 'bad header'),
        ('1.1.1.1', 'ipv6', '::1', 'bad source ipv6'),
    ):
        assert get_xff(xff, sock_type) == expected, label

    # Source equal to the IPv4 loopback address.
    client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})

    ipv4_body = client.get(port=8081)['body']
    assert ipv4_body == '127.0.0.1', 'ipv4 default 2'
    ipv6_body = client.get(sock_type='ipv6', port=8082)['body']
    assert ipv6_body == '::1', 'ipv6 default 2'

    for xff, sock_type, expected, label in (
        ('1.1.1.1', 'ipv4', '1.1.1.1', 'replace'),
        ('blah', 'ipv4', '127.0.0.1', 'bad header 2'),
        ('1.1.1.1', 'ipv6', '::1', 'bad source ipv6 2'),
    ):
        assert get_xff(xff, sock_type) == expected, label

    # Negated source: everything except the IPv4 loopback address.
    client_ip({'header': 'X-Forwarded-For', 'source': '!127.0.0.1'})

    for xff, sock_type, expected, label in (
        ('1.1.1.1', 'ipv4', '127.0.0.1', 'bad source 3'),
        ('1.1.1.1', 'ipv6', '1.1.1.1', 'replace 2'),
    ):
        assert get_xff(xff, sock_type) == expected, label


def test_client_ip_ipv4():
    """Check which X-Forwarded-For entry is reported for an IPv4 source."""
    client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})

    for xff, expected, label in (
        ('8.8.8.8, 84.23.23.11', '84.23.23.11', 'xff replace'),
        ('8.8.8.8, 84.23.23.11, 127.0.0.1', '127.0.0.1', 'xff replace 2'),
        (['8.8.8.8', '127.0.0.1, 10.0.1.1'], '10.0.1.1', 'xff replace multi'),
    ):
        assert get_xff(xff) == expected, label


def test_client_ip_ipv6():
    """Check replacement with IPv6 header values for an IPv6 source."""
    client_ip({'header': 'X-Forwarded-For', 'source': '::1'})

    ipv4_body = get_xff('1.1.1.1')
    assert ipv4_body == '127.0.0.1', 'bad source ipv4'

    ipv6_addresses = (
        'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
        '2001:db8:3c4d:15::1a2f:1a2b',
        '2001::3c4d:15:1a2f:1a2b',
        '::11.22.33.44',
    )
    for address in ipv6_addresses:
        assert get_xff(address, 'ipv6') == address, 'replace'


def test_client_ip_unix():
    """Check replacement when the configured source is 'unix'."""
    client_ip({'header': 'X-Forwarded-For', 'source': 'unix'})

    for sock_type, expected, label in (
        ('ipv4', '127.0.0.1', 'bad source ipv4'),
        ('ipv6', '::1', 'bad source ipv6'),
    ):
        assert get_xff('1.1.1.1', sock_type) == expected, label

    for address in ('1.1.1.1', '::11.22.33.44'):
        assert get_xff(address, 'unix') == address, 'replace'


def test_client_ip_recursive():
    """Check X-Forwarded-For chains with 'recursive' enabled."""
    client_ip(
        {
            'header': 'X-Forwarded-For',
            'recursive': True,
            'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
        }
    )

    # (header value(s), expected body, assertion label)
    cases = (
        ('1.1.1.1', '1.1.1.1', 'xff chain'),
        ('1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 2'),
        ('8.8.8.8, 1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 3'),
        ('10.50.0.17, 10.5.2.1, 10.5.2.1', '10.50.0.17', 'xff chain 4'),
        (
            ['8.8.8.8', '1.1.1.1, 127.0.0.1'],
            '1.1.1.1',
            'xff replace multi',
        ),
        (
            ['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'],
            '1.1.1.1',
            'xff replace multi 2',
        ),
        (
            ['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'],
            '1.1.1.1',
            'xff replace multi 3',
        ),
        (
            '8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1',
            '2001:db8:3c4d:15::1a2f:1a2b',
            'xff chain ipv6',
        ),
    )
    for xff, expected, label in cases:
        assert get_xff(xff) == expected, label


def test_client_ip_case_insensitive():
    """Check that a lower-case 'header' name still matches the request."""
    client_ip({'header': 'x-forwarded-for', 'source': '127.0.0.1'})

    body = get_xff('1.1.1.1')
    assert body == '1.1.1.1', 'case insensitive'


def test_client_ip_empty_source():
    """Check that an empty 'source' list leaves the address unchanged."""
    client_ip({'header': 'X-Forwarded-For', 'source': []})

    body = get_xff('1.1.1.1')
    assert body == '127.0.0.1', 'empty source'


def test_client_ip_invalid():
    """Check that malformed "client_ip" objects are rejected.

    A configuration without 'header', and configurations whose 'source' is
    None, 'a' or ['a'], must each produce an error response.
    """
    missing_header = {
        "127.0.0.1:8081": {
            "client_ip": {"source": '127.0.0.1'},
            "pass": "applications/client_ip",
        }
    }
    result = client.conf(missing_header, 'listeners')
    assert 'error' in result, 'invalid header'

    for source in (None, 'a', ['a']):
        bad_source = {
            "127.0.0.1:8081": {
                "client_ip": {
                    "header": "X-Forwarded-For",
                    "source": source,
                },
                "pass": "applications/client_ip",
            }
        }
        result = client.conf(bad_source, 'listeners')
        assert 'error' in result, 'invalid source'