import pytest
from unit.applications.lang.python import TestApplicationPython


class TestClientIP(TestApplicationPython):
    """Functional tests for the listener-level ``client_ip`` option.

    Every test loads the ``client_ip`` application, configures two loopback
    listeners (IPv4 on port 7081, IPv6 on port 7082) with the ``client_ip``
    options under test, and compares the response body with the address the
    application is expected to report for the client.
    """

    # Consumed by the test framework; presumably restricts the suite to
    # builds that include some version of the Python module -- confirm
    # against the base class.
    prerequisites = {'modules': {'python': 'any'}}

    def client_ip(self, options):
        """Replace the ``listeners`` config, applying *options* to both sockets.

        options: the ``client_ip`` object to attach to each listener.

        Fails the calling test if the configuration request does not report
        success.
        """
        # Both listeners share exactly the same settings; only the socket
        # address differs.
        listener = {"client_ip": options, "pass": "applications/client_ip"}

        assert 'success' in self.conf(
            {
                "127.0.0.1:7081": listener,
                "[::1]:7082": listener,
            },
            'listeners',
        ), 'listeners configure'

    def get_xff(self, xff, sock_type='ipv4'):
        """Send a GET carrying ``X-Forwarded-For: xff`` and return the body.

        xff: header value; the tests pass a string, or a list of strings
            (presumably sent as repeated header fields -- confirm against
            the HTTP helper in the base class).
        sock_type: 'ipv4' targets port 7081; any other value targets
            port 7082.
        """
        port = 7081 if sock_type == 'ipv4' else 7082

        return self.get(
            sock_type=sock_type,
            port=port,
            headers={'Connection': 'close', 'X-Forwarded-For': xff},
        )['body']

    def setup_method(self):
        """Load the ``client_ip`` application before each test."""
        self.load('client_ip')

    def test_settings_client_ip_single_ip(self):
        """Check a single address as ``source``, including a negated one."""
        # Source matches neither loopback peer: the header must be ignored
        # on both listeners.
        self.client_ip(
            {'header': 'X-Forwarded-For', 'source': '123.123.123.123'}
        )

        assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default'
        assert (
            self.get(sock_type='ipv6', port=7082)['body'] == '::1'
        ), 'ipv6 default'
        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source'
        assert self.get_xff('blah') == '127.0.0.1', 'bad header'
        assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6'

        # Source is the IPv4 loopback: only the IPv4 listener is expected
        # to honour a valid header value.
        self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})

        assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default 2'
        assert (
            self.get(sock_type='ipv6', port=7082)['body'] == '::1'
        ), 'ipv6 default 2'
        assert self.get_xff('1.1.1.1') == '1.1.1.1', 'replace'
        assert self.get_xff('blah') == '127.0.0.1', 'bad header 2'
        assert (
            self.get_xff('1.1.1.1', 'ipv6') == '::1'
        ), 'bad source ipv6 2'

        # Negated source: the expectations flip -- the IPv4 peer is left
        # untouched while the IPv6 peer gets the header value.
        self.client_ip({'header': 'X-Forwarded-For', 'source': '!127.0.0.1'})

        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source 3'
        assert self.get_xff('1.1.1.1', 'ipv6') == '1.1.1.1', 'replace 2'

    def test_settings_client_ip_ipv4(self):
        """Without ``recursive``, the last listed address is expected."""
        self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'})

        assert (
            self.get_xff('8.8.8.8, 84.23.23.11') == '84.23.23.11'
        ), 'xff replace'
        assert (
            self.get_xff('8.8.8.8, 84.23.23.11, 127.0.0.1') == '127.0.0.1'
        ), 'xff replace 2'
        assert (
            self.get_xff(['8.8.8.8', '127.0.0.1, 10.0.1.1']) == '10.0.1.1'
        ), 'xff replace multi'

    def test_settings_client_ip_ipv6(self):
        """Check an IPv6 ``source`` with several IPv6 header notations."""
        self.client_ip({'header': 'X-Forwarded-For', 'source': '::1'})

        # The IPv4 peer does not match the IPv6 source, so nothing changes.
        assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4'

        for ip in [
            'f607:7403:1e4b:6c66:33b2:843f:2517:da27',
            '2001:db8:3c4d:15::1a2f:1a2b',
            '2001::3c4d:15:1a2f:1a2b',
            '::11.22.33.44',
        ]:
            assert self.get_xff(ip, 'ipv6') == ip, 'replace'

    def test_settings_client_ip_recursive(self):
        """Check ``recursive`` mode with a list of trusted sources.

        In each case below the expected result is the right-most address in
        the header chain that is not one of the configured sources.
        """
        self.client_ip(
            {
                'header': 'X-Forwarded-For',
                'recursive': True,
                'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'],
            }
        )

        assert self.get_xff('1.1.1.1') == '1.1.1.1', 'xff chain'
        assert self.get_xff('1.1.1.1, 10.5.2.1') == '1.1.1.1', 'xff chain 2'
        assert (
            self.get_xff('8.8.8.8, 1.1.1.1, 10.5.2.1') == '1.1.1.1'
        ), 'xff chain 3'
        # Every entry is a trusted source: the left-most one is expected.
        assert (
            self.get_xff('10.50.0.17, 10.5.2.1, 10.5.2.1') == '10.50.0.17'
        ), 'xff chain 4'
        assert (
            self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1']) == '1.1.1.1'
        ), 'xff replace multi'
        assert (
            self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1'])
            == '1.1.1.1'
        ), 'xff replace multi 2'
        assert (
            self.get_xff(['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1'])
            == '1.1.1.1'
        ), 'xff replace multi 3'
        assert (
            self.get_xff('8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1')
            == '2001:db8:3c4d:15::1a2f:1a2b'
        ), 'xff chain ipv6'

    def test_settings_client_ip_invalid(self):
        """Check that invalid ``source`` values are rejected.

        Unlike the other tests, these requests put ``client_ip`` under
        ``http`` in the ``settings`` section rather than on a listener.
        """
        assert 'error' in self.conf(
            {
                "http": {
                    "client_ip": {'header': 'X-Forwarded-For', 'source': []}
                }
            },
            'settings',
        ), 'empty array source'
        # 'a' is a non-empty string that is not an address, so the failure
        # label says "invalid" rather than "empty".
        assert 'error' in self.conf(
            {
                "http": {
                    "client_ip": {'header': 'X-Forwarded-For', 'source': 'a'}
                }
            },
            'settings',
        ), 'invalid source'