1import time
2import unittest
3import unit
4
5class TestUnitPythonApplication(unit.TestUnitApplicationPython):
6
7 def setUpClass():
8 unit.TestUnit().check_modules('python')
9
10 def test_python_application_variables(self):
11 self.load('variables')
12
13 body = 'Test body string.'
14
15 resp = self.post(headers={
16 'Host': 'localhost',
17 'Content-Type': 'text/html',
18 'Custom-Header': 'blah'
18 'Custom-Header': 'blah',
19 'Connection': 'close'
20 }, body=body)
21
22 self.assertEqual(resp['status'], 200, 'status')
23 headers = resp['headers']
24 header_server = headers.pop('Server')
25 self.assertRegex(header_server, r'Unit/[\d\.]+', 'server header')
26 self.assertEqual(headers.pop('Server-Software'), header_server,
27 'server software header')
28
29 date = headers.pop('Date')
30 self.assertEqual(date[-4:], ' GMT', 'date header timezone')
31 self.assertLess(abs(self.date_to_sec_epoch(date) - self.sec_epoch()), 5,
32 'date header')
33
34 self.assertDictEqual(headers, {
35 'Connection': 'close',
36 'Content-Length': str(len(body)),
37 'Content-Type': 'text/html',
38 'Request-Method': 'POST',
39 'Request-Uri': '/',
40 'Http-Host': 'localhost',
41 'Server-Protocol': 'HTTP/1.1',
42 'Custom-Header': 'blah',
43 'Wsgi-Version': '(1, 0)',

--- 43 unchanged lines hidden (view full) ---

87
88 self.assertNotIn('Transfer-Encoding', self.get()['headers'],
89 '204 header transfer encoding')
90
91 def test_python_application_ctx_iter_atexit(self):
92 self.load('ctx_iter_atexit')
93
94 resp = self.post(headers={
95 'Host': 'localhost',
96 'Connection': 'close',
94 'Content-Type': 'text/html',
95 'Host': 'localhost'
97 'Content-Type': 'text/html'
98 }, body='0123456789')
99
100 self.assertEqual(resp['status'], 200, 'ctx iter status')
101 self.assertEqual(resp['body'], '0123456789', 'ctx iter body')
102
103 self.conf({
104 "listeners": {},
105 "applications": {}

--- 5 unchanged lines hidden (view full) ---

111
112 self.assertIsNotNone(self.search_in_log(r'RuntimeError'),
113 'ctx iter atexit')
114
115 def test_python_keepalive_body(self):
116 self.load('mirror')
117
118 (resp, sock) = self.post(headers={
119 'Host': 'localhost',
120 'Connection': 'keep-alive',
118 'Content-Type': 'text/html',
119 'Host': 'localhost'
121 'Content-Type': 'text/html'
122 }, start=True, body='0123456789' * 500)
123
124 self.assertEqual(resp['body'], '0123456789' * 500, 'keep-alive 1')
125
126 resp = self.post(headers={
127 'Host': 'localhost',
128 'Connection': 'close',
126 'Content-Type': 'text/html',
127 'Host': 'localhost'
129 'Content-Type': 'text/html'
130 }, sock=sock, body='0123456789')
131
132 self.assertEqual(resp['body'], '0123456789', 'keep-alive 2')
133
134 def test_python_keepalive_reconfigure(self):
135 self.load('mirror')
136
137 body = '0123456789'
138 conns = 3
139 socks = []
140
141 for i in range(conns):
142 (resp, sock) = self.post(headers={
143 'Host': 'localhost',
144 'Connection': 'keep-alive',
142 'Content-Type': 'text/html',
143 'Host': 'localhost'
145 'Content-Type': 'text/html'
146 }, start=True, body=body)
147
148 self.assertEqual(resp['body'], body, 'keep-alive open')
149 self.assertIn('success', self.conf({
150 "spare": i % 4,
151 "max": (i % 4) + 1
152 }, 'applications/mirror/processes'), 'reconfigure')
153
154 socks.append(sock)
155
156 for i in range(conns):
157 (resp, sock) = self.post(headers={
158 'Host': 'localhost',
159 'Connection': 'keep-alive',
157 'Content-Type': 'text/html',
158 'Host': 'localhost'
160 'Content-Type': 'text/html'
161 }, start=True, sock=socks[i], body=body)
162
163 self.assertEqual(resp['body'], body, 'keep-alive request')
164 self.assertIn('success', self.conf({
165 "spare": i % 4,
166 "max": (i % 4) + 1
167 }, 'applications/mirror/processes'), 'reconfigure 2')
168
169 for i in range(conns):
170 resp = self.post(headers={
171 'Host': 'localhost',
172 'Connection': 'close',
170 'Content-Type': 'text/html',
171 'Host': 'localhost'
173 'Content-Type': 'text/html'
174 }, sock=socks[i], body=body)
175
176 self.assertEqual(resp['body'], body, 'keep-alive close')
177 self.assertIn('success', self.conf({
178 "spare": i % 4,
179 "max": (i % 4) + 1
180 }, 'applications/mirror/processes'), 'reconfigure 3')
181
182 def test_python_keepalive_reconfigure_2(self):
183 self.load('mirror')
184
185 body = '0123456789'
186
187 (resp, sock) = self.post(headers={
188 'Host': 'localhost',
189 'Connection': 'keep-alive',
187 'Content-Type': 'text/html',
188 'Host': 'localhost'
190 'Content-Type': 'text/html'
191 }, start=True, body=body)
192
193 self.assertEqual(resp['body'], body, 'reconfigure 2 keep-alive 1')
194
195 self.load('empty')
196
197 (resp, sock) = self.post(headers={
198 'Host': 'localhost',
199 'Connection': 'close',
197 'Content-Type': 'text/html',
198 'Host': 'localhost'
200 'Content-Type': 'text/html'
201 }, start=True, sock=sock, body=body)
202
203 self.assertEqual(resp['status'], 200, 'reconfigure 2 keep-alive 2')
204 self.assertEqual(resp['body'], '', 'reconfigure 2 keep-alive 2 body')
205
206 self.assertIn('success', self.conf({
207 "listeners": {},
208 "applications": {}

--- 160 unchanged lines hidden ---