1import os 2import re 3import sys 4import time 5 6from unit.option import option 7 8UNIT_LOG = 'unit.log' 9 10 11def print_log_on_assert(func): 12 def inner_function(*args, **kwargs): 13 try: 14 func(*args, **kwargs) 15 except AssertionError as exception: 16 Log.print_log(*args, **kwargs) 17 raise exception 18 19 return inner_function 20 21 22class Log: 23 pos = {} 24 25 @staticmethod 26 @print_log_on_assert 27 def check_alerts(log=None): 28 if log is None: 29 log = Log.read() 30 31 found = False 32 alerts = re.findall(r'.+\[alert\].+', log) 33 34 if alerts: 35 found = True 36 37 if option.detailed: 38 print('\nAll alerts/sanitizer errors found in log:') 39 _ = [print(alert) for alert in alerts] 40 41 if option.skip_alerts: 42 for skip in option.skip_alerts: 43 alerts = [al for al in alerts if re.search(skip, al) is None] 44 45 assert not alerts, 'alert(s)' 46 47 if not option.skip_sanitizer: 48 sanitizer_errors = re.findall('.+Sanitizer.+', log) 49 50 assert not sanitizer_errors, 'sanitizer error(s)' 51 52 if found and option.detailed: 53 print('skipped.') 54 55 @staticmethod 56 def findall(pattern, name=UNIT_LOG, flags=re.M): 57 return re.findall(pattern, Log.read(name), flags) 58 59 @staticmethod 60 def get_path(name=UNIT_LOG): 61 return f'{option.temp_dir}/{name}' 62 63 @staticmethod 64 def open(name=UNIT_LOG, encoding='utf-8'): 65 file = open(Log.get_path(name), 'r', encoding=encoding, errors='ignore') 66 file.seek(Log.pos.get(name, 0)) 67 68 return file 69 70 @staticmethod 71 def print_log(log=None): 72 Log.print_path() 73 74 if option.print_log: 75 os.set_blocking(sys.stdout.fileno(), True) 76 sys.stdout.flush() 77 78 if log is None: 79 log = Log.read() 80 81 sys.stdout.write(log) 82 83 @staticmethod 84 def print_path(): 85 print(f'Path to {UNIT_LOG}:\n{Log.get_path()}\n') 86 87 @staticmethod 88 def read(*args, **kwargs): 89 with Log.open(*args, **kwargs) as file: 90 return file.read() 91 92 @staticmethod 93 def set_pos(pos, name=UNIT_LOG): 94 Log.pos[name] = pos 95 96 @staticmethod 97 def swap(name): 98 pos = Log.pos.get(UNIT_LOG, 0) 99 Log.pos[UNIT_LOG] = Log.pos.get(name, 0) 100 Log.pos[name] = pos 101 102 @staticmethod 103 def wait_for_record(pattern, name=UNIT_LOG, wait=150, flags=re.M): 104 with Log.open(name) as file: 105 for _ in range(wait): 106 found = re.search(pattern, file.read(), flags) 107 108 if found is not None: 109 break 110 111 time.sleep(0.1) 112 113 return found 114