1from unit.applications.lang.python import TestApplicationPython 2from unit.option import option 3 4 5class TestClientIP(TestApplicationPython): 6 prerequisites = {'modules': {'python': 'any'}} 7 8 def client_ip(self, options): 9 assert 'success' in self.conf( 10 { 11 "127.0.0.1:7081": { 12 "client_ip": options, 13 "pass": "applications/client_ip", 14 }, 15 "[::1]:7082": { 16 "client_ip": options, 17 "pass": "applications/client_ip", 18 }, 19 "unix:" 20 + option.temp_dir 21 + "/sock": { 22 "client_ip": options, 23 "pass": "applications/client_ip", 24 }, 25 }, 26 'listeners', 27 ), 'listeners configure' 28 29 def get_xff(self, xff, sock_type='ipv4'): 30 address = { 31 'ipv4': ('127.0.0.1', 7081), 32 'ipv6': ('::1', 7082), 33 'unix': (option.temp_dir + '/sock', None), 34 } 35 (addr, port) = address[sock_type] 36 37 return self.get( 38 sock_type=sock_type, 39 addr=addr, 40 port=port, 41 headers={'Connection': 'close', 'X-Forwarded-For': xff}, 42 )['body'] 43 44 def setup_method(self): 45 self.load('client_ip') 46 47 def test_client_ip_single_ip(self): 48 self.client_ip( 49 {'header': 'X-Forwarded-For', 'source': '123.123.123.123'} 50 ) 51 52 assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default' 53 assert ( 54 self.get(sock_type='ipv6', port=7082)['body'] == '::1' 55 ), 'ipv6 default' 56 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source' 57 assert self.get_xff('blah') == '127.0.0.1', 'bad header' 58 assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6' 59 60 self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'}) 61 62 assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default 2' 63 assert ( 64 self.get(sock_type='ipv6', port=7082)['body'] == '::1' 65 ), 'ipv6 default 2' 66 assert self.get_xff('1.1.1.1') == '1.1.1.1', 'replace' 67 assert self.get_xff('blah') == '127.0.0.1', 'bad header 2' 68 assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6 2' 69 70 self.client_ip({'header': 'X-Forwarded-For', 'source': '!127.0.0.1'}) 71 72 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source 3' 73 assert self.get_xff('1.1.1.1', 'ipv6') == '1.1.1.1', 'replace 2' 74 75 def test_client_ip_ipv4(self): 76 self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'}) 77 78 assert ( 79 self.get_xff('8.8.8.8, 84.23.23.11') == '84.23.23.11' 80 ), 'xff replace' 81 assert ( 82 self.get_xff('8.8.8.8, 84.23.23.11, 127.0.0.1') == '127.0.0.1' 83 ), 'xff replace 2' 84 assert ( 85 self.get_xff(['8.8.8.8', '127.0.0.1, 10.0.1.1']) == '10.0.1.1' 86 ), 'xff replace multi' 87 88 def test_client_ip_ipv6(self): 89 self.client_ip({'header': 'X-Forwarded-For', 'source': '::1'}) 90 91 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4' 92 93 for ip in [ 94 'f607:7403:1e4b:6c66:33b2:843f:2517:da27', 95 '2001:db8:3c4d:15::1a2f:1a2b', 96 '2001::3c4d:15:1a2f:1a2b', 97 '::11.22.33.44', 98 ]: 99 assert self.get_xff(ip, 'ipv6') == ip, 'replace' 100 101 def test_client_ip_unix(self, temp_dir): 102 self.client_ip({'header': 'X-Forwarded-For', 'source': 'unix'}) 103 104 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4' 105 assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6' 106 107 for ip in [ 108 '1.1.1.1', 109 '::11.22.33.44', 110 ]: 111 assert self.get_xff(ip, 'unix') == ip, 'replace' 112 113 def test_client_ip_recursive(self): 114 self.client_ip( 115 { 116 'header': 'X-Forwarded-For', 117 'recursive': True, 118 'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'], 119 } 120 ) 121 122 assert self.get_xff('1.1.1.1') == '1.1.1.1', 'xff chain' 123 assert self.get_xff('1.1.1.1, 10.5.2.1') == '1.1.1.1', 'xff chain 2' 124 assert ( 125 self.get_xff('8.8.8.8, 1.1.1.1, 10.5.2.1') == '1.1.1.1' 126 ), 'xff chain 3' 127 assert ( 128 self.get_xff('10.50.0.17, 10.5.2.1, 10.5.2.1') == '10.50.0.17' 129 ), 'xff chain 4' 130 assert ( 131 self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1']) == '1.1.1.1' 132 ), 'xff replace multi' 133 assert ( 134 self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1']) 135 == '1.1.1.1' 136 ), 'xff replace multi 2' 137 assert ( 138 self.get_xff(['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1']) 139 == '1.1.1.1' 140 ), 'xff replace multi 3' 141 assert ( 142 self.get_xff('8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1') 143 == '2001:db8:3c4d:15::1a2f:1a2b' 144 ), 'xff chain ipv6' 145 146 def test_client_ip_case_insensitive(self): 147 self.client_ip({'header': 'x-forwarded-for', 'source': '127.0.0.1'}) 148 149 assert self.get_xff('1.1.1.1') == '1.1.1.1', 'case insensitive' 150 151 def test_client_ip_empty_source(self): 152 self.client_ip({'header': 'X-Forwarded-For', 'source': []}) 153 154 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'empty source' 155 156 def test_client_ip_invalid(self): 157 assert 'error' in self.conf( 158 { 159 "127.0.0.1:7081": { 160 "client_ip": {"source": '127.0.0.1'}, 161 "pass": "applications/client_ip", 162 } 163 }, 164 'listeners', 165 ), 'invalid header' 166 167 def check_invalid_source(source): 168 assert 'error' in self.conf( 169 { 170 "127.0.0.1:7081": { 171 "client_ip": { 172 "header": "X-Forwarded-For", 173 "source": source, 174 }, 175 "pass": "applications/client_ip", 176 } 177 }, 178 'listeners', 179 ), 'invalid source' 180 181 check_invalid_source(None) 182 check_invalid_source('a') 183 check_invalid_source(['a']) 184