xref: /unit/test/unit/log.py (revision 2488:55440e02a873)
1import os
2import re
3import sys
4import time
5
6from unit.option import option
7
# Name of the default log file; used as the default ``name`` argument of the
# Log helpers below, which look it up under ``option.temp_dir``.
UNIT_LOG = 'unit.log'
9
10
def print_log_on_assert(func):
    """Decorate *func* so that the log is printed when an assertion fails.

    The wrapper forwards all positional and keyword arguments to *func* and
    returns whatever *func* returns.  If *func* raises ``AssertionError``,
    ``Log.print_log`` is called with the same arguments and the original
    exception is then re-raised unchanged.
    """
    # Imported locally so this block does not rely on the module import list.
    from functools import wraps

    @wraps(func)  # keep the wrapped function's name and docstring
    def inner_function(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except AssertionError:
            Log.print_log(*args, **kwargs)
            # A bare ``raise`` re-raises the active exception as-is.
            raise

    return inner_function
20
21
class Log:
    """Static helpers for reading and checking log files.

    Log files are looked up under ``option.temp_dir``.  The class keeps one
    piece of shared state, ``pos``, which records for each log file name the
    offset from which subsequent reads start.
    """

    # Maps a log file name to the offset Log.open() seeks to before reading.
    pos = {}

    @staticmethod
    @print_log_on_assert
    def check_alerts(log=None):
        """Assert that *log* has no unexpected alerts or sanitizer errors.

        :param log: log text to check; when ``None`` the default log is read
            with ``Log.read()``.
        :raises AssertionError: with message ``'alert(s)'`` if any line
            containing ``[alert]`` is left after dropping the lines matched
            by a regex from ``option.skip_alerts``, or with message
            ``'sanitizer error(s)'`` if a line containing ``Sanitizer`` is
            found while ``option.skip_sanitizer`` is false.
        """
        if log is None:
            log = Log.read()

        alerts = re.findall(r'.+\[alert\].+', log)
        found = bool(alerts)

        if found and option.detailed:
            print('\nAll alerts/sanitizer errors found in log:')
            for alert in alerts:
                print(alert)

        if option.skip_alerts:
            # Keep only the alerts that none of the skip patterns match.
            for skip in option.skip_alerts:
                alerts = [al for al in alerts if re.search(skip, al) is None]

        assert not alerts, 'alert(s)'

        if not option.skip_sanitizer:
            sanitizer_errors = re.findall(r'.+Sanitizer.+', log)

            assert not sanitizer_errors, 'sanitizer error(s)'

        # Reached only when every alert that was found has been skipped.
        if found and option.detailed:
            print('skipped.')

    @staticmethod
    def findall(pattern, name=UNIT_LOG, flags=re.M):
        """Return ``re.findall(pattern, text, flags)`` for log file *name*.

        The text is obtained with ``Log.read(name)``, so it starts at the
        offset stored for *name* in ``Log.pos``.
        """
        return re.findall(pattern, Log.read(name), flags)

    @staticmethod
    def get_path(name=UNIT_LOG):
        """Return the path of log file *name* inside ``option.temp_dir``."""
        return f'{option.temp_dir}/{name}'

    @staticmethod
    def open(name=UNIT_LOG, encoding='utf-8'):
        """Open log file *name* for reading, positioned at its stored offset.

        :param name: log file name, resolved with ``Log.get_path()``.
        :param encoding: text encoding; undecodable bytes are ignored.
        :returns: the open text file object; the caller must close it.
        """
        file = open(Log.get_path(name), 'r', encoding=encoding, errors='ignore')

        try:
            file.seek(Log.pos.get(name, 0))
        except Exception:
            # Do not leak the handle if positioning fails.
            file.close()
            raise

        return file

    @staticmethod
    def print_log(log=None):
        """Print the log path and, if ``option.print_log`` is set, the log.

        :param log: log text to print; when ``None`` the default log is read
            with ``Log.read()``.
        """
        Log.print_path()

        if option.print_log:
            # NOTE(review): stdout is switched to blocking mode before the
            # write, presumably so a large log is not cut short - confirm.
            os.set_blocking(sys.stdout.fileno(), True)
            sys.stdout.flush()

            if log is None:
                log = Log.read()

            sys.stdout.write(log)

    @staticmethod
    def print_path():
        """Print the path of the default log file."""
        print(f'Path to {UNIT_LOG}:\n{Log.get_path()}\n')

    @staticmethod
    def read(*args, **kwargs):
        """Return the text of a log file from its stored offset to the end.

        All arguments are passed through to ``Log.open()``.
        """
        with Log.open(*args, **kwargs) as file:
            return file.read()

    @staticmethod
    def set_pos(pos, name=UNIT_LOG):
        """Store *pos* as the read offset for log file *name*."""
        Log.pos[name] = pos

    @staticmethod
    def swap(name):
        """Exchange the stored read offsets of *name* and the default log.

        A missing offset is treated as 0.
        """
        pos = Log.pos.get(UNIT_LOG, 0)
        Log.pos[UNIT_LOG] = Log.pos.get(name, 0)
        Log.pos[name] = pos

    @staticmethod
    def wait_for_record(pattern, name=UNIT_LOG, wait=150, flags=re.M):
        """Poll log file *name* until *pattern* is found or attempts run out.

        :param pattern: regular expression passed to ``re.search``.
        :param name: log file name, opened with ``Log.open()``.
        :param wait: maximum number of attempts; 0.1 seconds are slept after
            each unsuccessful one.
        :param flags: flags passed to ``re.search``.
        :returns: the match object, or ``None`` if nothing matched.
        """
        # Bound up front so that ``wait <= 0`` returns None instead of
        # raising UnboundLocalError.
        found = None

        with Log.open(name) as file:
            for _ in range(wait):
                # The file stays open, so each read() returns only the text
                # that has not been returned by a previous attempt.
                found = re.search(pattern, file.read(), flags)

                if found is not None:
                    break

                time.sleep(0.1)

        return found
114