1import os 2import re 3 4from unit.applications.lang.python import TestApplicationPython 5from unit.option import option 6 7 8class TestUpstreamsRR(TestApplicationPython): 9 prerequisites = {'modules': {'python': 'any'}} 10 11 def setup_method(self): 12 assert 'success' in self.conf( 13 { 14 "listeners": { 15 "*:7080": {"pass": "upstreams/one"}, 16 "*:7090": {"pass": "upstreams/two"}, 17 "*:7081": {"pass": "routes/one"}, 18 "*:7082": {"pass": "routes/two"}, 19 "*:7083": {"pass": "routes/three"}, 20 }, 21 "upstreams": { 22 "one": { 23 "servers": { 24 "127.0.0.1:7081": {}, 25 "127.0.0.1:7082": {}, 26 }, 27 }, 28 "two": { 29 "servers": { 30 "127.0.0.1:7081": {}, 31 "127.0.0.1:7082": {}, 32 }, 33 }, 34 }, 35 "routes": { 36 "one": [{"action": {"return": 200}}], 37 "two": [{"action": {"return": 201}}], 38 "three": [{"action": {"return": 202}}], 39 }, 40 "applications": {}, 41 }, 42 ), 'upstreams initial configuration' 43 44 self.cpu_count = os.cpu_count() 45 46 def get_resps(self, req=100, port=7080): 47 resps = [0] 48 49 for _ in range(req): 50 status = self.get(port=port)['status'] 51 if 200 > status or status > 209: 52 continue 53 54 ups = status % 10 55 if ups > len(resps) - 1: 56 resps.extend([0] * (ups - len(resps) + 1)) 57 58 resps[ups] += 1 59 60 return resps 61 62 def get_resps_sc(self, req=100, port=7080): 63 to_send = b"""GET / HTTP/1.1 64Host: localhost 65 66""" * ( 67 req - 1 68 ) 69 70 to_send += b"""GET / HTTP/1.1 71Host: localhost 72Connection: close 73 74""" 75 76 resp = self.http(to_send, raw_resp=True, raw=True, port=port) 77 status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp) 78 status = list(filter(lambda x: x[:2] == '20', status)) 79 ups = list(map(lambda x: int(x[-1]), status)) 80 81 resps = [0] * (max(ups) + 1) 82 for i in range(len(ups)): 83 resps[ups[i]] += 1 84 85 return resps 86 87 def test_upstreams_rr_no_weight(self): 88 resps = self.get_resps() 89 assert sum(resps) == 100, 'no weight sum' 90 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight' 91 92 assert 'success' in self.conf_delete( 93 'upstreams/one/servers/127.0.0.1:7081' 94 ), 'no weight server remove' 95 96 resps = self.get_resps(req=50) 97 assert resps[1] == 50, 'no weight 2' 98 99 assert 'success' in self.conf( 100 {}, 'upstreams/one/servers/127.0.0.1:7081' 101 ), 'no weight server revert' 102 103 resps = self.get_resps() 104 assert sum(resps) == 100, 'no weight 3 sum' 105 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'no weight 3' 106 107 assert 'success' in self.conf( 108 {}, 'upstreams/one/servers/127.0.0.1:7083' 109 ), 'no weight server new' 110 111 resps = self.get_resps() 112 assert sum(resps) == 100, 'no weight 4 sum' 113 assert max(resps) - min(resps) <= self.cpu_count, 'no weight 4' 114 115 resps = self.get_resps_sc(req=30) 116 assert resps[0] == 10, 'no weight 4 0' 117 assert resps[1] == 10, 'no weight 4 1' 118 assert resps[2] == 10, 'no weight 4 2' 119 120 def test_upstreams_rr_weight(self): 121 assert 'success' in self.conf( 122 {"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081' 123 ), 'configure weight' 124 125 resps = self.get_resps_sc() 126 assert resps[0] == 75, 'weight 3 0' 127 assert resps[1] == 25, 'weight 3 1' 128 129 assert 'success' in self.conf_delete( 130 'upstreams/one/servers/127.0.0.1:7081/weight' 131 ), 'configure weight remove' 132 resps = self.get_resps_sc(req=10) 133 assert resps[0] == 5, 'weight 0 0' 134 assert resps[1] == 5, 'weight 0 1' 135 136 assert 'success' in self.conf( 137 '1', 'upstreams/one/servers/127.0.0.1:7081/weight' 138 ), 'configure weight 1' 139 140 resps = self.get_resps_sc() 141 assert resps[0] == 50, 'weight 1 0' 142 assert resps[1] == 50, 'weight 1 1' 143 144 assert 'success' in self.conf( 145 { 146 "127.0.0.1:7081": {"weight": 3}, 147 "127.0.0.1:7083": {"weight": 2}, 148 }, 149 'upstreams/one/servers', 150 ), 'configure weight 2' 151 152 resps = self.get_resps_sc() 153 assert resps[0] == 60, 'weight 2 0' 154 assert resps[2] == 40, 'weight 2 1' 155 156 def test_upstreams_rr_weight_rational(self): 157 def set_weights(w1, w2): 158 assert 'success' in self.conf( 159 { 160 "127.0.0.1:7081": {"weight": w1}, 161 "127.0.0.1:7082": {"weight": w2}, 162 }, 163 'upstreams/one/servers', 164 ), 'configure weights' 165 166 def check_reqs(w1, w2, reqs=10): 167 resps = self.get_resps_sc(req=reqs) 168 assert resps[0] == reqs * w1 / (w1 + w2), 'weight 1' 169 assert resps[1] == reqs * w2 / (w1 + w2), 'weight 2' 170 171 def check_weights(w1, w2): 172 set_weights(w1, w2) 173 check_reqs(w1, w2) 174 175 check_weights(0, 1) 176 check_weights(0, 999999.0123456) 177 check_weights(1, 9) 178 check_weights(100000, 900000) 179 check_weights(1, 0.25) 180 check_weights(1, 0.25) 181 check_weights(0.2, 0.8) 182 check_weights(1, 1.5) 183 check_weights(1e-3, 1e-3) 184 check_weights(1e-20, 1e-20) 185 check_weights(1e4, 1e4) 186 check_weights(1000000, 1000000) 187 188 set_weights(0.25, 0.25) 189 assert 'success' in self.conf_delete( 190 'upstreams/one/servers/127.0.0.1:7081/weight' 191 ), 'delete weight' 192 check_reqs(1, 0.25) 193 194 assert 'success' in self.conf( 195 { 196 "127.0.0.1:7081": {"weight": 0.1}, 197 "127.0.0.1:7082": {"weight": 1}, 198 "127.0.0.1:7083": {"weight": 0.9}, 199 }, 200 'upstreams/one/servers', 201 ), 'configure weights' 202 resps = self.get_resps_sc(req=20) 203 assert resps[0] == 1, 'weight 3 1' 204 assert resps[1] == 10, 'weight 3 2' 205 assert resps[2] == 9, 'weight 3 3' 206 207 def test_upstreams_rr_independent(self): 208 def sum_resps(*args): 209 sum = [0] * len(args[0]) 210 for arg in args: 211 sum = [x + y for x, y in zip(sum, arg)] 212 213 return sum 214 215 resps = self.get_resps_sc(req=30, port=7090) 216 assert resps[0] == 15, 'dep two before 0' 217 assert resps[1] == 15, 'dep two before 1' 218 219 resps = self.get_resps_sc(req=30) 220 assert resps[0] == 15, 'dep one before 0' 221 assert resps[1] == 15, 'dep one before 1' 222 223 assert 'success' in self.conf( 224 '2', 'upstreams/two/servers/127.0.0.1:7081/weight' 225 ), 'configure dep weight' 226 227 resps = self.get_resps_sc(req=30, port=7090) 228 assert resps[0] == 20, 'dep two 0' 229 assert resps[1] == 10, 'dep two 1' 230 231 resps = self.get_resps_sc(req=30) 232 assert resps[0] == 15, 'dep one 0' 233 assert resps[1] == 15, 'dep one 1' 234 235 assert 'success' in self.conf( 236 '1', 'upstreams/two/servers/127.0.0.1:7081/weight' 237 ), 'configure dep weight 1' 238 239 r_one, r_two = [0, 0], [0, 0] 240 for _ in range(10): 241 r_one = sum_resps(r_one, self.get_resps(req=10)) 242 r_two = sum_resps(r_two, self.get_resps(req=10, port=7090)) 243 244 assert sum(r_one) == 100, 'dep one mix sum' 245 assert abs(r_one[0] - r_one[1]) <= self.cpu_count, 'dep one mix' 246 assert sum(r_two) == 100, 'dep two mix sum' 247 assert abs(r_two[0] - r_two[1]) <= self.cpu_count, 'dep two mix' 248 249 def test_upstreams_rr_delay(self): 250 assert 'success' in self.conf( 251 { 252 "listeners": { 253 "*:7080": {"pass": "upstreams/one"}, 254 "*:7081": {"pass": "routes"}, 255 "*:7082": {"pass": "routes"}, 256 }, 257 "upstreams": { 258 "one": { 259 "servers": { 260 "127.0.0.1:7081": {}, 261 "127.0.0.1:7082": {}, 262 }, 263 }, 264 }, 265 "routes": [ 266 { 267 "match": {"destination": "*:7081"}, 268 "action": {"pass": "applications/delayed"}, 269 }, 270 { 271 "match": {"destination": "*:7082"}, 272 "action": {"return": 201}, 273 }, 274 ], 275 "applications": { 276 "delayed": { 277 "type": self.get_application_type(), 278 "processes": {"spare": 0}, 279 "path": option.test_dir + "/python/delayed", 280 "working_directory": option.test_dir 281 + "/python/delayed", 282 "module": "wsgi", 283 } 284 }, 285 }, 286 ), 'upstreams initial configuration' 287 288 req = 50 289 290 socks = [] 291 for i in range(req): 292 delay = 1 if i % 5 == 0 else 0 293 _, sock = self.get( 294 headers={ 295 'Host': 'localhost', 296 'Content-Length': '0', 297 'X-Delay': str(delay), 298 'Connection': 'close', 299 }, 300 start=True, 301 no_recv=True, 302 ) 303 socks.append(sock) 304 305 resps = [0, 0] 306 for i in range(req): 307 resp = self.recvall(socks[i]).decode() 308 socks[i].close() 309 310 m = re.search(r'HTTP/1.1 20(\d)', resp) 311 assert m is not None, 'status' 312 resps[int(m.group(1))] += 1 313 314 assert sum(resps) == req, 'delay sum' 315 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'delay' 316 317 def test_upstreams_rr_active_req(self): 318 conns = 5 319 socks = [] 320 socks2 = [] 321 322 for _ in range(conns): 323 _, sock = self.get(start=True, no_recv=True) 324 socks.append(sock) 325 326 _, sock2 = self.http( 327 b"""POST / HTTP/1.1 328Host: localhost 329Content-Length: 10 330Connection: close 331 332""", 333 start=True, 334 no_recv=True, 335 raw=True, 336 ) 337 socks2.append(sock2) 338 339 # Send one more request and read response to make sure that previous 340 # requests had enough time to reach server. 341 342 assert self.get()['body'] == '' 343 344 assert 'success' in self.conf( 345 {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers', 346 ), 'active req new server' 347 assert 'success' in self.conf_delete( 348 'upstreams/one/servers/127.0.0.1:7083' 349 ), 'active req server remove' 350 assert 'success' in self.conf_delete( 351 'listeners/*:7080' 352 ), 'delete listener' 353 assert 'success' in self.conf_delete( 354 'upstreams/one' 355 ), 'active req upstream remove' 356 357 for i in range(conns): 358 assert ( 359 self.http(b'', sock=socks[i], raw=True)['body'] == '' 360 ), 'active req GET' 361 362 assert ( 363 self.http(b"""0123456789""", sock=socks2[i], raw=True)['body'] 364 == '' 365 ), 'active req POST' 366 367 def test_upstreams_rr_bad_server(self): 368 assert 'success' in self.conf( 369 {"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084' 370 ), 'configure bad server' 371 372 resps = self.get_resps_sc(req=30) 373 assert resps[0] == 10, 'bad server 0' 374 assert resps[1] == 10, 'bad server 1' 375 assert sum(resps) == 20, 'bad server sum' 376 377 def test_upstreams_rr_pipeline(self): 378 resps = self.get_resps_sc() 379 380 assert resps[0] == 50, 'pipeline 0' 381 assert resps[1] == 50, 'pipeline 1' 382 383 def test_upstreams_rr_post(self): 384 resps = [0, 0] 385 for _ in range(50): 386 resps[self.get()['status'] % 10] += 1 387 resps[self.post(body='0123456789')['status'] % 10] += 1 388 389 assert sum(resps) == 100, 'post sum' 390 assert abs(resps[0] - resps[1]) <= self.cpu_count, 'post' 391 392 def test_upstreams_rr_unix(self, temp_dir): 393 addr_0 = temp_dir + '/sock_0' 394 addr_1 = temp_dir + '/sock_1' 395 396 assert 'success' in self.conf( 397 { 398 "*:7080": {"pass": "upstreams/one"}, 399 "unix:" + addr_0: {"pass": "routes/one"}, 400 "unix:" + addr_1: {"pass": "routes/two"}, 401 }, 402 'listeners', 403 ), 'configure listeners unix' 404 405 assert 'success' in self.conf( 406 {"unix:" + addr_0: {}, "unix:" + addr_1: {}}, 407 'upstreams/one/servers', 408 ), 'configure servers unix' 409 410 resps = self.get_resps_sc() 411 412 assert resps[0] == 50, 'unix 0' 413 assert resps[1] == 50, 'unix 1' 414 415 def test_upstreams_rr_ipv6(self): 416 assert 'success' in self.conf( 417 { 418 "*:7080": {"pass": "upstreams/one"}, 419 "[::1]:7081": {"pass": "routes/one"}, 420 "[::1]:7082": {"pass": "routes/two"}, 421 }, 422 'listeners', 423 ), 'configure listeners ipv6' 424 425 assert 'success' in self.conf( 426 {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers' 427 ), 'configure servers ipv6' 428 429 resps = self.get_resps_sc() 430 431 assert resps[0] == 50, 'ipv6 0' 432 assert resps[1] == 50, 'ipv6 1' 433 434 def test_upstreams_rr_servers_empty(self): 435 assert 'success' in self.conf( 436 {}, 'upstreams/one/servers' 437 ), 'configure servers empty' 438 assert self.get()['status'] == 502, 'servers empty' 439 440 assert 'success' in self.conf( 441 {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers' 442 ), 'configure servers empty one' 443 assert self.get()['status'] == 502, 'servers empty one' 444 assert 'success' in self.conf( 445 { 446 "127.0.0.1:7081": {"weight": 0}, 447 "127.0.0.1:7082": {"weight": 0}, 448 }, 449 'upstreams/one/servers', 450 ), 'configure servers empty two' 451 assert self.get()['status'] == 502, 'servers empty two' 452 453 def test_upstreams_rr_invalid(self): 454 assert 'error' in self.conf({}, 'upstreams'), 'upstreams empty' 455 assert 'error' in self.conf( 456 {}, 'upstreams/one' 457 ), 'named upstreams empty' 458 assert 'error' in self.conf( 459 {}, 'upstreams/one/servers/127.0.0.1' 460 ), 'invalid address' 461 assert 'error' in self.conf( 462 {}, 'upstreams/one/servers/127.0.0.1:7081/blah' 463 ), 'invalid server option' 464 465 def check_weight(w): 466 assert 'error' in self.conf( 467 w, 'upstreams/one/servers/127.0.0.1:7081/weight' 468 ), 'invalid weight option' 469 470 check_weight({}) 471 check_weight('-1') 472 check_weight('1.') 473 check_weight('1.1.') 474 check_weight('.') 475 check_weight('.01234567890123') 476 check_weight('1000001') 477 check_weight('2e6') 478