"""Tests for the listener ``forwarded`` option.

Each test configures two listeners (IPv4 on port 8081, IPv6 on port 8082)
that pass to the ``forwarded_header`` application, then checks the
``Remote-Addr`` and ``Url-Scheme`` response headers for requests carrying
``X-Forwarded-For`` / ``X-Forwarded-Proto``.
"""

import pytest

from unit.applications.lang.python import ApplicationPython

prerequisites = {'modules': {'python': 'any'}}

client = ApplicationPython()


@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Call ``client.load('forwarded_header')`` before every test."""
    client.load('forwarded_header')


def forwarded_header(forwarded):
    """Apply *forwarded* to both test listeners and assert it is accepted.

    The same ``forwarded`` object is attached to the IPv4 listener
    (127.0.0.1:8081) and the IPv6 listener ([::1]:8082).
    """
    listeners = {
        address: {
            "forwarded": forwarded,
            "pass": "applications/forwarded_header",
        }
        for address in ("127.0.0.1:8081", "[::1]:8082")
    }

    result = client.conf(listeners, 'listeners')
    assert 'success' in result, 'listeners configure'


def get_fwd(sock_type='ipv4', xff=None, xfp=None):
    """Send a GET request and return the ``headers`` entry of the response.

    *sock_type* selects the listener: port 8081 for ``'ipv4'``, port 8082
    for anything else.  *xff* and *xfp*, when not ``None``, are sent as the
    ``X-Forwarded-For`` and ``X-Forwarded-Proto`` request headers.
    """
    request_headers = {'Connection': 'close'}

    optional = (
        ('X-Forwarded-For', xff),
        ('X-Forwarded-Proto', xfp),
    )
    for header_name, header_value in optional:
        if header_value is not None:
            request_headers[header_name] = header_value

    if sock_type == 'ipv4':
        port = 8081
    else:
        port = 8082

    response = client.get(
        sock_type=sock_type, port=port, headers=request_headers
    )
    return response['headers']


def get_addr(*args, **kwargs):
    """Return the ``Remote-Addr`` response header of a ``get_fwd`` request."""
    response_headers = get_fwd(*args, **kwargs)
    return response_headers['Remote-Addr']


def get_scheme(*args, **kwargs):
    """Return the ``Url-Scheme`` response header of a ``get_fwd`` request."""
    response_headers = get_fwd(*args, **kwargs)
    return response_headers['Url-Scheme']


def test_forwarded_header_single_ip():
    """Check a single-address ``source``: non-matching, matching, negated."""
    # Key order (client_ip, protocol, source) is kept for every config.
    base = {
        'client_ip': 'X-Forwarded-For',
        'protocol': 'X-Forwarded-Proto',
    }

    # Source does not match either listener's peer: headers are expected
    # to leave the address and scheme untouched.
    forwarded_header({**base, 'source': '123.123.123.123'})

    headers = get_fwd(xff='1.1.1.1', xfp='https')
    assert headers['Remote-Addr'] == '127.0.0.1', 'both headers addr'
    assert headers['Url-Scheme'] == 'http', 'both headers proto'

    assert get_addr() == '127.0.0.1', 'ipv4 default addr'
    assert get_addr('ipv6') == '::1', 'ipv6 default addr'
    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source'
    assert get_addr(xff='blah') == '127.0.0.1', 'bad xff'
    assert get_addr('ipv6', '1.1.1.1') == '::1', 'bad source ipv6'

    assert get_scheme() == 'http', 'ipv4 default proto'
    assert get_scheme('ipv6') == 'http', 'ipv6 default proto'
    assert get_scheme(xfp='https') == 'http', 'bad proto'
    assert get_scheme(xfp='blah') == 'http', 'bad xfp'
    assert get_scheme('ipv6', xfp='https') == 'http', 'bad proto ipv6'

    # Source matches the IPv4 peer only: replacement is expected on the
    # IPv4 listener and not on the IPv6 one.
    forwarded_header({**base, 'source': '127.0.0.1'})

    headers = get_fwd(xff='1.1.1.1', xfp='https')
    assert headers['Remote-Addr'] == '1.1.1.1', 'both headers addr 2'
    assert headers['Url-Scheme'] == 'https', 'both headers proto 2'

    assert get_addr() == '127.0.0.1', 'ipv4 default addr 2'
    assert get_addr('ipv6') == '::1', 'ipv6 default addr 2'
    assert get_addr(xff='1.1.1.1') == '1.1.1.1', 'xff replace'
    assert get_addr('ipv6', '1.1.1.1') == '::1', 'bad source ipv6 2'

    assert get_scheme() == 'http', 'ipv4 default proto 2'
    assert get_scheme('ipv6') == 'http', 'ipv6 default proto 2'
    assert get_scheme(xfp='https') == 'https', 'xfp replace'
    assert get_scheme(xfp='on') == 'https', 'xfp replace 2'
    assert get_scheme('ipv6', xfp='https') == 'http', 'bad proto ipv6 2'

    # Negated source: the expectations flip between the two listeners.
    forwarded_header({**base, 'source': '!127.0.0.1'})

    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source 3'
    assert get_addr('ipv6', '1.1.1.1') == '1.1.1.1', 'xff replace 2'
    assert get_scheme(xfp='https') == 'http', 'bad proto 2'
    assert get_scheme('ipv6', xfp='https') == 'https', 'xfp replace 3'


def test_forwarded_header_ipv4():
    """Check multi-value and repeated headers with an IPv4 ``source``."""
    forwarded_header(
        {
            'client_ip': 'X-Forwarded-For',
            'protocol': 'X-Forwarded-Proto',
            'source': '127.0.0.1',
        }
    )

    # (X-Forwarded-For value, expected Remote-Addr, assertion message);
    # a list value is passed to the client as-is.
    addr_cases = (
        ('8.8.8.8, 84.23.23.11', '84.23.23.11', 'xff replace'),
        ('8.8.8.8, 84.23.23.11, 127.0.0.1', '127.0.0.1', 'xff replace 2'),
        (['8.8.8.8', '127.0.0.1, 10.0.1.1'], '10.0.1.1', 'xff replace multi'),
    )
    for xff, expected, message in addr_cases:
        assert get_addr(xff=xff) == expected, message

    # (X-Forwarded-Proto value, expected Url-Scheme, assertion message)
    scheme_cases = (
        ('http, https', 'http', 'xfp replace'),
        ('http, https, http', 'http', 'xfp replace 2'),
        (['http, https', 'http', 'https'], 'http', 'xfp replace multi'),
    )
    for xfp, expected, message in scheme_cases:
        assert get_scheme(xfp=xfp) == expected, message


def test_forwarded_header_ipv6():
    """Check an IPv6 ``source``: only the IPv6 listener gets replacement."""
    forwarded_header(
        {
            'client_ip': 'X-Forwarded-For',
            'protocol': 'X-Forwarded-Proto',
            'source': '::1',
        }
    )

    assert get_addr(xff='1.1.1.1') == '127.0.0.1', 'bad source ipv4'

    ipv6_addresses = [
        'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
        '2001:db8:3c4d:15::1a2f:1a2b',
        '2001::3c4d:15:1a2f:1a2b',
        '::11.22.33.44',
    ]
    for address in ipv6_addresses:
        assert get_addr('ipv6', address) == address, 'replace'

    assert get_scheme(xfp='https') == 'http', 'bad source ipv4'

    for scheme in ['http', 'https']:
        assert get_scheme('ipv6', xfp=scheme) == scheme, 'replace'


def test_forwarded_header_recursive():
    """Check ``recursive: True`` with a list of ``source`` addresses."""
    forwarded_header(
        {
            'client_ip': 'X-Forwarded-For',
            'recursive': True,
            'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
        }
    )

    # (X-Forwarded-For value, expected Remote-Addr, assertion message)
    cases = (
        ('1.1.1.1', '1.1.1.1', 'xff chain'),
        ('1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 2'),
        ('8.8.8.8, 1.1.1.1, 10.5.2.1', '1.1.1.1', 'xff chain 3'),
        ('10.50.0.17, 10.5.2.1, 10.5.2.1', '10.50.0.17', 'xff chain 4'),
        (['8.8.8.8', '1.1.1.1, 127.0.0.1'], '1.1.1.1', 'xff replace multi'),
        (
            ['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'],
            '1.1.1.1',
            'xff replace multi 2',
        ),
        (
            ['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'],
            '1.1.1.1',
            'xff replace multi 3',
        ),
        (
            '8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1',
            '2001:db8:3c4d:15::1a2f:1a2b',
            'xff chain ipv6',
        ),
    )
    for xff, expected, message in cases:
        assert get_addr(xff=xff) == expected, message


def test_forwarded_header_case_insensitive():
    """Check that lower-case header names in the config still match."""
    forwarded_header(
        {
            'client_ip': 'x-forwarded-for',
            'protocol': 'x-forwarded-proto',
            'source': '127.0.0.1',
        }
    )

    assert get_addr() == '127.0.0.1', 'ipv4 default addr'
    assert get_addr('ipv6') == '::1', 'ipv6 default addr'
    assert get_addr(xff='1.1.1.1') == '1.1.1.1', 'replace'

    assert get_scheme() == 'http', 'ipv4 default proto'
    assert get_scheme('ipv6') == 'http', 'ipv6 default proto'
    assert get_scheme(xfp='https') == 'https', 'replace 1'
    assert get_scheme(xfp='oN') == 'https', 'replace 2'


def test_forwarded_header_source_empty():
    """Check that an empty ``source`` list results in no replacement."""
    forwarded_header(
        {
            'client_ip': 'X-Forwarded-For',
            'protocol': 'X-Forwarded-Proto',
            'source': [],
        }
    )

    addr = get_addr(xff='1.1.1.1')
    assert addr == '127.0.0.1', 'empty source xff'

    scheme = get_scheme(xfp='https')
    assert scheme == 'http', 'empty source xfp'


def test_forwarded_header_source_range():
    """Check a ``source`` given as an address range."""
    forwarded_header(
        {
            'client_ip': 'X-Forwarded-For',
            'protocol': 'X-Forwarded-Proto',
            'source': '127.0.0.0-127.0.0.1',
        }
    )

    ipv4_addr = get_addr(xff='1.1.1.1')
    assert ipv4_addr == '1.1.1.1', 'source range'

    ipv6_addr = get_addr('ipv6', '1.1.1.1')
    assert ipv6_addr == '::1', 'source range 2'


def test_forwarded_header_invalid():
    """Check that malformed ``forwarded`` objects are rejected."""
    # A "forwarded" object with only "source" must be refused.
    source_only = {
        "127.0.0.1:8081": {
            "forwarded": {"source": '127.0.0.1'},
            "pass": "applications/forwarded_header",
        }
    }
    assert 'error' in client.conf(source_only, 'listeners'), 'invalid forward'

    # Each of these "source" values must be refused as well.
    for bad_source in (None, 'a', ['a']):
        listeners = {
            "127.0.0.1:8081": {
                "forwarded": {
                    "client_ip": "X-Forwarded-For",
                    "source": bad_source,
                },
                "pass": "applications/forwarded_header",
            }
        }
        assert 'error' in client.conf(listeners, 'listeners'), 'invalid source'