test_python_application.py (1400:cbd75efbfb74) test_python_application.py (1453:71af60a59338)
1import re
2import os
3import grp
4import pwd
5import time
6import unittest
7from unit.applications.lang.python import TestApplicationPython
8

--- 173 unchanged lines hidden (view full) ---

182 self.wait_for_record(r'RuntimeError'), 'ctx iter atexit'
183 )
184
185 def test_python_keepalive_body(self):
186 self.load('mirror')
187
188 self.assertEqual(self.get()['status'], 200, 'init')
189
1import re
2import os
3import grp
4import pwd
5import time
6import unittest
7from unit.applications.lang.python import TestApplicationPython
8

--- 173 unchanged lines hidden (view full) ---

182 self.wait_for_record(r'RuntimeError'), 'ctx iter atexit'
183 )
184
185 def test_python_keepalive_body(self):
186 self.load('mirror')
187
188 self.assertEqual(self.get()['status'], 200, 'init')
189
190 body = '0123456789' * 500
190 (resp, sock) = self.post(
191 headers={
192 'Host': 'localhost',
193 'Connection': 'keep-alive',
194 'Content-Type': 'text/html',
195 },
196 start=True,
191 (resp, sock) = self.post(
192 headers={
193 'Host': 'localhost',
194 'Connection': 'keep-alive',
195 'Content-Type': 'text/html',
196 },
197 start=True,
197 body='0123456789' * 500,
198 body=body,
198 read_timeout=1,
199 )
200
199 read_timeout=1,
200 )
201
201 self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1')
202 self.assertEqual(resp['body'], body, 'keep-alive 1')
202
203
204 body = '0123456789'
203 resp = self.post(
204 headers={
205 'Host': 'localhost',
206 'Connection': 'close',
207 'Content-Type': 'text/html',
208 },
209 sock=sock,
205 resp = self.post(
206 headers={
207 'Host': 'localhost',
208 'Connection': 'close',
209 'Content-Type': 'text/html',
210 },
211 sock=sock,
210 body='0123456789',
212 body=body,
211 )
212
213 )
214
213 self.assertEqual(resp['body'], '0123456789', 'keep-alive 2')
215 self.assertEqual(resp['body'], body, 'keep-alive 2')
214
215 def test_python_keepalive_reconfigure(self):
216 self.skip_alerts.extend(
217 [
218 r'pthread_mutex.+failed',
219 r'failed to apply',
220 r'process \d+ exited on signal',
221 ]

--- 113 unchanged lines hidden (view full) ---

335
336 self.assertEqual(resp, {}, 'reconfigure 2 keep-alive 3')
337
338 def test_python_keepalive_reconfigure_3(self):
339 self.load('empty')
340
341 self.assertEqual(self.get()['status'], 200, 'init')
342
216
217 def test_python_keepalive_reconfigure(self):
218 self.skip_alerts.extend(
219 [
220 r'pthread_mutex.+failed',
221 r'failed to apply',
222 r'process \d+ exited on signal',
223 ]

--- 113 unchanged lines hidden (view full) ---

337
338 self.assertEqual(resp, {}, 'reconfigure 2 keep-alive 3')
339
340 def test_python_keepalive_reconfigure_3(self):
341 self.load('empty')
342
343 self.assertEqual(self.get()['status'], 200, 'init')
344
343 (resp, sock) = self.http(
345 (_, sock) = self.http(
344 b"""GET / HTTP/1.1
345""",
346 start=True,
347 raw=True,
346 b"""GET / HTTP/1.1
347""",
348 start=True,
349 raw=True,
348 read_timeout=5,
350 no_recv=True,
349 )
350
351 )
352
353 self.assertEqual(self.get()['status'], 200)
354
351 self.assertIn(
352 'success',
353 self.conf({"listeners": {}, "applications": {}}),
354 'reconfigure 3 clear configuration',
355 )
356
357 resp = self.http(
358 b"""Host: localhost

--- 466 unchanged lines hidden ---
355 self.assertIn(
356 'success',
357 self.conf({"listeners": {}, "applications": {}}),
358 'reconfigure 3 clear configuration',
359 )
360
361 resp = self.http(
362 b"""Host: localhost

--- 466 unchanged lines hidden ---