# xref: /unit/test/unit/utils.py (revision 1735:a0e0d4f90e51)
import os
import socket
import time

import pytest


def public_dir(path):
    """Make ``path`` and everything beneath it accessible to all users.

    Sets mode ``0o777`` on ``path`` itself and on every directory and file
    reported by ``os.walk(path)``.

    Args:
        path: Root of the tree to open up.

    Raises:
        OSError: Propagated from ``os.chmod`` if an entry cannot be changed.
    """
    os.chmod(path, 0o777)

    for root, dirs, files in os.walk(path):
        # Same treatment for both kinds of entry: directories of this level
        # first, then its files.
        for name in dirs + files:
            os.chmod(os.path.join(root, name), 0o777)
16
17
def waitforfiles(*files):
    """Poll until every path in ``files`` exists.

    Checks up to 50 times, sleeping 0.1 s after each unsuccessful check
    (roughly 5 seconds in total).

    Args:
        *files: Paths to wait for.

    Returns:
        True as soon as all paths exist (immediately when called with no
        paths), False if some path is still missing after the last check.
    """
    for _ in range(50):
        # all() short-circuits on the first missing path.
        if all(os.path.exists(f) for f in files):
            return True

        time.sleep(0.1)

    return False
33
34
def waitforsocket(port):
    """Wait until something accepts TCP connections on 127.0.0.1:``port``.

    Makes up to 50 connection attempts, sleeping 0.1 s after each refused
    one. If no attempt succeeds, the current test is failed via
    ``pytest.fail``. Only ``ConnectionRefusedError`` is retried; any other
    error raised by the socket calls (for example a timeout) propagates to
    the caller.

    Args:
        port: TCP port number (int) to probe on the loopback interface.

    Returns:
        None, as soon as a connection is established.
    """
    for _ in range(50):
        # A fresh socket per attempt; the context manager closes it whether
        # the attempt succeeds or not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.settimeout(5)
                sock.connect(('127.0.0.1', port))
                return

            except ConnectionRefusedError:
                time.sleep(0.1)

    # str() is required: port is an int, and str + int raises TypeError,
    # which would replace the intended failure message.
    pytest.fail('Can\'t connect to the 127.0.0.1:' + str(port))
50
51