import errno
import fcntl
import functools
import inspect
import json
import os
import platform
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import time
from multiprocessing import Process

import pytest
from unit.check.chroot import check_chroot
from unit.check.go import check_go
from unit.check.isolation import check_isolation
from unit.check.node import check_node
from unit.check.regex import check_regex
from unit.check.tls import check_openssl
from unit.http import TestHTTP
from unit.log import Log
from unit.option import option
from unit.utils import public_dir
from unit.utils import waitforfiles


def pytest_addoption(parser):
    """Register the command-line options specific to this test suite."""
    parser.addoption(
        "--detailed",
        default=False,
        action="store_true",
        help="Detailed output for tests",
    )
    parser.addoption(
        "--print-log",
        default=False,
        action="store_true",
        help="Print unit.log to stdout in case of errors",
    )
    parser.addoption(
        "--save-log",
        default=False,
        action="store_true",
        help="Save unit.log after the test execution",
    )
    parser.addoption(
        "--unsafe",
        default=False,
        action="store_true",
        help="Run unsafe tests",
    )
    parser.addoption(
        "--user",
        type=str,
        help="Default user for non-privileged processes of unitd",
    )
    parser.addoption(
        "--fds-threshold",
        type=int,
        default=0,
        help="File descriptors threshold",
    )
    parser.addoption(
        "--restart",
        default=False,
        action="store_true",
        help="Force Unit to restart after every test",
    )


# state of the currently running unitd ('process', 'pid', 'temp_dir', ...)
unit_instance = {}

# helper processes started through run_process()
_processes = []

# per-process descriptor bookkeeping used by _check_fds()
_fds_info = {
    'main': {'fds': 0, 'skip': False},
    'router': {'name': 'unit: router', 'pid': -1, 'fds': 0, 'skip': False},
    'controller': {
        'name': 'unit: controller',
        'pid': -1,
        'fds': 0,
        'skip': False,
    },
}
http = TestHTTP()


def pytest_configure(config):
    """Copy the parsed options into the shared ``option`` object.

    Also records the repository paths and platform information, and
    creates the session-wide cache directory.
    """
    option.config = config.option

    option.detailed = config.option.detailed
    option.fds_threshold = config.option.fds_threshold
    option.print_log = config.option.print_log
    option.save_log = config.option.save_log
    option.unsafe = config.option.unsafe
    option.user = config.option.user
    option.restart = config.option.restart

    option.generated_tests = {}
    option.current_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir)
    )
    option.test_dir = option.current_dir + '/test'
    option.architecture = platform.architecture()[0]
    option.system = platform.system()

    option.cache_dir = tempfile.mkdtemp(prefix='unit-test-cache-')
    public_dir(option.cache_dir)

    # reset stdout file status flags; writing 0 clears O_NONBLOCK, i.e.
    # stdout is switched to blocking mode

    if option.detailed or option.print_log:
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)


def print_log_on_assert(func):
    """Decorator: dump unit.log when ``func`` raises AssertionError.

    The log text is taken from the ``log`` keyword argument of the call
    when present; otherwise _print_log() reads the log file itself.
    The exception is always re-raised.
    """

    @functools.wraps(func)
    def inner_function(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except AssertionError:
            _print_log(kwargs.get('log', None))
            raise

    return inner_function


def pytest_generate_tests(metafunc):
    """Parametrize tests over the available versions of required modules.

    Only classes with a non-external ``application_type`` are handled.
    Every module prerequisite must be 'all', 'any' or a callable used to
    filter the available versions; anything else raises ValueError.
    """
    cls = metafunc.cls
    if (
        not hasattr(cls, 'application_type')
        or cls.application_type is None
        or cls.application_type == 'external'
    ):
        return

    app_type = cls.application_type

    def generate_tests(versions):
        metafunc.fixturenames.append('tmp_ct')
        metafunc.parametrize('tmp_ct', versions)

        for version in versions:
            option.generated_tests[
                metafunc.function.__name__ + '[{}]'.format(version)
            ] = (app_type + ' ' + version)

    # take available module from option and generate tests for each version

    for module, prereq_version in cls.prerequisites['modules'].items():
        if module in option.available['modules']:
            available_versions = option.available['modules'][module]

            if prereq_version == 'all':
                generate_tests(available_versions)

            elif prereq_version == 'any':
                option.generated_tests[metafunc.function.__name__] = (
                    app_type + ' ' + available_versions[0]
                )
            elif callable(prereq_version):
                generate_tests(
                    list(filter(prereq_version, available_versions))
                )

            else:
                raise ValueError(
                    """
Unexpected prerequisite version "%s" for module "%s" in %s.
'all', 'any' or callable expected."""
                    % (str(prereq_version), module, str(cls))
                )


def pytest_sessionstart(session):
    """Start unitd once to discover the available modules.

    Modules are collected from "module: ..." lines in unit.log and from
    the check_* helpers; helpers returning None are dropped.
    """
    option.available = {'modules': {}, 'features': {}}

    unit = unit_run()

    # read unit.log, polling for up to ~5 seconds

    for _ in range(50):
        with open(Log.get_path(), 'r') as f:
            log = f.read()

        m = re.search('controller started', log)

        if m is not None:
            break

        time.sleep(0.1)

    if m is None:
        _print_log(log)
        sys.exit("Unit is writing log too long")

    # discover available modules from unit.log

    for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
        versions = option.available['modules'].setdefault(module[0], [])
        if module[1] not in versions:
            versions.append(module[1])

    # discover modules from check

    option.available['modules']['openssl'] = check_openssl(unit['unitd'])
    option.available['modules']['go'] = check_go()
    option.available['modules']['node'] = check_node(option.current_dir)
    option.available['modules']['regex'] = check_regex(unit['unitd'])

    # remove None values

    option.available['modules'] = {
        k: v for k, v in option.available['modules'].items() if v is not None
    }

    check_chroot()
    check_isolation()

    _clear_conf(unit['temp_dir'] + '/control.unit.sock')

    unit_stop()

    _check_alerts()

    if option.restart:
        shutil.rmtree(unit_instance['temp_dir'])


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Attach each phase report to the test item as ``rep_<phase>``."""
    # execute all other hooks to obtain the report object
    outcome = yield
    rep = outcome.get_result()

    # set a report attribute for each phase of a call, which can
    # be "setup", "call", "teardown"

    setattr(item, "rep_" + rep.when, rep)


@pytest.fixture(scope='class', autouse=True)
def check_prerequisites(request):
    """Skip the test class when required modules or features are missing."""
    cls = request.cls
    missed = []

    # check modules

    if 'modules' in cls.prerequisites:
        available_modules = option.available['modules']

        for module in cls.prerequisites['modules']:
            if module in available_modules:
                continue

            missed.append(module)

    if missed:
        pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)')

    # check features

    if 'features' in cls.prerequisites:
        available_features = option.available['features']

        for feature in cls.prerequisites['features']:
            if feature in available_features:
                continue

            missed.append(feature)

    if missed:
        pytest.skip(', '.join(missed) + ' feature(s) not supported')


@pytest.fixture(autouse=True)
def run(request):
    """Per-test setup and teardown around a running unitd.

    Setup starts (or reuses) unitd and resets the alert/descriptor skip
    settings.  Teardown stops unitd and helper processes, cleans the
    temporary directory, and checks descriptors, processes and the log.
    """
    unit = unit_run()

    option.skip_alerts = [
        r'read signalfd\(4\) failed',
        r'sendmsg.+failed',
        r'recvmsg.+failed',
    ]
    option.skip_sanitizer = False

    _fds_info['main']['skip'] = False
    _fds_info['router']['skip'] = False
    _fds_info['controller']['skip'] = False

    yield

    # stop unit

    error_stop_unit = unit_stop()
    error_stop_processes = stop_processes()

    # prepare log

    with Log.open(encoding='utf-8') as f:
        log = f.read()
        Log.set_pos(f.tell())

    if not option.save_log and option.restart:
        shutil.rmtree(unit['temp_dir'])
        Log.set_pos(0)

    # clean temp_dir before the next test

    if not option.restart:
        _clear_conf(unit['temp_dir'] + '/control.unit.sock', log=log)

        for item in os.listdir(unit['temp_dir']):
            if item not in [
                'control.unit.sock',
                'state',
                'unit.pid',
                'unit.log',
            ]:
                path = os.path.join(unit['temp_dir'], item)

                public_dir(path)

                if os.path.isfile(path) or stat.S_ISSOCK(
                    os.stat(path).st_mode
                ):
                    os.remove(path)
                else:
                    # retry while the directory is reported busy

                    for attempt in range(10):
                        try:
                            shutil.rmtree(path)
                            break
                        except OSError as err:
                            if err.errno != errno.EBUSY:
                                raise
                            time.sleep(1)

    # check descriptors

    _check_fds(log=log)

    # check processes id's and amount

    _check_processes()

    # print unit.log in case of error

    if hasattr(request.node, 'rep_call') and request.node.rep_call.failed:
        _print_log(log)

    if error_stop_unit or error_stop_processes:
        _print_log(log)

    # check unit.log for errors

    assert error_stop_unit is None, 'stop unit'
    assert error_stop_processes is None, 'stop processes'

    _check_alerts(log=log)


def unit_run(state_dir=None):
    """Start unitd and return the ``unit_instance`` dictionary.

    Without --restart an already started instance is reused.  A fresh
    temporary directory holds the log, pid file and control socket; the
    state directory defaults to ``<temp_dir>/state`` unless ``state_dir``
    is given.  Exits the interpreter if unitd is missing or fails to
    create its control socket.
    """
    if not option.restart and 'unitd' in unit_instance:
        return unit_instance

    build_dir = option.current_dir + '/build'
    unitd = build_dir + '/unitd'

    if not os.path.isfile(unitd):
        sys.exit('Could not find unit')

    temp_dir = tempfile.mkdtemp(prefix='unit-test-')
    public_dir(temp_dir)

    if stat.S_IMODE(os.stat(build_dir).st_mode) != 0o777:
        public_dir(build_dir)

    state = temp_dir + '/state' if state_dir is None else state_dir
    if not os.path.isdir(state):
        os.mkdir(state)

    unitd_args = [
        unitd,
        '--no-daemon',
        '--modules',
        build_dir,
        '--state',
        state,
        '--pid',
        temp_dir + '/unit.pid',
        '--log',
        temp_dir + '/unit.log',
        '--control',
        'unix:' + temp_dir + '/control.unit.sock',
        '--tmp',
        temp_dir,
    ]

    if option.user:
        unitd_args.extend(['--user', option.user])

    with open(temp_dir + '/unit.log', 'w') as log:
        unit_instance['process'] = subprocess.Popen(unitd_args, stderr=log)

    Log.temp_dir = temp_dir

    if not waitforfiles(temp_dir + '/control.unit.sock'):
        _print_log()
        sys.exit('Could not start unit')

    unit_instance['temp_dir'] = temp_dir
    unit_instance['control_sock'] = temp_dir + '/control.unit.sock'
    unit_instance['unitd'] = unitd

    option.temp_dir = temp_dir

    with open(temp_dir + '/unit.pid', 'r') as f:
        unit_instance['pid'] = f.read().rstrip()

    if state_dir is None:
        _clear_conf(unit_instance['temp_dir'] + '/control.unit.sock')

    # remember the baseline descriptor counts for _check_fds()

    _fds_info['main']['fds'] = _count_fds(unit_instance['pid'])

    router = _fds_info['router']
    router['pid'] = pid_by_name(router['name'])
    router['fds'] = _count_fds(router['pid'])

    controller = _fds_info['controller']
    controller['pid'] = pid_by_name(controller['name'])
    controller['fds'] = _count_fds(controller['pid'])

    return unit_instance


def unit_stop():
    """Terminate unitd when running in restart mode.

    Without --restart the daemon is left running; if the direct caller
    is a ``test_*`` function, that test is skipped instead.

    Returns None on success or an error message string when unitd exits
    with a non-zero code or cannot be terminated within 15 seconds.
    """
    if not option.restart:
        if inspect.stack()[1].function.startswith('test_'):
            pytest.skip('no restart mode')

        return

    # check zombies

    out = subprocess.check_output(
        ['ps', 'ax', '-o', 'state', '-o', 'ppid']
    ).decode()
    z_ppids = re.findall(r'Z\s*(\d+)', out)
    assert unit_instance['pid'] not in z_ppids, 'no zombies'

    # terminate unit

    p = unit_instance['process']

    if p.poll() is not None:
        return

    p.send_signal(signal.SIGQUIT)

    try:
        retcode = p.wait(15)
        if retcode:
            return 'Child process terminated with code ' + str(retcode)

    except KeyboardInterrupt:
        p.kill()
        raise

    except Exception:
        # typically subprocess.TimeoutExpired
        p.kill()
        return 'Could not terminate unit'


@print_log_on_assert
def _check_alerts(*, log=None):
    """Assert that the log has no unexpected alerts or sanitizer errors.

    Alerts matching any pattern in ``option.skip_alerts`` are ignored;
    the sanitizer check is skipped when ``option.skip_sanitizer`` is set.
    When ``log`` is None the log is read through ``Log.open()``.
    """
    if log is None:
        with Log.open(encoding='utf-8') as f:
            log = f.read()

    found = False

    alerts = re.findall(r'.+\[alert\].+', log)

    if alerts:
        print('\nAll alerts/sanitizer errors found in log:')
        for alert in alerts:
            print(alert)
        found = True

    if option.skip_alerts:
        for skip in option.skip_alerts:
            alerts = [al for al in alerts if re.search(skip, al) is None]

    assert not alerts, 'alert(s)'

    if not option.skip_sanitizer:
        sanitizer_errors = re.findall('.+Sanitizer.+', log)

        assert not sanitizer_errors, 'sanitizer error(s)'

    if found:
        print('skipped.')


def _print_log(log=None):
    """Print the path to unit.log and, with --print-log, its contents.

    ``log`` is the text to print; when None the file is read from disk.
    """
    path = Log.get_path()

    print('Path to unit.log:\n' + path + '\n')

    if option.print_log:
        os.set_blocking(sys.stdout.fileno(), True)
        sys.stdout.flush()

        if log is None:
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                shutil.copyfileobj(f, sys.stdout)
        else:
            sys.stdout.write(log)


@print_log_on_assert
def _clear_conf(sock, *, log=None):
    """Reset the configuration and remove all stored certificates.

    ``sock`` is the path of the control socket.  ``log`` is not used
    here; it is only consumed by the print_log_on_assert decorator.
    """
    resp = http.put(
        url='/config',
        sock_type='unix',
        addr=sock,
        body=json.dumps({"listeners": {}, "applications": {}}),
    )['body']

    assert 'success' in resp, 'clear conf'

    if 'openssl' not in option.available['modules']:
        return

    try:
        certs = json.loads(
            http.get(url='/certificates', sock_type='unix', addr=sock)['body']
        ).keys()

    except json.JSONDecodeError:
        pytest.fail('Can\'t parse certificates list.')

    for cert in certs:
        resp = http.delete(
            url='/certificates/' + cert,
            sock_type='unix',
            addr=sock,
        )['body']

        assert 'success' in resp, 'remove certificate'


def _check_processes():
    """Assert that exactly main, router and controller are running.

    The router and controller must still have the pids recorded in
    ``_fds_info``.  Waits up to ~60 seconds for extra processes to go
    away.
    """
    # in restart mode unit_stop() terminates the daemon, so the expected
    # three processes cannot be present any more

    if option.restart:
        return

    router_pid = _fds_info['router']['pid']
    controller_pid = _fds_info['controller']['pid']
    unit_pid = unit_instance['pid']

    for _ in range(600):
        out = (
            subprocess.check_output(
                ['ps', '-ax', '-o', 'pid', '-o', 'ppid', '-o', 'command']
            )
            .decode()
            .splitlines()
        )
        out = [line for line in out if unit_pid in line]

        if len(out) <= 3:
            break

        time.sleep(0.1)

    assert len(out) == 3, 'main, router, and controller expected'

    # drop the expected lines one by one; nothing may be left at the end

    out = [line for line in out if 'unit: main' not in line]
    assert len(out) == 2, 'one main'

    out = [
        line
        for line in out
        if re.search(
            router_pid + r'\s+' + unit_pid + r'.*unit: router', line
        )
        is None
    ]
    assert len(out) == 1, 'one router'

    out = [
        line
        for line in out
        if re.search(
            controller_pid + r'\s+' + unit_pid + r'.*unit: controller', line
        )
        is None
    ]
    assert len(out) == 0, 'one controller'


@print_log_on_assert
def _check_fds(*, log=None):
    """Assert that no process grew beyond ``option.fds_threshold`` fds.

    The baseline counts in ``_fds_info`` are updated on every call.  For
    processes flagged with 'skip' only the baseline is refreshed.
    ``log`` is only consumed by the print_log_on_assert decorator.
    """

    def waitforfds(diff):
        # poll for up to ~60 seconds until the difference is acceptable
        for _ in range(600):
            fds_diff = diff()

            if fds_diff <= option.fds_threshold:
                break

            time.sleep(0.1)

        return fds_diff

    ps = _fds_info['main']
    if not ps['skip']:
        fds_diff = waitforfds(
            lambda: _count_fds(unit_instance['pid']) - ps['fds']
        )
        ps['fds'] += fds_diff

        assert fds_diff <= option.fds_threshold, 'descriptors leak main process'

    else:
        ps['fds'] = _count_fds(unit_instance['pid'])

    for name in ['controller', 'router']:
        ps = _fds_info[name]
        ps_pid = ps['pid']
        ps['pid'] = pid_by_name(ps['name'])

        if not ps['skip']:
            fds_diff = waitforfds(lambda: _count_fds(ps['pid']) - ps['fds'])
            ps['fds'] += fds_diff

            if not option.restart:
                assert ps['pid'] == ps_pid, 'same pid %s' % name

            assert fds_diff <= option.fds_threshold, (
                'descriptors leak %s' % name
            )

        else:
            ps['fds'] = _count_fds(ps['pid'])


def _count_fds(pid):
    """Return the number of open descriptors of ``pid`` (0 if unknown).

    Tries ``/proc/<pid>/fd`` first, then ``procstat -f`` and ``lsof``;
    for the two commands the number of output lines is returned.
    """
    procfile = '/proc/%s/fd' % pid
    if os.path.isdir(procfile):
        return len(os.listdir(procfile))

    # TypeError covers a pid of None being passed as a command argument

    try:
        out = subprocess.check_output(
            ['procstat', '-f', pid],
            stderr=subprocess.STDOUT,
        ).decode()
        return len(out.splitlines())

    except (FileNotFoundError, TypeError, subprocess.CalledProcessError):
        pass

    try:
        out = subprocess.check_output(
            ['lsof', '-n', '-p', pid],
            stderr=subprocess.STDOUT,
        ).decode()
        return len(out.splitlines())

    except (FileNotFoundError, TypeError, subprocess.CalledProcessError):
        pass

    return 0


def run_process(target, *args):
    """Start ``target(*args)`` in a child process and track it."""
    process = Process(target=target, args=args)
    process.start()

    _processes.append(process)


def stop_processes():
    """Terminate all live processes started by run_process().

    Returns None on success, or an error message if any process is still
    alive 15 seconds after being terminated.
    """
    if not _processes:
        return

    fail = False
    for process in _processes:
        if process.is_alive():
            process.terminate()
            process.join(timeout=15)

            if process.is_alive():
                fail = True

    if fail:
        return 'Fail to stop process(es)'


def pid_by_name(name):
    """Return the pid (as a string) of the unitd child matching ``name``.

    ``name`` is used as a regular expression against ``ps`` output lines
    containing the main process pid.  Returns None if nothing matches.
    """
    output = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
    m = re.search(
        r'\s*(\d+)\s*' + str(unit_instance['pid']) + r'.*' + name, output
    )
    return None if m is None else m.group(1)


def find_proc(name, ps_output):
    """Return all matches of ``<main pid>.*<name>`` in ``ps_output``."""
    return re.findall(str(unit_instance['pid']) + r'.*' + name, ps_output)


@pytest.fixture()
def skip_alert():
    """Return a function adding patterns to ``option.skip_alerts``."""

    def _skip(*alerts):
        option.skip_alerts.extend(alerts)

    return _skip


@pytest.fixture()
def skip_fds_check():
    """Return a function selecting processes to exclude from _check_fds()."""

    def _skip(main=False, router=False, controller=False):
        _fds_info['main']['skip'] = main
        _fds_info['router']['skip'] = router
        _fds_info['controller']['skip'] = controller

    return _skip


@pytest.fixture
def temp_dir(request):
    """Temporary directory of the current unitd instance."""
    return unit_instance['temp_dir']


@pytest.fixture
def is_unsafe(request):
    """Value of the --unsafe command-line option."""
    return request.config.getoption("--unsafe")


@pytest.fixture
def is_su(request):
    """True when the effective user id is 0."""
    return os.geteuid() == 0


@pytest.fixture
def unit_pid(request):
    """Pid of the unitd process started by unit_run()."""
    return unit_instance['process'].pid


def pytest_sessionfinish(session):
    """Stop unitd and remove the cache and temporary directories."""
    if not option.restart and option.save_log:
        print('Path to unit.log:\n' + Log.get_path() + '\n')

    # force unit_stop() to really terminate the daemon

    option.restart = True

    unit_stop()

    public_dir(option.cache_dir)
    shutil.rmtree(option.cache_dir)

    if not option.save_log and os.path.isdir(option.temp_dir):
        public_dir(option.temp_dir)
        shutil.rmtree(option.temp_dir)