Deleted
Added
1import time 2import unittest 3import unit 4 5class TestUnitPythonApplication(unit.TestUnitApplicationPython): 6 7 def setUpClass(): 8 unit.TestUnit().check_modules('python') 9 10 def test_python_application_variables(self): 11 self.load('variables') 12 13 body = 'Test body string.' 14 15 resp = self.post(headers={ 16 'Host': 'localhost', 17 'Content-Type': 'text/html', |
18 'Custom-Header': 'blah', 19 'Connection': 'close' |
20 }, body=body) 21 22 self.assertEqual(resp['status'], 200, 'status') 23 headers = resp['headers'] 24 header_server = headers.pop('Server') 25 self.assertRegex(header_server, r'Unit/[\d\.]+', 'server header') 26 self.assertEqual(headers.pop('Server-Software'), header_server, 27 'server software header') 28 29 date = headers.pop('Date') 30 self.assertEqual(date[-4:], ' GMT', 'date header timezone') 31 self.assertLess(abs(self.date_to_sec_epoch(date) - self.sec_epoch()), 5, 32 'date header') 33 34 self.assertDictEqual(headers, { |
35 'Connection': 'close', |
36 'Content-Length': str(len(body)), 37 'Content-Type': 'text/html', 38 'Request-Method': 'POST', 39 'Request-Uri': '/', 40 'Http-Host': 'localhost', 41 'Server-Protocol': 'HTTP/1.1', 42 'Custom-Header': 'blah', 43 'Wsgi-Version': '(1, 0)', --- 43 unchanged lines hidden (view full) --- 87 88 self.assertNotIn('Transfer-Encoding', self.get()['headers'], 89 '204 header transfer encoding') 90 91 def test_python_application_ctx_iter_atexit(self): 92 self.load('ctx_iter_atexit') 93 94 resp = self.post(headers={ |
95 'Host': 'localhost', |
96 'Connection': 'close', |
97 'Content-Type': 'text/html' |
98 }, body='0123456789') 99 100 self.assertEqual(resp['status'], 200, 'ctx iter status') 101 self.assertEqual(resp['body'], '0123456789', 'ctx iter body') 102 103 self.conf({ 104 "listeners": {}, 105 "applications": {} --- 5 unchanged lines hidden (view full) --- 111 112 self.assertIsNotNone(self.search_in_log(r'RuntimeError'), 113 'ctx iter atexit') 114 115 def test_python_keepalive_body(self): 116 self.load('mirror') 117 118 (resp, sock) = self.post(headers={ |
119 'Host': 'localhost', |
120 'Connection': 'keep-alive', |
121 'Content-Type': 'text/html' |
122 }, start=True, body='0123456789' * 500) 123 124 self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1') 125 126 resp = self.post(headers={ |
127 'Host': 'localhost', |
128 'Connection': 'close', |
129 'Content-Type': 'text/html' |
130 }, sock=sock, body='0123456789') 131 132 self.assertEqual(resp['body'], '0123456789', 'keep-alive 2') 133 134 def test_python_keepalive_reconfigure(self): 135 self.load('mirror') 136 137 body = '0123456789' 138 conns = 3 139 socks = [] 140 141 for i in range(conns): 142 (resp, sock) = self.post(headers={ |
143 'Host': 'localhost', |
144 'Connection': 'keep-alive', |
145 'Content-Type': 'text/html' |
146 }, start=True, body=body) 147 148 self.assertEqual(resp['body'], body, 'keep-alive open') 149 self.assertIn('success', self.conf({ 150 "spare": i % 4, 151 "max": (i % 4) + 1 152 }, 'applications/mirror/processes'), 'reconfigure') 153 154 socks.append(sock) 155 156 for i in range(conns): 157 (resp, sock) = self.post(headers={ |
158 'Host': 'localhost', |
159 'Connection': 'keep-alive', |
160 'Content-Type': 'text/html' |
161 }, start=True, sock=socks[i], body=body) 162 163 self.assertEqual(resp['body'], body, 'keep-alive request') 164 self.assertIn('success', self.conf({ 165 "spare": i % 4, 166 "max": (i % 4) + 1 167 }, 'applications/mirror/processes'), 'reconfigure 2') 168 169 for i in range(conns): 170 resp = self.post(headers={ |
171 'Host': 'localhost', |
172 'Connection': 'close', |
173 'Content-Type': 'text/html' |
174 }, sock=socks[i], body=body) 175 176 self.assertEqual(resp['body'], body, 'keep-alive close') 177 self.assertIn('success', self.conf({ 178 "spare": i % 4, 179 "max": (i % 4) + 1 180 }, 'applications/mirror/processes'), 'reconfigure 3') 181 182 def test_python_keepalive_reconfigure_2(self): 183 self.load('mirror') 184 185 body = '0123456789' 186 187 (resp, sock) = self.post(headers={ |
188 'Host': 'localhost', |
189 'Connection': 'keep-alive', |
190 'Content-Type': 'text/html' |
191 }, start=True, body=body) 192 193 self.assertEqual(resp['body'], body, 'reconfigure 2 keep-alive 1') 194 195 self.load('empty') 196 197 (resp, sock) = self.post(headers={ |
198 'Host': 'localhost', |
199 'Connection': 'close', |
200 'Content-Type': 'text/html' |
201 }, start=True, sock=sock, body=body) 202 203 self.assertEqual(resp['status'], 200, 'reconfigure 2 keep-alive 2') 204 self.assertEqual(resp['body'], '', 'reconfigure 2 keep-alive 2 body') 205 206 self.assertIn('success', self.conf({ 207 "listeners": {}, 208 "applications": {} --- 160 unchanged lines hidden --- |