import errno
import fcntl
import inspect
import json
import os
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import time
from multiprocessing import Process
from pathlib import Path

import pytest

from unit.check.check_prerequisites import check_prerequisites
from unit.check.discover_available import discover_available
from unit.http import HTTP1
from unit.log import Log
from unit.log import print_log_on_assert
from unit.option import option
from unit.status import Status
from unit.utils import check_findmnt
from unit.utils import public_dir
from unit.utils import waitforfiles
from unit.utils import waitforunmount


def pytest_addoption(parser):
    """Register the command-line options understood by this test suite."""
    parser.addoption(
        "--detailed",
        default=False,
        action="store_true",
        help="Detailed output for tests",
    )
    parser.addoption(
        "--print-log",
        default=False,
        action="store_true",
        help="Print unit.log to stdout in case of errors",
    )
    parser.addoption(
        "--save-log",
        default=False,
        action="store_true",
        help="Save unit.log after the test execution",
    )
    parser.addoption(
        "--unsafe",
        default=False,
        action="store_true",
        help="Run unsafe tests",
    )
    parser.addoption(
        "--user",
        type=str,
        help="Default user for non-privileged processes of unitd",
    )
    parser.addoption(
        "--fds-threshold",
        type=int,
        default=0,
        help="File descriptors threshold",
    )
    parser.addoption(
        "--restart",
        default=False,
        action="store_true",
        help="Force Unit to restart after every test",
    )


# State of the currently running unitd: 'process', 'temp_dir',
# 'control_sock', 'unitd' and 'pid' (pid is kept as a string).
unit_instance = {}

# Helper processes started through run_process().
_processes = []

# Per-process descriptor baselines used by _check_fds().
_fds_info = {
    'main': {'fds': 0, 'skip': False},
    'router': {'name': 'unit: router', 'pid': -1, 'fds': 0, 'skip': False},
    'controller': {
        'name': 'unit: controller',
        'pid': -1,
        'fds': 0,
        'skip': False,
    },
}
http = HTTP1()
is_findmnt = check_findmnt()


def pytest_configure(config):
    """Copy command-line options into the shared ``option`` object.

    Also computes the source/test directories relative to this file and
    creates the per-session cache directory.
    """
    option.config = config.option

    option.detailed = config.option.detailed
    option.fds_threshold = config.option.fds_threshold
    option.print_log = config.option.print_log
    option.save_log = config.option.save_log
    option.unsafe = config.option.unsafe
    option.user = config.option.user
    option.restart = config.option.restart

    option.generated_tests = {}
    option.current_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir)
    )
    option.test_dir = f'{option.current_dir}/test'

    option.cache_dir = tempfile.mkdtemp(prefix='unit-test-cache-')
    public_dir(option.cache_dir)

    # Put stdout into blocking mode: F_SETFL with 0 clears O_NONBLOCK
    # (together with every other status flag).  Presumably this keeps
    # large log dumps from failing with EAGAIN -- confirm.

    if option.detailed or option.print_log:
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)


def pytest_generate_tests(metafunc):
    """Parametrize tests over the available versions of language modules.

    Modules without a ``client`` attribute, or whose client has no
    application type (or the 'external' type), are left untouched.

    Raises:
        ValueError: a prerequisite version is neither 'all', 'any' nor a
            callable.
    """
    module = metafunc.module
    if (
        not hasattr(module, 'client')
        or not hasattr(module.client, 'application_type')
        or module.client.application_type is None
        or module.client.application_type == 'external'
    ):
        return

    app_type = module.client.application_type

    def generate_tests(versions):
        if not versions:
            pytest.skip('no available module versions')

        metafunc.fixturenames.append('tmp_ct')
        metafunc.parametrize('tmp_ct', versions)

        for version in versions:
            option.generated_tests[
                f'{metafunc.function.__name__} [{version}]'
            ] = f'{app_type} {version}'

    # take available module from option and generate tests for each version

    available_modules = option.available['modules']

    # The loop variable is deliberately not called "module" so that it does
    # not shadow the test module bound above.
    for mod_name, version in module.prerequisites['modules'].items():
        if mod_name in available_modules and available_modules[mod_name]:
            available_versions = available_modules[mod_name]

            if version == 'all':
                generate_tests(available_versions)

            elif version == 'any':
                option.generated_tests[
                    metafunc.function.__name__
                ] = f'{app_type} {available_versions[0]}'
            elif callable(version):
                generate_tests(list(filter(version, available_versions)))

            else:
                raise ValueError(
                    f'\nUnexpected prerequisite version "{version}" '
                    f'for module "{mod_name}".\n'
                    "'all', 'any' or callable expected."
                )


def pytest_sessionstart():
    """Start unitd once to discover what is available, then clean up."""
    unit = unit_run()

    discover_available(unit)

    _clear_conf()

    unit_stop()

    Log.check_alerts()

    if option.restart:
        shutil.rmtree(unit['temp_dir'])
    else:
        _clear_temp_dir()


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item):
    """Attach each phase report to the test item as ``rep_<phase>``."""
    # execute all other hooks to obtain the report object
    outcome = yield
    rep = outcome.get_result()

    # set a report attribute for each phase of a call, which can
    # be "setup", "call", "teardown"

    setattr(item, f'rep_{rep.when}', rep)


@pytest.fixture(scope='module', autouse=True)
def check_prerequisites_module(request):
    """Run the prerequisite check for modules declaring ``prerequisites``."""
    if hasattr(request.module, 'prerequisites'):
        check_prerequisites(request.module.prerequisites)


@pytest.fixture(autouse=True)
def run(request):
    """Wrap every test: ensure unitd is running, then verify and clean up.

    After the test body the fixture stops unitd (restart mode only) and the
    helper processes, resets the configuration and temporary directory,
    checks descriptor counts and the process list, and finally scans the
    log for alerts.
    """
    unit = unit_run()

    option.skip_alerts = [
        r'read signalfd\(4\) failed',
        r'sendmsg.+failed',
        r'recvmsg.+failed',
    ]
    option.skip_sanitizer = False

    _fds_info['main']['skip'] = False
    _fds_info['router']['skip'] = False
    _fds_info['controller']['skip'] = False

    yield

    # stop unit

    error_stop_unit = unit_stop()
    error_stop_processes = stop_processes()

    # prepare log

    with Log.open() as f:
        log = f.read()
        Log.set_pos(f.tell())

    if not option.save_log and option.restart:
        shutil.rmtree(unit['temp_dir'])
        Log.set_pos(0)

    # clean temp_dir before the next test

    if not option.restart:
        _clear_conf(log=log)
        _clear_temp_dir()

    # check descriptors

    _check_fds(log=log)

    # check processes id's and amount

    _check_processes()

    # print unit.log in case of error

    if hasattr(request.node, 'rep_call') and request.node.rep_call.failed:
        Log.print_log(log)

    if error_stop_unit or error_stop_processes:
        Log.print_log(log)

    # check unit.log for errors

    assert error_stop_unit is None, 'stop unit'
    assert error_stop_processes is None, 'stop processes'

    Log.check_alerts(log=log)


def unit_run(state_dir=None):
    """Start unitd (or reuse the running one) and return ``unit_instance``.

    Outside restart mode an already started instance is returned as is.

    Args:
        state_dir: state directory to use instead of a fresh one inside the
            new temporary directory.  When given, the configuration is not
            reset after start-up.

    Returns:
        The module-level ``unit_instance`` dict.

    Exits the interpreter (``sys.exit``) when the unitd binary is missing
    or the control socket does not appear.
    """
    if not option.restart and 'unitd' in unit_instance:
        return unit_instance

    builddir = f'{option.current_dir}/build'
    libdir = f'{builddir}/lib'
    modulesdir = f'{libdir}/unit/modules'
    sbindir = f'{builddir}/sbin'
    unitd = f'{sbindir}/unitd'

    if not Path(unitd).is_file():
        sys.exit('Could not find unit')

    temporary_dir = tempfile.mkdtemp(prefix='unit-test-')
    option.temp_dir = temporary_dir
    public_dir(temporary_dir)

    if stat.S_IMODE(Path(builddir).stat().st_mode) != 0o777:
        public_dir(builddir)

    statedir = f'{temporary_dir}/state' if state_dir is None else state_dir
    Path(statedir).mkdir(exist_ok=True)

    control_sock = f'{temporary_dir}/control.unit.sock'
    log_path = f'{temporary_dir}/unit.log'
    pid_path = f'{temporary_dir}/unit.pid'

    unitd_args = [
        unitd,
        '--no-daemon',
        '--modulesdir',
        modulesdir,
        '--statedir',
        statedir,
        '--pid',
        pid_path,
        '--log',
        log_path,
        '--control',
        f'unix:{control_sock}',
        '--tmpdir',
        temporary_dir,
    ]

    if option.user:
        unitd_args.extend(['--user', option.user])

    # The child inherits the descriptor, so the file object can be closed
    # on our side as soon as the process has been spawned.
    with open(log_path, 'w', encoding='utf-8') as log:
        unit_instance['process'] = subprocess.Popen(unitd_args, stderr=log)

    if not waitforfiles(control_sock):
        Log.print_log()
        sys.exit('Could not start unit')

    unit_instance['temp_dir'] = temporary_dir
    unit_instance['control_sock'] = control_sock
    unit_instance['unitd'] = unitd

    # Kept as a string: it is matched against `ps` output elsewhere.
    unit_instance['pid'] = Path(pid_path).read_text(encoding='utf-8').rstrip()

    if state_dir is None:
        _clear_conf()

    # Record descriptor baselines for the later leak check.
    _fds_info['main']['fds'] = _count_fds(unit_instance['pid'])

    router = _fds_info['router']
    router['pid'] = pid_by_name(router['name'])
    router['fds'] = _count_fds(router['pid'])

    controller = _fds_info['controller']
    controller['pid'] = pid_by_name(controller['name'])
    controller['fds'] = _count_fds(controller['pid'])

    Status._check_zeros()

    return unit_instance


def unit_stop():
    """Stop unitd when running in restart mode.

    Outside restart mode nothing is stopped; if the direct caller is a
    ``test_*`` function the test is skipped instead.

    Returns:
        ``None`` on success (or when there was nothing to do), otherwise a
        string describing why the shutdown is considered failed.
    """
    if not option.restart:
        if inspect.stack()[1].function.startswith('test_'):
            pytest.skip('no restart mode')

        return None

    # check zombies

    out = subprocess.check_output(
        ['ps', 'ax', '-o', 'state', '-o', 'ppid']
    ).decode()
    z_ppids = re.findall(r'Z\s*(\d+)', out)
    assert unit_instance['pid'] not in z_ppids, 'no zombies'

    # terminate unit

    p = unit_instance['process']

    if p.poll() is not None:
        return None

    p.send_signal(signal.SIGQUIT)

    try:
        retcode = p.wait(15)

    except KeyboardInterrupt:
        p.kill()
        raise

    # Typically subprocess.TimeoutExpired; kept broad on purpose so that a
    # stuck daemon is always killed, but no longer swallows SystemExit.
    except Exception:
        p.kill()
        return 'Could not terminate unit'

    if retcode:
        return f'Child process terminated with code {retcode}'

    return None


@print_log_on_assert
def _clear_conf(*, log=None):
    """Reset the configuration through the control socket.

    Empties listeners and applications and, when the corresponding modules
    are available, deletes every stored certificate and njs module.

    Args:
        log: not used in the body; accepted for the
            ``print_log_on_assert`` decorator (assumption -- confirm).
    """
    sock = unit_instance['control_sock']

    resp = http.put(
        url='/config',
        sock_type='unix',
        addr=sock,
        body=json.dumps({"listeners": {}, "applications": {}}),
    )['body']

    assert 'success' in resp, 'clear conf'

    def get(url):
        return http.get(url=url, sock_type='unix', addr=sock)['body']

    def delete(url):
        return http.delete(url=url, sock_type='unix', addr=sock)['body']

    if (
        'openssl' in option.available['modules']
        and option.available['modules']['openssl']
    ):
        try:
            certs = json.loads(get('/certificates'))

        except json.JSONDecodeError:
            pytest.fail("Can't parse certificates list.")

        for cert in certs:
            assert 'success' in delete(f'/certificates/{cert}'), 'delete cert'

    if (
        'njs' in option.available['modules']
        and option.available['modules']['njs']
    ):
        try:
            scripts = json.loads(get('/js_modules'))

        except json.JSONDecodeError:
            pytest.fail("Can't parse njs modules list.")

        for script in scripts:
            assert 'success' in delete(f'/js_modules/{script}'), 'delete script'


def _clear_temp_dir():
    """Remove everything from the temporary directory except unitd's files.

    The control socket, state directory, pid file and log are preserved.
    Directory removal is retried up to ten times, one second apart, while
    it fails with EBUSY or ENOTEMPTY.
    """
    temporary_dir = unit_instance['temp_dir']

    if is_findmnt and not waitforunmount(temporary_dir, timeout=600):
        sys.exit(f'Could not unmount filesystems in tmpdir ({temporary_dir}).')

    keep = ('control.unit.sock', 'state', 'unit.pid', 'unit.log')

    # "Device or resource busy" / "Directory not empty".  Symbolic names are
    # used because ENOTEMPTY is 39 on Linux only.
    retry_errnos = (errno.EBUSY, errno.ENOTEMPTY)

    for item in Path(temporary_dir).iterdir():
        if item.name in keep:
            continue

        public_dir(item)

        if item.is_file() or stat.S_ISSOCK(item.stat().st_mode):
            item.unlink()
            continue

        for _ in range(10):
            try:
                shutil.rmtree(item)
                break
            except OSError as err:
                if err.errno not in retry_errnos:
                    raise
                time.sleep(1)


def _check_processes():
    """Assert that the expected set of unitd processes is running.

    Polls ``ps`` (up to 600 times, 0.1 s apart) until at most three lines
    mention the main pid.  In restart mode none may remain; otherwise
    exactly main, router and controller are expected, with the router and
    controller pids recorded in ``_fds_info``.
    """
    router_pid = _fds_info['router']['pid']
    controller_pid = _fds_info['controller']['pid']
    main_pid = unit_instance['pid']

    for _ in range(600):
        out = (
            subprocess.check_output(
                ['ps', '-ax', '-o', 'pid', '-o', 'ppid', '-o', 'command']
            )
            .decode()
            .splitlines()
        )
        out = [line for line in out if main_pid in line]

        if len(out) <= 3:
            break

        time.sleep(0.1)

    if option.restart:
        assert len(out) == 0, 'all terminated'
        return

    assert len(out) == 3, 'main, router, and controller expected'

    out = [line for line in out if 'unit: main' not in line]
    assert len(out) == 2, 'one main'

    out = [
        line
        for line in out
        if re.search(fr'{router_pid}\s+{main_pid}.*unit: router', line) is None
    ]
    assert len(out) == 1, 'one router'

    out = [
        line
        for line in out
        if re.search(
            fr'{controller_pid}\s+{main_pid}.*unit: controller', line
        )
        is None
    ]
    assert len(out) == 0, 'one controller'


@print_log_on_assert
def _check_fds(*, log=None):
    """Assert that no process grew its descriptor count past the threshold.

    For main, controller and router the current count is compared with the
    stored baseline, waiting up to 600 polls for it to settle.  The baseline
    is then moved to the observed value.  Processes flagged with ``skip``
    only get their baseline refreshed.

    Args:
        log: not used in the body; accepted for the
            ``print_log_on_assert`` decorator (assumption -- confirm).
    """

    def waitforfds(diff):
        for _ in range(600):
            fds_diff = diff()

            if fds_diff <= option.fds_threshold:
                break

            time.sleep(0.1)

        return fds_diff

    ps = _fds_info['main']
    if not ps['skip']:
        fds_diff = waitforfds(
            lambda: _count_fds(unit_instance['pid']) - ps['fds']
        )
        ps['fds'] += fds_diff

        assert fds_diff <= option.fds_threshold, 'descriptors leak main process'

    else:
        ps['fds'] = _count_fds(unit_instance['pid'])

    for name in ['controller', 'router']:
        ps = _fds_info[name]
        ps_pid = ps['pid']
        ps['pid'] = pid_by_name(ps['name'])

        if not ps['skip']:
            # The lambda is invoked only inside this iteration, so capturing
            # the loop variable `ps` is safe.
            fds_diff = waitforfds(lambda: _count_fds(ps['pid']) - ps['fds'])
            ps['fds'] += fds_diff

            if not option.restart:
                assert ps['pid'] == ps_pid, f'same pid {name}'

            assert fds_diff <= option.fds_threshold, f'descriptors leak {name}'

        else:
            ps['fds'] = _count_fds(ps['pid'])


def _count_fds(pid):
    """Return the number of open descriptors of *pid*, or 0 if unknown.

    Uses ``/proc/<pid>/fd`` when it exists, otherwise the line count of
    ``procstat -f`` and then ``lsof -n -p`` output.  A missing tool, a
    failing command, or a ``pid`` of ``None`` (TypeError from subprocess)
    falls through to the next method.
    """
    procfile = Path(f'/proc/{pid}/fd')
    if procfile.is_dir():
        return len(list(procfile.iterdir()))

    for cmd in (['procstat', '-f', pid], ['lsof', '-n', '-p', pid]):
        try:
            out = subprocess.check_output(
                cmd,
                stderr=subprocess.STDOUT,
            ).decode()

        except (FileNotFoundError, TypeError, subprocess.CalledProcessError):
            continue

        return len(out.splitlines())

    return 0


def run_process(target, *args):
    """Start *target* in a separate process and track it for teardown."""
    process = Process(target=target, args=args)
    process.start()

    _processes.append(process)


def stop_processes():
    """Terminate helper processes started through ``run_process``.

    Returns:
        ``None`` when every process has stopped (or none was registered),
        otherwise an error string.
    """
    if not _processes:
        return None

    fail = False
    for process in _processes:
        if process.is_alive():
            process.terminate()
            process.join(timeout=15)

            if process.is_alive():
                fail = True

    # Forget processes that have exited so the list does not grow for the
    # whole session; still-alive ones are kept and retried next time.
    # Updated in place so the list object itself stays the same.
    _processes[:] = [p for p in _processes if p.is_alive()]

    if fail:
        return 'Fail to stop process(es)'

    return None


def pid_by_name(name):
    """Return the pid (as a string) of unitd's child matching *name*.

    *name* is inserted into a regular expression unescaped.  Returns
    ``None`` when no line of ``ps`` output matches.
    """
    output = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
    m = re.search(fr'\s*(\d+)\s*{unit_instance["pid"]}.*{name}', output)
    return None if m is None else m.group(1)


def find_proc(name, ps_output):
    """Return the fragments of *ps_output* matching main pid then *name*."""
    return re.findall(f'{unit_instance["pid"]}.*{name}', ps_output)


def pytest_sessionfinish():
    """Stop unitd for good and remove the session directories."""
    if not option.restart and option.save_log:
        Log.print_path()

    # Force unit_stop() to actually terminate the daemon.
    option.restart = True

    unit_stop()

    public_dir(option.cache_dir)
    shutil.rmtree(option.cache_dir)

    if not option.save_log and Path(option.temp_dir).is_dir():
        public_dir(option.temp_dir)
        shutil.rmtree(option.temp_dir)


@pytest.fixture
def date_to_sec_epoch():
    """Provide a helper that parses a date string into ``time.mktime``."""

    def _date_to_sec_epoch(date, template='%a, %d %b %Y %X %Z'):
        return time.mktime(time.strptime(date, template))

    return _date_to_sec_epoch


@pytest.fixture
def findall():
    """Provide a pass-through to ``Log.findall``."""

    def _findall(*args, **kwargs):
        return Log.findall(*args, **kwargs)

    return _findall


@pytest.fixture
def is_su():
    """Return ``option.is_privileged``."""
    return option.is_privileged


@pytest.fixture
def is_unsafe(request):
    """Return the value of the ``--unsafe`` command-line option."""
    return request.config.getoption("--unsafe")


@pytest.fixture
def require():
    """Provide the ``check_prerequisites`` function."""
    return check_prerequisites


@pytest.fixture
def search_in_file():
    """Provide a helper that regex-searches a file read via ``Log.read``."""

    def _search_in_file(pattern, name='unit.log', flags=re.M):
        return re.search(pattern, Log.read(name), flags)

    return _search_in_file
@pytest.fixture
def sec_epoch():
    """Return ``time.mktime`` applied to the current ``time.gmtime()``."""
    now = time.gmtime()
    return time.mktime(now)


@pytest.fixture()
def skip_alert():
    """Provide a helper that appends patterns to ``option.skip_alerts``."""

    def _skip(*alerts):
        option.skip_alerts.extend(alerts)

    return _skip


@pytest.fixture()
def skip_fds_check():
    """Provide a helper that sets the per-process ``skip`` flags.

    Each call overwrites all three flags in ``_fds_info``; omitted
    arguments reset the corresponding flag to ``False``.
    """

    def _skip(main=False, router=False, controller=False):
        flags = (
            ('main', main),
            ('router', router),
            ('controller', controller),
        )
        for process_name, flag in flags:
            _fds_info[process_name]['skip'] = flag

    return _skip


@pytest.fixture()
def system():
    """Return ``option.system``."""
    return option.system


@pytest.fixture
def temp_dir():
    """Return the temporary directory of the current unitd instance."""
    return unit_instance['temp_dir']


@pytest.fixture
def unit_pid():
    """Return the pid of the spawned unitd ``Popen`` object."""
    process = unit_instance['process']
    return process.pid


@pytest.fixture
def wait_for_record():
    """Provide a pass-through to ``Log.wait_for_record``."""

    def _wait_for_record(*args, **kwargs):
        return Log.wait_for_record(*args, **kwargs)

    return _wait_for_record