import os
import re

from unit.applications.lang.python import TestApplicationPython
from conftest import option


class TestUpstreamsRR(TestApplicationPython):
    """Tests for round-robin balancing across ``upstreams`` servers.

    The baseline configuration (see ``setup_method``) exposes three backend
    listeners that answer with statuses 200, 201 and 202, so the last digit
    of a 20x status identifies which backend served a request.
    """

    prerequisites = {'modules': {'python': 'any'}}

    def setup_method(self):
        """Apply the baseline configuration shared by every test.

        Listeners 7080 and 7090 pass to upstreams "one" and "two"; both
        upstreams balance between 127.0.0.1:7081 and 127.0.0.1:7082.
        Listeners 7081/7082/7083 return 200/201/202 respectively.
        """
        super().setup_method()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "upstreams/one"},
                    "*:7090": {"pass": "upstreams/two"},
                    "*:7081": {"pass": "routes/one"},
                    "*:7082": {"pass": "routes/two"},
                    "*:7083": {"pass": "routes/three"},
                },
                "upstreams": {
                    "one": {
                        "servers": {
                            "127.0.0.1:7081": {},
                            "127.0.0.1:7082": {},
                        },
                    },
                    "two": {
                        "servers": {
                            "127.0.0.1:7081": {},
                            "127.0.0.1:7082": {},
                        },
                    },
                },
                "routes": {
                    "one": [{"action": {"return": 200}}],
                    "two": [{"action": {"return": 201}}],
                    "three": [{"action": {"return": 202}}],
                },
                "applications": {},
            },
        ), 'upstreams initial configuration'

        # Used as the allowed imbalance between backends in tests that send
        # requests over separate connections.
        # NOTE(review): presumably each router thread balances on its own,
        # hence the per-CPU tolerance -- confirm.
        # os.cpu_count() may return None; fall back to 1 so the "<="
        # comparisons below never raise TypeError.
        self.cpu_count = os.cpu_count() or 1

    def get_resps(self, req=100, port=7080):
        """Send ``req`` individual GET requests and count hits per backend.

        Each request goes through ``self.get``.  Responses whose status is
        outside 200..209 are ignored.

        Returns a list where index N holds the number of responses with
        status 20N.  The list always has at least one element and grows to
        fit the highest backend index seen.
        """
        resps = [0]

        for _ in range(req):
            status = self.get(port=port)['status']
            if not 200 <= status <= 209:
                continue

            ups = status % 10
            if ups >= len(resps):
                resps.extend([0] * (ups - len(resps) + 1))

            resps[ups] += 1

        return resps

    def get_resps_sc(self, req=100, port=7080):
        """Pipeline ``req`` GET requests in one payload and count hits.

        ``req - 1`` requests are followed by a final one carrying
        ``Connection: close``; the whole payload is sent with a single
        ``self.http`` call and the raw response text is scanned for status
        lines.  Only 20x statuses are counted.

        Returns a list where index N holds the number of 20N responses.
        When no 20x response is found the result is ``[0]`` (consistent
        with ``get_resps``) rather than a ``ValueError`` from ``max()``.
        """
        keepalive_req = b'GET / HTTP/1.1\nHost: localhost\n\n'
        closing_req = (
            b'GET / HTTP/1.1\nHost: localhost\nConnection: close\n\n'
        )
        to_send = keepalive_req * (req - 1) + closing_req

        resp = self.http(to_send, raw_resp=True, raw=True, port=port)
        statuses = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)
        # The last digit of a 20x status is the backend index.
        ups = [int(code[-1]) for code in statuses if code[:2] == '20']

        resps = [0] * (max(ups, default=0) + 1)
        for backend in ups:
            resps[backend] += 1

        return resps

    def test_upstreams_rr_no_weight(self):
        """Unweighted servers share requests evenly as servers come and go."""
        resps = self.get_resps()
        assert sum(resps) == 100, 'no weight sum'
        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight'

        assert 'success' in self.conf_delete(
            'upstreams/one/servers/127.0.0.1:7081'
        ), 'no weight server remove'

        # Only the 201 backend remains.
        resps = self.get_resps(req=50)
        assert resps[1] == 50, 'no weight 2'

        assert 'success' in self.conf(
            {}, 'upstreams/one/servers/127.0.0.1:7081'
        ), 'no weight server revert'

        resps = self.get_resps()
        assert sum(resps) == 100, 'no weight 3 sum'
        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight 3'

        assert 'success' in self.conf(
            {}, 'upstreams/one/servers/127.0.0.1:7083'
        ), 'no weight server new'

        resps = self.get_resps()
        assert sum(resps) == 100, 'no weight 4 sum'
        assert max(resps) - min(resps) <= self.cpu_count, 'no weight 4'

        # Over one connection the split is expected to be exact.
        resps = self.get_resps_sc(req=30)
        assert resps[0] == 10, 'no weight 4 0'
        assert resps[1] == 10, 'no weight 4 1'
        assert resps[2] == 10, 'no weight 4 2'

    def test_upstreams_rr_weight(self):
        """Integer weights split pipelined requests proportionally."""
        assert 'success' in self.conf(
            {"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'
        ), 'configure weight'

        resps = self.get_resps_sc()
        assert resps[0] == 75, 'weight 3 0'
        assert resps[1] == 25, 'weight 3 1'

        assert 'success' in self.conf_delete(
            'upstreams/one/servers/127.0.0.1:7081/weight'
        ), 'configure weight remove'
        resps = self.get_resps_sc(req=10)
        assert resps[0] == 5, 'weight 0 0'
        assert resps[1] == 5, 'weight 0 1'

        assert 'success' in self.conf(
            '1', 'upstreams/one/servers/127.0.0.1:7081/weight'
        ), 'configure weight 1'

        resps = self.get_resps_sc()
        assert resps[0] == 50, 'weight 1 0'
        assert resps[1] == 50, 'weight 1 1'

        # Replace the server set: 7081 (status 200) and 7083 (status 202).
        assert 'success' in self.conf(
            {
                "127.0.0.1:7081": {"weight": 3},
                "127.0.0.1:7083": {"weight": 2},
            },
            'upstreams/one/servers',
        ), 'configure weight 2'

        resps = self.get_resps_sc()
        assert resps[0] == 60, 'weight 2 0'
        assert resps[2] == 40, 'weight 2 1'

    def test_upstreams_rr_weight_rational(self):
        """Fractional, zero and very large/small weights are honoured."""

        def set_weights(w1, w2):
            # Replace the server set of upstream "one" with two weighted
            # servers.
            assert 'success' in self.conf(
                {
                    "127.0.0.1:7081": {"weight": w1},
                    "127.0.0.1:7082": {"weight": w2},
                },
                'upstreams/one/servers',
            ), 'configure weights'

        def check_reqs(w1, w2, reqs=10):
            # Expect ``reqs`` pipelined requests to split exactly in the
            # w1:w2 ratio.
            resps = self.get_resps_sc(req=reqs)
            assert resps[0] == reqs * w1 / (w1 + w2), 'weight 1'
            assert resps[1] == reqs * w2 / (w1 + w2), 'weight 2'

        def check_weights(w1, w2):
            set_weights(w1, w2)
            check_reqs(w1, w2)

        check_weights(0, 1)
        check_weights(0, 999999.0123456)
        check_weights(1, 9)
        check_weights(100000, 900000)
        check_weights(1, 0.25)
        # NOTE(review): same weights applied a second time -- presumably
        # intentional (re-applying an identical configuration); confirm.
        check_weights(1, 0.25)
        check_weights(0.2, 0.8)
        check_weights(1, 1.5)
        check_weights(1e-3, 1e-3)
        check_weights(1e-20, 1e-20)
        check_weights(1e4, 1e4)
        check_weights(1000000, 1000000)

        # After the weight option is deleted the server is expected to
        # behave as if its weight were 1.
        set_weights(0.25, 0.25)
        assert 'success' in self.conf_delete(
            'upstreams/one/servers/127.0.0.1:7081/weight'
        ), 'delete weight'
        check_reqs(1, 0.25)

        assert 'success' in self.conf(
            {
                "127.0.0.1:7081": {"weight": 0.1},
                "127.0.0.1:7082": {"weight": 1},
                "127.0.0.1:7083": {"weight": 0.9},
            },
            'upstreams/one/servers',
        ), 'configure weights'
        resps = self.get_resps_sc(req=20)
        assert resps[0] == 1, 'weight 3 1'
        assert resps[1] == 10, 'weight 3 2'
        assert resps[2] == 9, 'weight 3 3'

    def test_upstreams_rr_independent(self):
        """Upstreams "one" and "two" keep independent balancing state."""

        def sum_resps(*args):
            # Element-wise sum of the given counter lists.  zip() stops at
            # the shortest list, so the result is as long as the shortest
            # argument.
            total = [0] * len(args[0])
            for arg in args:
                total = [x + y for x, y in zip(total, arg)]

            return total

        resps = self.get_resps_sc(req=30, port=7090)
        assert resps[0] == 15, 'dep two before 0'
        assert resps[1] == 15, 'dep two before 1'

        resps = self.get_resps_sc(req=30)
        assert resps[0] == 15, 'dep one before 0'
        assert resps[1] == 15, 'dep one before 1'

        # Changing a weight in upstream "two" must not affect "one".
        assert 'success' in self.conf(
            '2', 'upstreams/two/servers/127.0.0.1:7081/weight'
        ), 'configure dep weight'

        resps = self.get_resps_sc(req=30, port=7090)
        assert resps[0] == 20, 'dep two 0'
        assert resps[1] == 10, 'dep two 1'

        resps = self.get_resps_sc(req=30)
        assert resps[0] == 15, 'dep one 0'
        assert resps[1] == 15, 'dep one 1'

        assert 'success' in self.conf(
            '1', 'upstreams/two/servers/127.0.0.1:7081/weight'
        ), 'configure dep weight 1'

        # Interleave requests to both upstreams.
        r_one, r_two = [0, 0], [0, 0]
        for _ in range(10):
            r_one = sum_resps(r_one, self.get_resps(req=10))
            r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))

        assert sum(r_one) == 100, 'dep one mix sum'
        assert abs(r_one[0] - r_one[1]) <= self.cpu_count, 'dep one mix'
        assert sum(r_two) == 100, 'dep two mix sum'
        assert abs(r_two[0] - r_two[1]) <= self.cpu_count, 'dep two mix'

    def test_upstreams_rr_delay(self):
        """Balancing stays even when some requests to one backend are slow.

        Backend 7081 is served by the "delayed" Python application and
        backend 7082 returns 201 directly.  Every fifth request carries
        ``X-Delay: 1``, the others ``X-Delay: 0``.
        """
        delayed_dir = option.test_dir + "/python/delayed"

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "upstreams/one"},
                    "*:7081": {"pass": "routes"},
                    "*:7082": {"pass": "routes"},
                },
                "upstreams": {
                    "one": {
                        "servers": {
                            "127.0.0.1:7081": {},
                            "127.0.0.1:7082": {},
                        },
                    },
                },
                "routes": [
                    {
                        "match": {"destination": "*:7081"},
                        "action": {"pass": "applications/delayed"},
                    },
                    {
                        "match": {"destination": "*:7082"},
                        "action": {"return": 201},
                    },
                ],
                "applications": {
                    "delayed": {
                        "type": "python",
                        "processes": {"spare": 0},
                        "path": delayed_dir,
                        "working_directory": delayed_dir,
                        "module": "wsgi",
                    }
                },
            },
        ), 'upstreams initial configuration'

        req = 50

        # Start all requests first without reading any response.
        socks = []
        for i in range(req):
            delay = 1 if i % 5 == 0 else 0
            _, sock = self.get(
                headers={
                    'Host': 'localhost',
                    'Content-Length': '0',
                    'X-Delay': str(delay),
                    'Connection': 'close',
                },
                start=True,
                no_recv=True,
            )
            socks.append(sock)

        # Then collect the responses in the order the requests were sent.
        resps = [0, 0]
        for sock in socks:
            resp = self.recvall(sock).decode()
            sock.close()

            m = re.search(r'HTTP/1.1 20(\d)', resp)
            assert m is not None, 'status'
            resps[int(m.group(1))] += 1

        assert sum(resps) == req, 'delay sum'
        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'delay'

    def test_upstreams_rr_active_req(self):
        """In-flight requests complete after their upstream is removed."""
        conns = 5
        socks = []
        socks2 = []

        for _ in range(conns):
            _, sock = self.get(start=True, no_recv=True)
            socks.append(sock)

            # Headers announce a 10-byte body that is sent only later.
            _, sock2 = self.http(
                b'POST / HTTP/1.1\n'
                b'Host: localhost\n'
                b'Content-Length: 10\n'
                b'Connection: close\n'
                b'\n',
                start=True,
                no_recv=True,
                raw=True,
            )
            socks2.append(sock2)

        # Send one more request and read response to make sure that previous
        # requests had enough time to reach server.

        assert self.get()['body'] == ''

        assert 'success' in self.conf(
            {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
        ), 'active req new server'
        assert 'success' in self.conf_delete(
            'upstreams/one/servers/127.0.0.1:7083'
        ), 'active req server remove'
        assert 'success' in self.conf_delete(
            'listeners/*:7080'
        ), 'delete listener'
        assert 'success' in self.conf_delete(
            'upstreams/one'
        ), 'active req upstream remove'

        for sock, sock2 in zip(socks, socks2):
            assert (
                self.http(b'', sock=sock, raw=True)['body'] == ''
            ), 'active req GET'

            # Now deliver the 10 body bytes promised by Content-Length.
            assert (
                self.http(b"""0123456789""", sock=sock2, raw=True)['body']
                == ''
            ), 'active req POST'

    def test_upstreams_rr_bad_server(self):
        """A server with no listener behind it yields no 20x responses.

        127.0.0.1:7084 has no listener in the baseline configuration, so
        only two thirds of the 30 requests are expected to succeed.
        """
        assert 'success' in self.conf(
            {"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'
        ), 'configure bad server'

        resps = self.get_resps_sc(req=30)
        assert resps[0] == 10, 'bad server 0'
        assert resps[1] == 10, 'bad server 1'
        assert sum(resps) == 20, 'bad server sum'

    def test_upstreams_rr_pipeline(self):
        """100 pipelined requests split 50/50 between the two servers."""
        resps = self.get_resps_sc()

        assert resps[0] == 50, 'pipeline 0'
        assert resps[1] == 50, 'pipeline 1'

    def test_upstreams_rr_post(self):
        """Alternating GET and POST requests are balanced evenly."""
        resps = [0, 0]
        for _ in range(50):
            resps[self.get()['status'] % 10] += 1
            resps[self.post(body='0123456789')['status'] % 10] += 1

        assert sum(resps) == 100, 'post sum'
        assert abs(resps[0] - resps[1]) <= self.cpu_count, 'post'

    def test_upstreams_rr_unix(self):
        """Balancing works with Unix domain socket servers."""
        addr_0 = self.temp_dir + '/sock_0'
        addr_1 = self.temp_dir + '/sock_1'

        assert 'success' in self.conf(
            {
                "*:7080": {"pass": "upstreams/one"},
                "unix:" + addr_0: {"pass": "routes/one"},
                "unix:" + addr_1: {"pass": "routes/two"},
            },
            'listeners',
        ), 'configure listeners unix'

        assert 'success' in self.conf(
            {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
            'upstreams/one/servers',
        ), 'configure servers unix'

        resps = self.get_resps_sc()

        assert resps[0] == 50, 'unix 0'
        assert resps[1] == 50, 'unix 1'

    def test_upstreams_rr_ipv6(self):
        """Balancing works with IPv6 loopback servers."""
        assert 'success' in self.conf(
            {
                "*:7080": {"pass": "upstreams/one"},
                "[::1]:7081": {"pass": "routes/one"},
                "[::1]:7082": {"pass": "routes/two"},
            },
            'listeners',
        ), 'configure listeners ipv6'

        assert 'success' in self.conf(
            {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
        ), 'configure servers ipv6'

        resps = self.get_resps_sc()

        assert resps[0] == 50, 'ipv6 0'
        assert resps[1] == 50, 'ipv6 1'

    def test_upstreams_rr_servers_empty(self):
        """No servers, or only zero-weight servers, results in 502."""
        assert 'success' in self.conf(
            {}, 'upstreams/one/servers'
        ), 'configure servers empty'
        assert self.get()['status'] == 502, 'servers empty'

        assert 'success' in self.conf(
            {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers'
        ), 'configure servers empty one'
        assert self.get()['status'] == 502, 'servers empty one'
        assert 'success' in self.conf(
            {
                "127.0.0.1:7081": {"weight": 0},
                "127.0.0.1:7082": {"weight": 0},
            },
            'upstreams/one/servers',
        ), 'configure servers empty two'
        assert self.get()['status'] == 502, 'servers empty two'

    def test_upstreams_rr_invalid(self):
        """Malformed upstream configurations are rejected with an error."""
        assert 'error' in self.conf({}, 'upstreams'), 'upstreams empty'
        assert 'error' in self.conf(
            {}, 'upstreams/one'
        ), 'named upstreams empty'
        assert 'error' in self.conf(
            {}, 'upstreams/one/servers/127.0.0.1'
        ), 'invalid address'
        assert 'error' in self.conf(
            {}, 'upstreams/one/servers/127.0.0.1:7081/blah'
        ), 'invalid server option'

        def check_weight(w):
            # Each value below must be refused as a server weight.
            assert 'error' in self.conf(
                w, 'upstreams/one/servers/127.0.0.1:7081/weight'
            ), 'invalid weight option'

        check_weight({})
        check_weight('-1')
        check_weight('1.')
        check_weight('1.1.')
        check_weight('.')
        check_weight('.01234567890123')
        check_weight('1000001')
        check_weight('2e6')