test_respawn.py (1736:22db875fda34) test_respawn.py (1769:b7cd1517081e)
1import re
2import subprocess
3import time
4
5from unit.applications.lang.python import TestApplicationPython
6from unit.option import option
7
8

--- 7 unchanged lines hidden (view full) ---

16 self.app_name = "app-" + option.temp_dir.split('/')[-1]
17
18 self.load('empty', self.app_name)
19
20 assert 'success' in self.conf(
21 '1', 'applications/' + self.app_name + '/processes'
22 )
23
1import re
2import subprocess
3import time
4
5from unit.applications.lang.python import TestApplicationPython
6from unit.option import option
7
8

--- 7 unchanged lines hidden (view full) ---

16 self.app_name = "app-" + option.temp_dir.split('/')[-1]
17
18 self.load('empty', self.app_name)
19
20 assert 'success' in self.conf(
21 '1', 'applications/' + self.app_name + '/processes'
22 )
23
24 def pid_by_name(self, name):
25 output = subprocess.check_output(['ps', 'ax']).decode()
26 m = re.search(r'\s*(\d+).*' + name, output)
27 return m if m is None else m.group(1)
24 def pid_by_name(self, name, ppid):
25 output = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
26 m = re.search(r'\s*(\d+)\s*' + str(ppid) + r'.*' + name, output)
27 return None if m is None else m.group(1)
28
29 def kill_pids(self, *pids):
30 subprocess.call(['kill', '-9'] + list(pids))
31
28
29 def kill_pids(self, *pids):
30 subprocess.call(['kill', '-9'] + list(pids))
31
32 def wait_for_process(self, process):
32 def wait_for_process(self, process, unit_pid):
33 for i in range(50):
33 for i in range(50):
34 found = self.pid_by_name(process)
34 found = self.pid_by_name(process, unit_pid)
35
36 if found is not None:
37 break
38
39 time.sleep(0.1)
40
41 return found
42
35
36 if found is not None:
37 break
38
39 time.sleep(0.1)
40
41 return found
42
43 def smoke_test(self):
43 def find_proc(self, name, ppid, ps_output):
44 return re.findall(str(ppid) + r'.*' + name, ps_output)
45
46 def smoke_test(self, unit_pid):
44 for _ in range(5):
45 assert 'success' in self.conf(
46 '1', 'applications/' + self.app_name + '/processes'
47 )
48 assert self.get()['status'] == 200
49
50 # Check if the only one router, controller,
51 # and application processes running.
52
47 for _ in range(5):
48 assert 'success' in self.conf(
49 '1', 'applications/' + self.app_name + '/processes'
50 )
51 assert self.get()['status'] == 200
52
53 # Check if the only one router, controller,
54 # and application processes running.
55
53 output = subprocess.check_output(['ps', 'ax']).decode()
54 assert len(re.findall(self.PATTERN_ROUTER, output)) == 1
55 assert len(re.findall(self.PATTERN_CONTROLLER, output)) == 1
56 assert len(re.findall(self.app_name, output)) == 1
56 out = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
57 assert len(self.find_proc(self.PATTERN_ROUTER, unit_pid, out)) == 1
58 assert len(self.find_proc(self.PATTERN_CONTROLLER, unit_pid, out)) == 1
59 assert len(self.find_proc(self.app_name, unit_pid, out)) == 1
57
60
58 def test_respawn_router(self, skip_alert):
59 pid = self.pid_by_name(self.PATTERN_ROUTER)
61 def test_respawn_router(self, skip_alert, unit_pid):
62 pid = self.pid_by_name(self.PATTERN_ROUTER, unit_pid)
60
61 self.kill_pids(pid)
62 skip_alert(r'process %s exited on signal 9' % pid)
63
63
64 self.kill_pids(pid)
65 skip_alert(r'process %s exited on signal 9' % pid)
66
64 assert self.wait_for_process(self.PATTERN_ROUTER) is not None
67 assert self.wait_for_process(self.PATTERN_ROUTER, unit_pid) is not None
65
68
66 self.smoke_test()
69 self.smoke_test(unit_pid)
67
70
68 def test_respawn_controller(self, skip_alert):
69 pid = self.pid_by_name(self.PATTERN_CONTROLLER)
71 def test_respawn_controller(self, skip_alert, unit_pid):
72 pid = self.pid_by_name(self.PATTERN_CONTROLLER, unit_pid)
70
71 self.kill_pids(pid)
72 skip_alert(r'process %s exited on signal 9' % pid)
73
73
74 self.kill_pids(pid)
75 skip_alert(r'process %s exited on signal 9' % pid)
76
74 assert self.wait_for_process(self.PATTERN_CONTROLLER) is not None
77 assert self.wait_for_process(
78 self.PATTERN_CONTROLLER, unit_pid
79 ) is not None
75
76 assert self.get()['status'] == 200
77
80
81 assert self.get()['status'] == 200
82
78 self.smoke_test()
83 self.smoke_test(unit_pid)
79
84
80 def test_respawn_application(self, skip_alert):
81 pid = self.pid_by_name(self.app_name)
85 def test_respawn_application(self, skip_alert, unit_pid):
86 pid = self.pid_by_name(self.app_name, unit_pid)
82
83 self.kill_pids(pid)
84 skip_alert(r'process %s exited on signal 9' % pid)
85
87
88 self.kill_pids(pid)
89 skip_alert(r'process %s exited on signal 9' % pid)
90
86 assert self.wait_for_process(self.app_name) is not None
91 assert self.wait_for_process(self.app_name, unit_pid) is not None
87
92
88 self.smoke_test()
93 self.smoke_test(unit_pid)