1import os 2import re 3 4import pytest 5 6from unit.applications.lang.python import ApplicationPython 7from unit.option import option 8 9prerequisites = {'modules': {'python': 'any'}} 10 11client = ApplicationPython() 12 13 14@pytest.fixture(autouse=True) 15def setup_method_fixture(): 16 assert 'success' in client.conf( 17 { 18 "listeners": { 19 "*:8080": {"pass": "upstreams/one"}, 20 "*:8090": {"pass": "upstreams/two"}, 21 "*:8081": {"pass": "routes/one"}, 22 "*:8082": {"pass": "routes/two"}, 23 "*:8083": {"pass": "routes/three"}, 24 }, 25 "upstreams": { 26 "one": { 27 "servers": { 28 "127.0.0.1:8081": {}, 29 "127.0.0.1:8082": {}, 30 }, 31 }, 32 "two": { 33 "servers": { 34 "127.0.0.1:8081": {}, 35 "127.0.0.1:8082": {}, 36 }, 37 }, 38 }, 39 "routes": { 40 "one": [{"action": {"return": 200}}], 41 "two": [{"action": {"return": 201}}], 42 "three": [{"action": {"return": 202}}], 43 }, 44 "applications": {}, 45 }, 46 ), 'upstreams initial configuration' 47 48 client.cpu_count = os.cpu_count() 49 50 51def get_resps(req=100, port=8080): 52 resps = [0] 53 54 for _ in range(req): 55 status = client.get(port=port)['status'] 56 if 200 > status or status > 209: 57 continue 58 59 ups = status % 10 60 if ups > len(resps) - 1: 61 resps.extend([0] * (ups - len(resps) + 1)) 62 63 resps[ups] += 1 64 65 return resps 66 67 68def get_resps_sc(req=100, port=8080): 69 to_send = b"""GET / HTTP/1.1 70Host: localhost 71 72""" * ( 73 req - 1 74 ) 75 76 to_send += b"""GET / HTTP/1.1 77Host: localhost 78Connection: close 79 80""" 81 82 resp = client.http(to_send, raw_resp=True, raw=True, port=port) 83 status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp) 84 status = list(filter(lambda x: x[:2] == '20', status)) 85 ups = list(map(lambda x: int(x[-1]), status)) 86 87 resps = [0] * (max(ups) + 1) 88 for _, up in enumerate(ups): 89 resps[up] += 1 90 91 return resps 92 93 94def test_upstreams_rr_no_weight(): 95 resps = get_resps() 96 assert sum(resps) == 100, 'no weight sum' 97 assert abs(resps[0] - resps[1]) <= client.cpu_count, 'no weight' 98 99 assert 'success' in client.conf_delete( 100 'upstreams/one/servers/127.0.0.1:8081' 101 ), 'no weight server remove' 102 103 resps = get_resps(req=50) 104 assert resps[1] == 50, 'no weight 2' 105 106 assert 'success' in client.conf( 107 {}, 'upstreams/one/servers/127.0.0.1:8081' 108 ), 'no weight server revert' 109 110 resps = get_resps() 111 assert sum(resps) == 100, 'no weight 3 sum' 112 assert abs(resps[0] - resps[1]) <= client.cpu_count, 'no weight 3' 113 114 assert 'success' in client.conf( 115 {}, 'upstreams/one/servers/127.0.0.1:8083' 116 ), 'no weight server new' 117 118 resps = get_resps() 119 assert sum(resps) == 100, 'no weight 4 sum' 120 assert max(resps) - min(resps) <= client.cpu_count, 'no weight 4' 121 122 resps = get_resps_sc(req=30) 123 assert resps[0] == 10, 'no weight 4 0' 124 assert resps[1] == 10, 'no weight 4 1' 125 assert resps[2] == 10, 'no weight 4 2' 126 127 128def test_upstreams_rr_weight(): 129 assert 'success' in client.conf( 130 {"weight": 3}, 'upstreams/one/servers/127.0.0.1:8081' 131 ), 'configure weight' 132 133 resps = get_resps_sc() 134 assert resps[0] == 75, 'weight 3 0' 135 assert resps[1] == 25, 'weight 3 1' 136 137 assert 'success' in client.conf_delete( 138 'upstreams/one/servers/127.0.0.1:8081/weight' 139 ), 'configure weight remove' 140 resps = get_resps_sc(req=10) 141 assert resps[0] == 5, 'weight 0 0' 142 assert resps[1] == 5, 'weight 0 1' 143 144 assert 'success' in client.conf( 145 '1', 'upstreams/one/servers/127.0.0.1:8081/weight' 146 ), 'configure weight 1' 147 148 resps = get_resps_sc() 149 assert resps[0] == 50, 'weight 1 0' 150 assert resps[1] == 50, 'weight 1 1' 151 152 assert 'success' in client.conf( 153 { 154 "127.0.0.1:8081": {"weight": 3}, 155 "127.0.0.1:8083": {"weight": 2}, 156 }, 157 'upstreams/one/servers', 158 ), 'configure weight 2' 159 160 resps = get_resps_sc() 161 assert resps[0] == 60, 'weight 2 0' 162 assert resps[2] == 40, 'weight 2 1' 163 164 165def test_upstreams_rr_weight_rational(): 166 def set_weights(w1, w2): 167 assert 'success' in client.conf( 168 { 169 "127.0.0.1:8081": {"weight": w1}, 170 "127.0.0.1:8082": {"weight": w2}, 171 }, 172 'upstreams/one/servers', 173 ), 'configure weights' 174 175 def check_reqs(w1, w2, reqs=10): 176 resps = get_resps_sc(req=reqs) 177 assert resps[0] == reqs * w1 / (w1 + w2), 'weight 1' 178 assert resps[1] == reqs * w2 / (w1 + w2), 'weight 2' 179 180 def check_weights(w1, w2): 181 set_weights(w1, w2) 182 check_reqs(w1, w2) 183 184 check_weights(0, 1) 185 check_weights(0, 999999.0123456) 186 check_weights(1, 9) 187 check_weights(100000, 900000) 188 check_weights(1, 0.25) 189 check_weights(1, 0.25) 190 check_weights(0.2, 0.8) 191 check_weights(1, 1.5) 192 check_weights(1e-3, 1e-3) 193 check_weights(1e-20, 1e-20) 194 check_weights(1e4, 1e4) 195 check_weights(1000000, 1000000) 196 197 set_weights(0.25, 0.25) 198 assert 'success' in client.conf_delete( 199 'upstreams/one/servers/127.0.0.1:8081/weight' 200 ), 'delete weight' 201 check_reqs(1, 0.25) 202 203 assert 'success' in client.conf( 204 { 205 "127.0.0.1:8081": {"weight": 0.1}, 206 "127.0.0.1:8082": {"weight": 1}, 207 "127.0.0.1:8083": {"weight": 0.9}, 208 }, 209 'upstreams/one/servers', 210 ), 'configure weights' 211 resps = get_resps_sc(req=20) 212 assert resps[0] == 1, 'weight 3 1' 213 assert resps[1] == 10, 'weight 3 2' 214 assert resps[2] == 9, 'weight 3 3' 215 216 217def test_upstreams_rr_independent(): 218 def sum_resps(*args): 219 sum_r = [0] * len(args[0]) 220 for arg in args: 221 sum_r = [x + y for x, y in zip(sum_r, arg)] 222 223 return sum_r 224 225 resps = get_resps_sc(req=30, port=8090) 226 assert resps[0] == 15, 'dep two before 0' 227 assert resps[1] == 15, 'dep two before 1' 228 229 resps = get_resps_sc(req=30) 230 assert resps[0] == 15, 'dep one before 0' 231 assert resps[1] == 15, 'dep one before 1' 232 233 assert 'success' in client.conf( 234 '2', 'upstreams/two/servers/127.0.0.1:8081/weight' 235 ), 'configure dep weight' 236 237 resps = get_resps_sc(req=30, port=8090) 238 assert resps[0] == 20, 'dep two 0' 239 assert resps[1] == 10, 'dep two 1' 240 241 resps = get_resps_sc(req=30) 242 assert resps[0] == 15, 'dep one 0' 243 assert resps[1] == 15, 'dep one 1' 244 245 assert 'success' in client.conf( 246 '1', 'upstreams/two/servers/127.0.0.1:8081/weight' 247 ), 'configure dep weight 1' 248 249 r_one, r_two = [0, 0], [0, 0] 250 for _ in range(10): 251 r_one = sum_resps(r_one, get_resps(req=10)) 252 r_two = sum_resps(r_two, get_resps(req=10, port=8090)) 253 254 assert sum(r_one) == 100, 'dep one mix sum' 255 assert abs(r_one[0] - r_one[1]) <= client.cpu_count, 'dep one mix' 256 assert sum(r_two) == 100, 'dep two mix sum' 257 assert abs(r_two[0] - r_two[1]) <= client.cpu_count, 'dep two mix' 258 259 260def test_upstreams_rr_delay(): 261 delayed_dir = f'{option.test_dir}/python/delayed' 262 assert 'success' in client.conf( 263 { 264 "listeners": { 265 "*:8080": {"pass": "upstreams/one"}, 266 "*:8081": {"pass": "routes"}, 267 "*:8082": {"pass": "routes"}, 268 }, 269 "upstreams": { 270 "one": { 271 "servers": { 272 "127.0.0.1:8081": {}, 273 "127.0.0.1:8082": {}, 274 }, 275 }, 276 }, 277 "routes": [ 278 { 279 "match": {"destination": "*:8081"}, 280 "action": {"pass": "applications/delayed"}, 281 }, 282 { 283 "match": {"destination": "*:8082"}, 284 "action": {"return": 201}, 285 }, 286 ], 287 "applications": { 288 "delayed": { 289 "type": client.get_application_type(), 290 "processes": {"spare": 0}, 291 "path": delayed_dir, 292 "working_directory": delayed_dir, 293 "module": "wsgi", 294 } 295 }, 296 }, 297 ), 'upstreams initial configuration' 298 299 req = 50 300 301 socks = [] 302 for i in range(req): 303 delay = 1 if i % 5 == 0 else 0 304 sock = client.get( 305 headers={ 306 'Host': 'localhost', 307 'Content-Length': '0', 308 'X-Delay': str(delay), 309 'Connection': 'close', 310 }, 311 no_recv=True, 312 ) 313 socks.append(sock) 314 315 resps = [0, 0] 316 for i in range(req): 317 resp = client.recvall(socks[i]).decode() 318 socks[i].close() 319 320 m = re.search(r'HTTP/1.1 20(\d)', resp) 321 assert m is not None, 'status' 322 resps[int(m.group(1))] += 1 323 324 assert sum(resps) == req, 'delay sum' 325 assert abs(resps[0] - resps[1]) <= client.cpu_count, 'delay' 326 327 328def test_upstreams_rr_active_req(): 329 conns = 5 330 socks = [] 331 socks2 = [] 332 333 for _ in range(conns): 334 sock = client.get(no_recv=True) 335 socks.append(sock) 336 337 sock2 = client.http( 338 b"""POST / HTTP/1.1 339Host: localhost 340Content-Length: 10 341Connection: close 342 343""", 344 no_recv=True, 345 raw=True, 346 ) 347 socks2.append(sock2) 348 349 # Send one more request and read response to make sure that previous 350 # requests had enough time to reach server. 351 352 assert client.get()['body'] == '' 353 354 assert 'success' in client.conf( 355 {"127.0.0.1:8083": {"weight": 2}}, 356 'upstreams/one/servers', 357 ), 'active req new server' 358 assert 'success' in client.conf_delete( 359 'upstreams/one/servers/127.0.0.1:8083' 360 ), 'active req server remove' 361 assert 'success' in client.conf_delete( 362 'listeners/*:8080' 363 ), 'delete listener' 364 assert 'success' in client.conf_delete( 365 'upstreams/one' 366 ), 'active req upstream remove' 367 368 for i in range(conns): 369 assert ( 370 client.http(b'', sock=socks[i], raw=True)['body'] == '' 371 ), 'active req GET' 372 373 assert ( 374 client.http(b"""0123456789""", sock=socks2[i], raw=True)['body'] 375 == '' 376 ), 'active req POST' 377 378 379def test_upstreams_rr_bad_server(): 380 assert 'success' in client.conf( 381 {"weight": 1}, 'upstreams/one/servers/127.0.0.1:8084' 382 ), 'configure bad server' 383 384 resps = get_resps_sc(req=30) 385 assert resps[0] == 10, 'bad server 0' 386 assert resps[1] == 10, 'bad server 1' 387 assert sum(resps) == 20, 'bad server sum' 388 389 390def test_upstreams_rr_pipeline(): 391 resps = get_resps_sc() 392 393 assert resps[0] == 50, 'pipeline 0' 394 assert resps[1] == 50, 'pipeline 1' 395 396 397def test_upstreams_rr_post(): 398 resps = [0, 0] 399 for _ in range(50): 400 resps[client.get()['status'] % 10] += 1 401 resps[client.post(body='0123456789')['status'] % 10] += 1 402 403 assert sum(resps) == 100, 'post sum' 404 assert abs(resps[0] - resps[1]) <= client.cpu_count, 'post' 405 406 407def test_upstreams_rr_unix(temp_dir): 408 addr_0 = f'{temp_dir}/sock_0' 409 addr_1 = f'{temp_dir}/sock_1' 410 411 assert 'success' in client.conf( 412 { 413 "*:8080": {"pass": "upstreams/one"}, 414 f"unix:{addr_0}": {"pass": "routes/one"}, 415 f"unix:{addr_1}": {"pass": "routes/two"}, 416 }, 417 'listeners', 418 ), 'configure listeners unix' 419 420 assert 'success' in client.conf( 421 {f"unix:{addr_0}": {}, f"unix:{addr_1}": {}}, 422 'upstreams/one/servers', 423 ), 'configure servers unix' 424 425 resps = get_resps_sc() 426 427 assert resps[0] == 50, 'unix 0' 428 assert resps[1] == 50, 'unix 1' 429 430 431def test_upstreams_rr_ipv6(): 432 assert 'success' in client.conf( 433 { 434 "*:8080": {"pass": "upstreams/one"}, 435 "[::1]:8081": {"pass": "routes/one"}, 436 "[::1]:8082": {"pass": "routes/two"}, 437 }, 438 'listeners', 439 ), 'configure listeners ipv6' 440 441 assert 'success' in client.conf( 442 {"[::1]:8081": {}, "[::1]:8082": {}}, 'upstreams/one/servers' 443 ), 'configure servers ipv6' 444 445 resps = get_resps_sc() 446 447 assert resps[0] == 50, 'ipv6 0' 448 assert resps[1] == 50, 'ipv6 1' 449 450 451def test_upstreams_rr_servers_empty(): 452 assert 'success' in client.conf( 453 {}, 'upstreams/one/servers' 454 ), 'configure servers empty' 455 assert client.get()['status'] == 502, 'servers empty' 456 457 assert 'success' in client.conf( 458 {"127.0.0.1:8081": {"weight": 0}}, 'upstreams/one/servers' 459 ), 'configure servers empty one' 460 assert client.get()['status'] == 502, 'servers empty one' 461 assert 'success' in client.conf( 462 { 463 "127.0.0.1:8081": {"weight": 0}, 464 "127.0.0.1:8082": {"weight": 0}, 465 }, 466 'upstreams/one/servers', 467 ), 'configure servers empty two' 468 assert client.get()['status'] == 502, 'servers empty two' 469 470 471def test_upstreams_rr_invalid(): 472 assert 'error' in client.conf({}, 'upstreams'), 'upstreams empty' 473 assert 'error' in client.conf({}, 'upstreams/one'), 'named upstreams empty' 474 assert 'error' in client.conf( 475 {}, 'upstreams/one/servers/127.0.0.1' 476 ), 'invalid address' 477 assert 'error' in client.conf( 478 {}, 'upstreams/one/servers/127.0.0.1:8081/blah' 479 ), 'invalid server option' 480 481 def check_weight(w): 482 assert 'error' in client.conf( 483 w, 'upstreams/one/servers/127.0.0.1:8081/weight' 484 ), 'invalid weight option' 485 486 check_weight({}) 487 check_weight('-1') 488 check_weight('1.') 489 check_weight('1.1.') 490 check_weight('.') 491 check_weight('.01234567890123') 492 check_weight('1000001') 493 check_weight('2e6') 494