1from unit.applications.lang.python import TestApplicationPython 2 3 4class TestClientIP(TestApplicationPython): 5 prerequisites = {'modules': {'python': 'any'}} 6 7 def client_ip(self, options): 8 assert 'success' in self.conf( 9 { 10 "127.0.0.1:7081": { 11 "client_ip": options, 12 "pass": "applications/client_ip", 13 }, 14 "[::1]:7082": { 15 "client_ip": options, 16 "pass": "applications/client_ip", 17 }, 18 }, 19 'listeners', 20 ), 'listeners configure' 21 22 def get_xff(self, xff, sock_type='ipv4'): 23 port = 7081 if sock_type == 'ipv4' else 7082 24 25 return self.get( 26 sock_type=sock_type, 27 port=port, 28 headers={'Connection': 'close', 'X-Forwarded-For': xff}, 29 )['body'] 30 31 def setup_method(self): 32 self.load('client_ip') 33 34 def test_client_ip_single_ip(self): 35 self.client_ip( 36 {'header': 'X-Forwarded-For', 'source': '123.123.123.123'} 37 ) 38 39 assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default' 40 assert ( 41 self.get(sock_type='ipv6', port=7082)['body'] == '::1' 42 ), 'ipv6 default' 43 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source' 44 assert self.get_xff('blah') == '127.0.0.1', 'bad header' 45 assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6' 46 47 self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'}) 48 49 assert self.get(port=7081)['body'] == '127.0.0.1', 'ipv4 default 2' 50 assert ( 51 self.get(sock_type='ipv6', port=7082)['body'] == '::1' 52 ), 'ipv6 default 2' 53 assert self.get_xff('1.1.1.1') == '1.1.1.1', 'replace' 54 assert self.get_xff('blah') == '127.0.0.1', 'bad header 2' 55 assert self.get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6 2' 56 57 self.client_ip({'header': 'X-Forwarded-For', 'source': '!127.0.0.1'}) 58 59 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source 3' 60 assert self.get_xff('1.1.1.1', 'ipv6') == '1.1.1.1', 'replace 2' 61 62 def test_client_ip_ipv4(self): 63 self.client_ip({'header': 'X-Forwarded-For', 'source': '127.0.0.1'}) 64 65 assert ( 66 self.get_xff('8.8.8.8, 84.23.23.11') == '84.23.23.11' 67 ), 'xff replace' 68 assert ( 69 self.get_xff('8.8.8.8, 84.23.23.11, 127.0.0.1') == '127.0.0.1' 70 ), 'xff replace 2' 71 assert ( 72 self.get_xff(['8.8.8.8', '127.0.0.1, 10.0.1.1']) == '10.0.1.1' 73 ), 'xff replace multi' 74 75 def test_client_ip_ipv6(self): 76 self.client_ip({'header': 'X-Forwarded-For', 'source': '::1'}) 77 78 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4' 79 80 for ip in [ 81 'f607:7403:1e4b:6c66:33b2:843f:2517:da27', 82 '2001:db8:3c4d:15::1a2f:1a2b', 83 '2001::3c4d:15:1a2f:1a2b', 84 '::11.22.33.44', 85 ]: 86 assert self.get_xff(ip, 'ipv6') == ip, 'replace' 87 88 def test_client_ip_recursive(self): 89 self.client_ip( 90 { 91 'header': 'X-Forwarded-For', 92 'recursive': True, 93 'source': ['127.0.0.1', '10.50.0.17', '10.5.2.1'], 94 } 95 ) 96 97 assert self.get_xff('1.1.1.1') == '1.1.1.1', 'xff chain' 98 assert self.get_xff('1.1.1.1, 10.5.2.1') == '1.1.1.1', 'xff chain 2' 99 assert ( 100 self.get_xff('8.8.8.8, 1.1.1.1, 10.5.2.1') == '1.1.1.1' 101 ), 'xff chain 3' 102 assert ( 103 self.get_xff('10.50.0.17, 10.5.2.1, 10.5.2.1') == '10.50.0.17' 104 ), 'xff chain 4' 105 assert ( 106 self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1']) == '1.1.1.1' 107 ), 'xff replace multi' 108 assert ( 109 self.get_xff(['8.8.8.8', '1.1.1.1, 127.0.0.1', '10.5.2.1']) 110 == '1.1.1.1' 111 ), 'xff replace multi 2' 112 assert ( 113 self.get_xff(['10.5.2.1', '10.50.0.17, 1.1.1.1', '10.5.2.1']) 114 == '1.1.1.1' 115 ), 'xff replace multi 3' 116 assert ( 117 self.get_xff('8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1') 118 == '2001:db8:3c4d:15::1a2f:1a2b' 119 ), 'xff chain ipv6' 120 121 def test_client_ip_case_insensitive(self): 122 self.client_ip({'header': 'x-forwarded-for', 'source': '127.0.0.1'}) 123 124 assert self.get_xff('1.1.1.1') == '1.1.1.1', 'case insensitive' 125 126 def test_client_ip_empty_source(self): 127 self.client_ip({'header': 'X-Forwarded-For', 'source': []}) 128 129 assert self.get_xff('1.1.1.1') == '127.0.0.1', 'empty source' 130 131 def test_client_ip_invalid(self): 132 assert 'error' in self.conf( 133 { 134 "127.0.0.1:7081": { 135 "client_ip": {"source": '127.0.0.1'}, 136 "pass": "applications/client_ip", 137 } 138 }, 139 'listeners', 140 ), 'invalid header' 141 142 def check_invalid_source(source): 143 assert 'error' in self.conf( 144 { 145 "127.0.0.1:7081": { 146 "client_ip": { 147 "header": "X-Forwarded-For", 148 "source": source, 149 }, 150 "pass": "applications/client_ip", 151 } 152 }, 153 'listeners', 154 ), 'invalid source' 155 156 check_invalid_source(None) 157 check_invalid_source('a') 158 check_invalid_source(['a']) 159