1import os 2import re 3import unittest 4from unit.applications.lang.python import TestApplicationPython 5 6 7class TestUpstreamsRR(TestApplicationPython): 8 prerequisites = {'modules': {'python': 'any'}} 9 10 def setUp(self): 11 super().setUp() 12 13 self.assertIn( 14 'success', 15 self.conf( 16 { 17 "listeners": { 18 "*:7080": {"pass": "upstreams/one"}, 19 "*:7090": {"pass": "upstreams/two"}, 20 "*:7081": {"pass": "routes/one"}, 21 "*:7082": {"pass": "routes/two"}, 22 "*:7083": {"pass": "routes/three"}, 23 }, 24 "upstreams": { 25 "one": { 26 "servers": { 27 "127.0.0.1:7081": {}, 28 "127.0.0.1:7082": {}, 29 }, 30 }, 31 "two": { 32 "servers": { 33 "127.0.0.1:7081": {}, 34 "127.0.0.1:7082": {}, 35 }, 36 }, 37 }, 38 "routes": { 39 "one": [{"action": {"return": 200}}], 40 "two": [{"action": {"return": 201}}], 41 "three": [{"action": {"return": 202}}], 42 }, 43 "applications": {}, 44 }, 45 ), 46 'upstreams initial configuration', 47 ) 48 49 self.cpu_count = os.cpu_count() 50 51 def get_resps(self, req=100, port=7080): 52 resps = [0] 53 54 for _ in range(req): 55 status = self.get(port=port)['status'] 56 if 200 > status or status > 209: 57 continue 58 59 ups = status % 10 60 if ups > len(resps) - 1: 61 resps.extend([0] * (ups - len(resps) + 1)) 62 63 resps[ups] += 1 64 65 return resps 66 67 def get_resps_sc(self, req=100, port=7080): 68 to_send = b"""GET / HTTP/1.1 69Host: localhost 70 71""" * ( 72 req - 1 73 ) 74 75 to_send += b"""GET / HTTP/1.1 76Host: localhost 77Connection: close 78 79""" 80 81 resp = self.http(to_send, raw_resp=True, raw=True, port=port) 82 status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp) 83 status = list(filter(lambda x: x[:2] == '20', status)) 84 ups = list(map(lambda x: int(x[-1]), status)) 85 86 resps = [0] * (max(ups) + 1) 87 for i in range(len(ups)): 88 resps[ups[i]] += 1 89 90 return resps 91 92 def test_upstreams_rr_no_weight(self): 93 resps = self.get_resps() 94 self.assertEqual(sum(resps), 100, 'no weight sum') 95 self.assertLessEqual( 96 abs(resps[0] - resps[1]), self.cpu_count, 'no weight' 97 ) 98 99 self.assertIn( 100 'success', 101 self.conf_delete('upstreams/one/servers/127.0.0.1:7081'), 102 'no weight server remove', 103 ) 104 105 resps = self.get_resps(req=50) 106 self.assertEqual(resps[1], 50, 'no weight 2') 107 108 self.assertIn( 109 'success', 110 self.conf({}, 'upstreams/one/servers/127.0.0.1:7081'), 111 'no weight server revert', 112 ) 113 114 resps = self.get_resps() 115 self.assertEqual(sum(resps), 100, 'no weight 3 sum') 116 self.assertLessEqual( 117 abs(resps[0] - resps[1]), self.cpu_count, 'no weight 3' 118 ) 119 120 self.assertIn( 121 'success', 122 self.conf({}, 'upstreams/one/servers/127.0.0.1:7083'), 123 'no weight server new', 124 ) 125 126 resps = self.get_resps() 127 self.assertEqual(sum(resps), 100, 'no weight 4 sum') 128 self.assertLessEqual( 129 max(resps) - min(resps), self.cpu_count, 'no weight 4' 130 ) 131 132 resps = self.get_resps_sc(req=30) 133 self.assertEqual(resps[0], 10, 'no weight 4 0') 134 self.assertEqual(resps[1], 10, 'no weight 4 1') 135 self.assertEqual(resps[2], 10, 'no weight 4 2') 136 137 def test_upstreams_rr_weight(self): 138 self.assertIn( 139 'success', 140 self.conf({"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'), 141 'configure weight', 142 ) 143 144 resps = self.get_resps_sc() 145 self.assertEqual(resps[0], 75, 'weight 3 0') 146 self.assertEqual(resps[1], 25, 'weight 3 1') 147 148 self.assertIn( 149 'success', 150 self.conf_delete('upstreams/one/servers/127.0.0.1:7081/weight'), 151 'configure weight remove', 152 ) 153 resps = self.get_resps_sc(req=10) 154 self.assertEqual(resps[0], 5, 'weight 0 0') 155 self.assertEqual(resps[1], 5, 'weight 0 1') 156 157 self.assertIn( 158 'success', 159 self.conf('1', 'upstreams/one/servers/127.0.0.1:7081/weight'), 160 'configure weight 1', 161 ) 162 163 resps = self.get_resps_sc() 164 self.assertEqual(resps[0], 50, 'weight 1 0') 165 self.assertEqual(resps[1], 50, 'weight 1 1') 166 167 self.assertIn( 168 'success', 169 self.conf( 170 { 171 "127.0.0.1:7081": {"weight": 3}, 172 "127.0.0.1:7083": {"weight": 2}, 173 }, 174 'upstreams/one/servers', 175 ), 176 'configure weight 2', 177 ) 178 179 resps = self.get_resps_sc() 180 self.assertEqual(resps[0], 60, 'weight 2 0') 181 self.assertEqual(resps[2], 40, 'weight 2 1') 182 183 def test_upstreams_rr_weight_rational(self): 184 def set_weights(w1, w2): 185 self.assertIn( 186 'success', 187 self.conf( 188 { 189 "127.0.0.1:7081": {"weight": w1}, 190 "127.0.0.1:7082": {"weight": w2}, 191 }, 192 'upstreams/one/servers', 193 ), 194 'configure weights', 195 ) 196 197 def check_reqs(w1, w2, reqs=10): 198 resps = self.get_resps_sc(req=reqs) 199 self.assertEqual(resps[0], reqs * w1 / (w1 + w2), 'weight 1') 200 self.assertEqual(resps[1], reqs * w2 / (w1 + w2), 'weight 2') 201 202 def check_weights(w1, w2): 203 set_weights(w1, w2) 204 check_reqs(w1, w2) 205 206 check_weights(0, 1) 207 check_weights(0, 999999.0123456) 208 check_weights(1, 9) 209 check_weights(100000, 900000) 210 check_weights(1, .25) 211 check_weights(1, 0.25) 212 check_weights(0.2, .8) 213 check_weights(1, 1.5) 214 check_weights(1e-3, 1E-3) 215 check_weights(1e-20, 1e-20) 216 check_weights(1e4, 1e4) 217 check_weights(1000000, 1000000) 218 219 set_weights(0.25, 0.25) 220 self.assertIn( 221 'success', 222 self.conf_delete('upstreams/one/servers/127.0.0.1:7081/weight'), 223 'delete weight', 224 ) 225 check_reqs(1, 0.25) 226 227 self.assertIn( 228 'success', 229 self.conf( 230 { 231 "127.0.0.1:7081": {"weight": 0.1}, 232 "127.0.0.1:7082": {"weight": 1}, 233 "127.0.0.1:7083": {"weight": 0.9}, 234 }, 235 'upstreams/one/servers', 236 ), 237 'configure weights', 238 ) 239 resps = self.get_resps_sc(req=20) 240 self.assertEqual(resps[0], 1, 'weight 3 1') 241 self.assertEqual(resps[1], 10, 'weight 3 2') 242 self.assertEqual(resps[2], 9, 'weight 3 3') 243 244 def test_upstreams_rr_independent(self): 245 def sum_resps(*args): 246 sum = [0] * len(args[0]) 247 for arg in args: 248 sum = [x + y for x, y in zip(sum, arg)] 249 250 return sum 251 252 resps = self.get_resps_sc(req=30, port=7090) 253 self.assertEqual(resps[0], 15, 'dep two before 0') 254 self.assertEqual(resps[1], 15, 'dep two before 1') 255 256 resps = self.get_resps_sc(req=30) 257 self.assertEqual(resps[0], 15, 'dep one before 0') 258 self.assertEqual(resps[1], 15, 'dep one before 1') 259 260 self.assertIn( 261 'success', 262 self.conf('2', 'upstreams/two/servers/127.0.0.1:7081/weight'), 263 'configure dep weight', 264 ) 265 266 resps = self.get_resps_sc(req=30, port=7090) 267 self.assertEqual(resps[0], 20, 'dep two 0') 268 self.assertEqual(resps[1], 10, 'dep two 1') 269 270 resps = self.get_resps_sc(req=30) 271 self.assertEqual(resps[0], 15, 'dep one 0') 272 self.assertEqual(resps[1], 15, 'dep one 1') 273 274 self.assertIn( 275 'success', 276 self.conf('1', 'upstreams/two/servers/127.0.0.1:7081/weight'), 277 'configure dep weight 1', 278 ) 279 280 r_one, r_two = [0, 0], [0, 0] 281 for _ in range(10): 282 r_one = sum_resps(r_one, self.get_resps(req=10)) 283 r_two = sum_resps(r_two, self.get_resps(req=10, port=7090)) 284 285 286 self.assertEqual(sum(r_one), 100, 'dep one mix sum') 287 self.assertLessEqual( 288 abs(r_one[0] - r_one[1]), self.cpu_count, 'dep one mix' 289 ) 290 self.assertEqual(sum(r_two), 100, 'dep two mix sum') 291 self.assertLessEqual( 292 abs(r_two[0] - r_two[1]), self.cpu_count, 'dep two mix' 293 ) 294 295 def test_upstreams_rr_delay(self): 296 self.assertIn( 297 'success', 298 self.conf( 299 { 300 "listeners": { 301 "*:7080": {"pass": "upstreams/one"}, 302 "*:7081": {"pass": "routes"}, 303 "*:7082": {"pass": "routes"}, 304 }, 305 "upstreams": { 306 "one": { 307 "servers": { 308 "127.0.0.1:7081": {}, 309 "127.0.0.1:7082": {}, 310 }, 311 }, 312 }, 313 "routes": [ 314 { 315 "match": {"destination": "*:7081"}, 316 "action": {"pass": "applications/delayed"}, 317 }, 318 { 319 "match": {"destination": "*:7082"}, 320 "action": {"return": 201}, 321 }, 322 ], 323 "applications": { 324 "delayed": { 325 "type": "python", 326 "processes": {"spare": 0}, 327 "path": self.current_dir + "/python/delayed", 328 "working_directory": self.current_dir 329 + "/python/delayed", 330 "module": "wsgi", 331 } 332 }, 333 }, 334 ), 335 'upstreams initial configuration', 336 ) 337 338 req = 50 339 340 socks = [] 341 for i in range(req): 342 delay = 1 if i % 5 == 0 else 0 343 _, sock = self.get( 344 headers={ 345 'Host': 'localhost', 346 'Content-Length': '0', 347 'X-Delay': str(delay), 348 'Connection': 'close', 349 }, 350 start=True, 351 no_recv=True, 352 ) 353 socks.append(sock) 354 355 resps = [0, 0] 356 for i in range(req): 357 resp = self.recvall(socks[i]).decode() 358 socks[i].close() 359 360 m = re.search('HTTP/1.1 20(\d)', resp) 361 self.assertIsNotNone(m, 'status') 362 resps[int(m.group(1))] += 1 363 364 self.assertEqual(sum(resps), req, 'delay sum') 365 self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'delay') 366 367 def test_upstreams_rr_active_req(self): 368 conns = 5 369 socks = [] 370 socks2 = [] 371 372 for _ in range(conns): 373 _, sock = self.get(start=True, no_recv=True) 374 socks.append(sock) 375 376 _, sock2 = self.http( 377 b"""POST / HTTP/1.1 378Host: localhost 379Content-Length: 10 380Connection: close 381 382""", 383 start=True, 384 no_recv=True, 385 raw=True, 386 ) 387 socks2.append(sock2) 388 389 # Send one more request and read response to make sure that previous 390 # requests had enough time to reach server. 391 392 self.assertEqual(self.get()['body'], '') 393 394 self.assertIn( 395 'success', 396 self.conf( 397 {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers', 398 ), 399 'active req new server', 400 ) 401 self.assertIn( 402 'success', 403 self.conf_delete('upstreams/one/servers/127.0.0.1:7083'), 404 'active req server remove', 405 ) 406 self.assertIn( 407 'success', self.conf_delete('listeners/*:7080'), 'delete listener' 408 ) 409 self.assertIn( 410 'success', 411 self.conf_delete('upstreams/one'), 412 'active req upstream remove', 413 ) 414 415 for i in range(conns): 416 self.assertEqual( 417 self.http(b'', sock=socks[i], raw=True)['body'], 418 '', 419 'active req GET', 420 ) 421 422 self.assertEqual( 423 self.http(b"""0123456789""", sock=socks2[i], raw=True)['body'], 424 '', 425 'active req POST', 426 ) 427 428 def test_upstreams_rr_bad_server(self): 429 self.assertIn( 430 'success', 431 self.conf({"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'), 432 'configure bad server', 433 ) 434 435 resps = self.get_resps_sc(req=30) 436 self.assertEqual(resps[0], 10, 'bad server 0') 437 self.assertEqual(resps[1], 10, 'bad server 1') 438 self.assertEqual(sum(resps), 20, 'bad server sum') 439 440 def test_upstreams_rr_pipeline(self): 441 resps = self.get_resps_sc() 442 443 self.assertEqual(resps[0], 50, 'pipeline 0') 444 self.assertEqual(resps[1], 50, 'pipeline 1') 445 446 def test_upstreams_rr_post(self): 447 resps = [0, 0] 448 for _ in range(50): 449 resps[self.get()['status'] % 10] += 1 450 resps[self.post(body='0123456789')['status'] % 10] += 1 451 452 self.assertEqual(sum(resps), 100, 'post sum') 453 self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'post') 454 455 def test_upstreams_rr_unix(self): 456 addr_0 = self.testdir + '/sock_0' 457 addr_1 = self.testdir + '/sock_1' 458 459 self.assertIn( 460 'success', 461 self.conf( 462 { 463 "*:7080": {"pass": "upstreams/one"}, 464 "unix:" + addr_0: {"pass": "routes/one"}, 465 "unix:" + addr_1: {"pass": "routes/two"}, 466 }, 467 'listeners', 468 ), 469 'configure listeners unix', 470 ) 471 472 self.assertIn( 473 'success', 474 self.conf( 475 {"unix:" + addr_0: {}, "unix:" + addr_1: {}}, 476 'upstreams/one/servers', 477 ), 478 'configure servers unix', 479 ) 480 481 resps = self.get_resps_sc() 482 483 self.assertEqual(resps[0], 50, 'unix 0') 484 self.assertEqual(resps[1], 50, 'unix 1') 485 486 def test_upstreams_rr_ipv6(self): 487 self.assertIn( 488 'success', 489 self.conf( 490 { 491 "*:7080": {"pass": "upstreams/one"}, 492 "[::1]:7081": {"pass": "routes/one"}, 493 "[::1]:7082": {"pass": "routes/two"}, 494 }, 495 'listeners', 496 ), 497 'configure listeners ipv6', 498 ) 499 500 self.assertIn( 501 'success', 502 self.conf( 503 {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers' 504 ), 505 'configure servers ipv6', 506 ) 507 508 resps = self.get_resps_sc() 509 510 self.assertEqual(resps[0], 50, 'ipv6 0') 511 self.assertEqual(resps[1], 50, 'ipv6 1') 512 513 def test_upstreams_rr_servers_empty(self): 514 self.assertIn( 515 'success', 516 self.conf({}, 'upstreams/one/servers'), 517 'configure servers empty', 518 ) 519 self.assertEqual(self.get()['status'], 502, 'servers empty') 520 521 self.assertIn( 522 'success', 523 self.conf( 524 {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers' 525 ), 526 'configure servers empty one', 527 ) 528 self.assertEqual(self.get()['status'], 502, 'servers empty one') 529 self.assertIn( 530 'success', 531 self.conf( 532 { 533 "127.0.0.1:7081": {"weight": 0}, 534 "127.0.0.1:7082": {"weight": 0}, 535 }, 536 'upstreams/one/servers', 537 ), 538 'configure servers empty two', 539 ) 540 self.assertEqual(self.get()['status'], 502, 'servers empty two') 541 542 def test_upstreams_rr_invalid(self): 543 self.assertIn( 544 'error', self.conf({}, 'upstreams'), 'upstreams empty', 545 ) 546 self.assertIn( 547 'error', self.conf({}, 'upstreams/one'), 'named upstreams empty', 548 ) 549 self.assertIn( 550 'error', 551 self.conf({}, 'upstreams/one/servers/127.0.0.1'), 552 'invalid address', 553 ) 554 self.assertIn( 555 'error', 556 self.conf({}, 'upstreams/one/servers/127.0.0.1:7081/blah'), 557 'invalid server option', 558 ) 559 560 def check_weight(w): 561 self.assertIn( 562 'error', 563 self.conf(w, 'upstreams/one/servers/127.0.0.1:7081/weight'), 564 'invalid weight option', 565 ) 566 check_weight({}) 567 check_weight('-1') 568 check_weight('1.') 569 check_weight('1.1.') 570 check_weight('.') 571 check_weight('.01234567890123') 572 check_weight('1000001') 573 check_weight('2e6') 574 575 576if __name__ == '__main__': 577 TestUpstreamsRR.main() 578