1import re 2import subprocess 3import time 4 5from unit.applications.lang.python import TestApplicationPython 6from unit.option import option 7 8 9class TestRespawn(TestApplicationPython): 10 prerequisites = {'modules': {'python': 'any'}} 11 12 PATTERN_ROUTER = 'unit: router' 13 PATTERN_CONTROLLER = 'unit: controller' 14 15 def setup_method(self): 16 self.app_name = "app-" + option.temp_dir.split('/')[-1] 17 18 self.load('empty', self.app_name) 19 20 assert 'success' in self.conf( 21 '1', 'applications/' + self.app_name + '/processes' 22 ) 23
| 1import re 2import subprocess 3import time 4 5from unit.applications.lang.python import TestApplicationPython 6from unit.option import option 7 8 9class TestRespawn(TestApplicationPython): 10 prerequisites = {'modules': {'python': 'any'}} 11 12 PATTERN_ROUTER = 'unit: router' 13 PATTERN_CONTROLLER = 'unit: controller' 14 15 def setup_method(self): 16 self.app_name = "app-" + option.temp_dir.split('/')[-1] 17 18 self.load('empty', self.app_name) 19 20 assert 'success' in self.conf( 21 '1', 'applications/' + self.app_name + '/processes' 22 ) 23
|
24 def pid_by_name(self, name): 25 output = subprocess.check_output(['ps', 'ax']).decode() 26 m = re.search(r'\s*(\d+).*' + name, output) 27 return m if m is None else m.group(1)
| 24 def pid_by_name(self, name, ppid): 25 output = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode() 26 m = re.search(r'\s*(\d+)\s*' + str(ppid) + r'.*' + name, output) 27 return None if m is None else m.group(1)
|
28 29 def kill_pids(self, *pids): 30 subprocess.call(['kill', '-9'] + list(pids)) 31
| 28 29 def kill_pids(self, *pids): 30 subprocess.call(['kill', '-9'] + list(pids)) 31
|
32 def wait_for_process(self, process):
| 32 def wait_for_process(self, process, unit_pid):
|
33 for i in range(50):
| 33 for i in range(50):
|
34 found = self.pid_by_name(process)
| 34 found = self.pid_by_name(process, unit_pid)
|
35 36 if found is not None: 37 break 38 39 time.sleep(0.1) 40 41 return found 42
| 35 36 if found is not None: 37 break 38 39 time.sleep(0.1) 40 41 return found 42
|
43 def smoke_test(self):
| 43 def find_proc(self, name, ppid, ps_output): 44 return re.findall(str(ppid) + r'.*' + name, ps_output) 45 46 def smoke_test(self, unit_pid):
|
44 for _ in range(5): 45 assert 'success' in self.conf( 46 '1', 'applications/' + self.app_name + '/processes' 47 ) 48 assert self.get()['status'] == 200 49 50 # Check if the only one router, controller, 51 # and application processes running. 52
| 47 for _ in range(5): 48 assert 'success' in self.conf( 49 '1', 'applications/' + self.app_name + '/processes' 50 ) 51 assert self.get()['status'] == 200 52 53 # Check if the only one router, controller, 54 # and application processes running. 55
|
53 output = subprocess.check_output(['ps', 'ax']).decode() 54 assert len(re.findall(self.PATTERN_ROUTER, output)) == 1 55 assert len(re.findall(self.PATTERN_CONTROLLER, output)) == 1 56 assert len(re.findall(self.app_name, output)) == 1
| 56 out = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode() 57 assert len(self.find_proc(self.PATTERN_ROUTER, unit_pid, out)) == 1 58 assert len(self.find_proc(self.PATTERN_CONTROLLER, unit_pid, out)) == 1 59 assert len(self.find_proc(self.app_name, unit_pid, out)) == 1
|
57
| 60
|
58 def test_respawn_router(self, skip_alert): 59 pid = self.pid_by_name(self.PATTERN_ROUTER)
| 61 def test_respawn_router(self, skip_alert, unit_pid): 62 pid = self.pid_by_name(self.PATTERN_ROUTER, unit_pid)
|
60 61 self.kill_pids(pid) 62 skip_alert(r'process %s exited on signal 9' % pid) 63
| 63 64 self.kill_pids(pid) 65 skip_alert(r'process %s exited on signal 9' % pid) 66
|
64 assert self.wait_for_process(self.PATTERN_ROUTER) is not None
| 67 assert self.wait_for_process(self.PATTERN_ROUTER, unit_pid) is not None
|
65
| 68
|
66 self.smoke_test()
| 69 self.smoke_test(unit_pid)
|
67
| 70
|
68 def test_respawn_controller(self, skip_alert): 69 pid = self.pid_by_name(self.PATTERN_CONTROLLER)
| 71 def test_respawn_controller(self, skip_alert, unit_pid): 72 pid = self.pid_by_name(self.PATTERN_CONTROLLER, unit_pid)
|
70 71 self.kill_pids(pid) 72 skip_alert(r'process %s exited on signal 9' % pid) 73
| 73 74 self.kill_pids(pid) 75 skip_alert(r'process %s exited on signal 9' % pid) 76
|
74 assert self.wait_for_process(self.PATTERN_CONTROLLER) is not None
| 77 assert self.wait_for_process( 78 self.PATTERN_CONTROLLER, unit_pid 79 ) is not None
|
75 76 assert self.get()['status'] == 200 77
| 80 81 assert self.get()['status'] == 200 82
|
78 self.smoke_test()
| 83 self.smoke_test(unit_pid)
|
79
| 84
|
80 def test_respawn_application(self, skip_alert): 81 pid = self.pid_by_name(self.app_name)
| 85 def test_respawn_application(self, skip_alert, unit_pid): 86 pid = self.pid_by_name(self.app_name, unit_pid)
|
82 83 self.kill_pids(pid) 84 skip_alert(r'process %s exited on signal 9' % pid) 85
| 87 88 self.kill_pids(pid) 89 skip_alert(r'process %s exited on signal 9' % pid) 90
|
86 assert self.wait_for_process(self.app_name) is not None
| 91 assert self.wait_for_process(self.app_name, unit_pid) is not None
|
87
| 92
|
88 self.smoke_test()
| 93 self.smoke_test(unit_pid)
|
| |