test_tls.py (898:10333717f899) test_tls.py (970:2f4376c8f358)
1import re
2import ssl
3import time
4import subprocess
5import unittest
6import unit
7
8class TestUnitTLS(unit.TestUnitApplicationTLS):
9
10 def setUpClass():
11 unit.TestUnit().check_modules('python', 'openssl')
12
13 def findall(self, pattern):
14 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
15 return re.findall(pattern, f.read())
16
17 def wait_for_record(self, pattern):
18 for i in range(50):
19 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
20 if re.search(pattern, f.read()) is not None:
21 break
22
23 time.sleep(0.1)
24
25 def openssl_date_to_sec_epoch(self, date):
26 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
27
28 def add_tls(self, application='empty', cert='default', port=7080):
29 self.conf({
30 "application": application,
31 "tls": {
32 "certificate": cert
33 }
34 }, 'listeners/*:' + str(port))
35
36 def remove_tls(self, application='empty', port=7080):
37 self.conf({
38 "application": application
39 }, 'listeners/*:' + str(port))
40
41 def test_tls_listener_option_add(self):
42 self.load('empty')
43
44 self.certificate()
45
46 self.add_tls()
47
48 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
49
50 def test_tls_listener_option_remove(self):
51 self.load('empty')
52
53 self.certificate()
54
55 self.add_tls()
56
57 self.get_ssl()
58
59 self.remove_tls()
60
61 self.assertEqual(self.get()['status'], 200, 'remove listener option')
62
63 def test_tls_certificate_remove(self):
64 self.load('empty')
65
66 self.certificate()
67
68 self.assertIn('success', self.conf_delete('/certificates/default'),
69 'remove certificate')
70
71 def test_tls_certificate_remove_used(self):
72 self.load('empty')
73
74 self.certificate()
75
76 self.add_tls()
77
78 self.assertIn('error', self.conf_delete('/certificates/default'),
79 'remove certificate')
80
81 def test_tls_certificate_remove_nonexisting(self):
82 self.load('empty')
83
84 self.certificate()
85
86 self.add_tls()
87
88 self.assertIn('error', self.conf_delete('/certificates/blah'),
89 'remove nonexistings certificate')
90
91 @unittest.expectedFailure
92 def test_tls_certificate_update(self):
93 self.load('empty')
94
95 self.certificate()
96
97 self.add_tls()
98
99 cert_old = self.get_server_certificate()
100
101 self.certificate()
102
103 self.assertNotEqual(cert_old, self.get_server_certificate(),
104 'update certificate')
105
106 @unittest.expectedFailure
107 def test_tls_certificate_key_incorrect(self):
108 self.load('empty')
109
110 self.certificate('first', False)
111 self.certificate('second', False)
112
113 self.assertIn('error', self.certificate_load('first', 'second'),
114 'key incorrect')
115
116 def test_tls_certificate_change(self):
117 self.load('empty')
118
119 self.certificate()
120 self.certificate('new')
121
122 self.add_tls()
123
124 cert_old = self.get_server_certificate()
125
126 self.add_tls(cert='new')
127
128 self.assertNotEqual(cert_old, self.get_server_certificate(),
129 'change certificate')
130
131 def test_tls_certificate_key_rsa(self):
132 self.load('empty')
133
134 self.certificate()
135
136 self.assertEqual(self.conf_get('/certificates/default/key'),
137 'RSA (1024 bits)', 'certificate key rsa')
138
139 def test_tls_certificate_key_ec(self):
140 self.load('empty')
141
142 subprocess.call(['openssl', 'ecparam', '-noout', '-genkey',
143 '-out', self.testdir + '/ec.key',
144 '-name', 'prime256v1'])
145
146 subprocess.call(['openssl', 'req', '-x509', '-new',
147 '-config', self.testdir + '/openssl.conf',
148 '-key', self.testdir + '/ec.key', '-subj', '/CN=ec/',
149 '-out', self.testdir + '/ec.crt'])
150
151 self.certificate_load('ec')
152
153 self.assertEqual(self.conf_get('/certificates/ec/key'), 'ECDH',
154 'certificate key ec')
155
156 def test_tls_certificate_chain_options(self):
157 self.load('empty')
158
159 self.certificate()
160
161 chain = self.conf_get('/certificates/default/chain')
162
163 self.assertEqual(len(chain), 1, 'certificate chain length')
164
165 cert = chain[0]
166
167 self.assertEqual(cert['subject']['common_name'], 'default',
168 'certificate subject common name')
169 self.assertEqual(cert['issuer']['common_name'], 'default',
170 'certificate issuer common name')
171
172 self.assertLess(abs(self.sec_epoch() -
173 self.openssl_date_to_sec_epoch(cert['validity']['since'])), 5,
174 'certificate validity since')
175 self.assertEqual(
176 self.openssl_date_to_sec_epoch(cert['validity']['until']) -
177 self.openssl_date_to_sec_epoch(cert['validity']['since']), 2592000,
178 'certificate validity until')
179
180 def test_tls_certificate_chain(self):
181 self.load('empty')
182
183 self.certificate('root', False)
184
185 subprocess.call(['openssl', 'req', '-new', '-config',
186 self.testdir + '/openssl.conf', '-subj', '/CN=int/',
187 '-out', self.testdir + '/int.csr',
188 '-keyout', self.testdir + '/int.key'])
189
190 subprocess.call(['openssl', 'req', '-new', '-config',
191 self.testdir + '/openssl.conf', '-subj', '/CN=end/',
192 '-out', self.testdir + '/end.csr',
193 '-keyout', self.testdir + '/end.key'])
194
195 with open(self.testdir + '/ca.conf', 'w') as f:
196 f.write("""[ ca ]
197default_ca = myca
198
199[ myca ]
200new_certs_dir = %(dir)s
201database = %(database)s
202default_md = sha1
203policy = myca_policy
204serial = %(certserial)s
205default_days = 1
206x509_extensions = myca_extensions
207
208[ myca_policy ]
209commonName = supplied
210
211[ myca_extensions ]
212basicConstraints = critical,CA:TRUE""" % {
213 'dir': self.testdir,
214 'database': self.testdir + '/certindex',
215 'certserial': self.testdir + '/certserial'
216 })
217
218 with open(self.testdir + '/certserial', 'w') as f:
219 f.write('1000')
220
221 with open(self.testdir + '/certindex', 'w') as f:
222 f.write('')
223
224 subprocess.call(['openssl', 'ca', '-batch',
225 '-config', self.testdir + '/ca.conf',
226 '-keyfile', self.testdir + '/root.key',
227 '-cert', self.testdir + '/root.crt',
228 '-subj', '/CN=int/',
229 '-in', self.testdir + '/int.csr',
230 '-out', self.testdir + '/int.crt'])
231
232 subprocess.call(['openssl', 'ca', '-batch',
233 '-config', self.testdir + '/ca.conf',
234 '-keyfile', self.testdir + '/int.key',
235 '-cert', self.testdir + '/int.crt',
236 '-subj', '/CN=end/',
237 '-in', self.testdir + '/end.csr',
238 '-out', self.testdir + '/end.crt'])
239
240 with open(self.testdir + '/end-int.crt', 'wb') as crt, \
241 open(self.testdir + '/end.crt', 'rb') as end, \
242 open(self.testdir + '/int.crt', 'rb') as int:
243 crt.write(end.read() + int.read())
244
245 self.context = ssl.create_default_context()
246 self.context.check_hostname = False
247 self.context.verify_mode = ssl.CERT_REQUIRED
248 self.context.load_verify_locations(self.testdir + '/root.crt')
249
250 # incomplete chain
251
252 self.assertIn('success', self.certificate_load('end', 'end'),
253 'certificate chain end upload')
254
255 chain = self.conf_get('/certificates/end/chain')
256 self.assertEqual(len(chain), 1, 'certificate chain end length')
257 self.assertEqual(chain[0]['subject']['common_name'], 'end',
258 'certificate chain end subject common name')
259 self.assertEqual(chain[0]['issuer']['common_name'], 'int',
260 'certificate chain end issuer common name')
261
262 self.add_tls(cert='end')
263
264 try:
265 resp = self.get_ssl()
266 except ssl.SSLError:
267 resp = None
268
269 self.assertEqual(resp, None, 'certificate chain incomplete chain')
270
271 # intermediate
272
273 self.assertIn('success', self.certificate_load('int', 'int'),
274 'certificate chain int upload')
275
276 chain = self.conf_get('/certificates/int/chain')
277 self.assertEqual(len(chain), 1, 'certificate chain int length')
278 self.assertEqual(chain[0]['subject']['common_name'], 'int',
279 'certificate chain int subject common name')
280 self.assertEqual(chain[0]['issuer']['common_name'], 'root',
281 'certificate chain int issuer common name')
282
283 self.add_tls(cert='int')
284
285 self.assertEqual(self.get_ssl()['status'], 200,
286 'certificate chain intermediate')
287
288 # intermediate server
289
290 self.assertIn('success', self.certificate_load('end-int', 'end'),
291 'certificate chain end-int upload')
292
293 chain = self.conf_get('/certificates/end-int/chain')
294 self.assertEqual(len(chain), 2, 'certificate chain end-int length')
295 self.assertEqual(chain[0]['subject']['common_name'], 'end',
296 'certificate chain end-int int subject common name')
297 self.assertEqual(chain[0]['issuer']['common_name'], 'int',
298 'certificate chain end-int int issuer common name')
299 self.assertEqual(chain[1]['subject']['common_name'], 'int',
300 'certificate chain end-int end subject common name')
301 self.assertEqual(chain[1]['issuer']['common_name'], 'root',
302 'certificate chain end-int end issuer common name')
303
304 self.add_tls(cert='end-int')
305
306 self.assertEqual(self.get_ssl()['status'], 200,
307 'certificate chain intermediate server')
308
309 @unittest.expectedFailure
310 def test_tls_reconfigure(self):
311 self.load('empty')
312
313 self.certificate()
314
315 (resp, sock) = self.get(headers={
1import re
2import ssl
3import time
4import subprocess
5import unittest
6import unit
7
8class TestUnitTLS(unit.TestUnitApplicationTLS):
9
10 def setUpClass():
11 unit.TestUnit().check_modules('python', 'openssl')
12
13 def findall(self, pattern):
14 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
15 return re.findall(pattern, f.read())
16
17 def wait_for_record(self, pattern):
18 for i in range(50):
19 with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
20 if re.search(pattern, f.read()) is not None:
21 break
22
23 time.sleep(0.1)
24
25 def openssl_date_to_sec_epoch(self, date):
26 return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
27
28 def add_tls(self, application='empty', cert='default', port=7080):
29 self.conf({
30 "application": application,
31 "tls": {
32 "certificate": cert
33 }
34 }, 'listeners/*:' + str(port))
35
36 def remove_tls(self, application='empty', port=7080):
37 self.conf({
38 "application": application
39 }, 'listeners/*:' + str(port))
40
41 def test_tls_listener_option_add(self):
42 self.load('empty')
43
44 self.certificate()
45
46 self.add_tls()
47
48 self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
49
50 def test_tls_listener_option_remove(self):
51 self.load('empty')
52
53 self.certificate()
54
55 self.add_tls()
56
57 self.get_ssl()
58
59 self.remove_tls()
60
61 self.assertEqual(self.get()['status'], 200, 'remove listener option')
62
63 def test_tls_certificate_remove(self):
64 self.load('empty')
65
66 self.certificate()
67
68 self.assertIn('success', self.conf_delete('/certificates/default'),
69 'remove certificate')
70
71 def test_tls_certificate_remove_used(self):
72 self.load('empty')
73
74 self.certificate()
75
76 self.add_tls()
77
78 self.assertIn('error', self.conf_delete('/certificates/default'),
79 'remove certificate')
80
81 def test_tls_certificate_remove_nonexisting(self):
82 self.load('empty')
83
84 self.certificate()
85
86 self.add_tls()
87
88 self.assertIn('error', self.conf_delete('/certificates/blah'),
89 'remove nonexistings certificate')
90
91 @unittest.expectedFailure
92 def test_tls_certificate_update(self):
93 self.load('empty')
94
95 self.certificate()
96
97 self.add_tls()
98
99 cert_old = self.get_server_certificate()
100
101 self.certificate()
102
103 self.assertNotEqual(cert_old, self.get_server_certificate(),
104 'update certificate')
105
106 @unittest.expectedFailure
107 def test_tls_certificate_key_incorrect(self):
108 self.load('empty')
109
110 self.certificate('first', False)
111 self.certificate('second', False)
112
113 self.assertIn('error', self.certificate_load('first', 'second'),
114 'key incorrect')
115
116 def test_tls_certificate_change(self):
117 self.load('empty')
118
119 self.certificate()
120 self.certificate('new')
121
122 self.add_tls()
123
124 cert_old = self.get_server_certificate()
125
126 self.add_tls(cert='new')
127
128 self.assertNotEqual(cert_old, self.get_server_certificate(),
129 'change certificate')
130
131 def test_tls_certificate_key_rsa(self):
132 self.load('empty')
133
134 self.certificate()
135
136 self.assertEqual(self.conf_get('/certificates/default/key'),
137 'RSA (1024 bits)', 'certificate key rsa')
138
139 def test_tls_certificate_key_ec(self):
140 self.load('empty')
141
142 subprocess.call(['openssl', 'ecparam', '-noout', '-genkey',
143 '-out', self.testdir + '/ec.key',
144 '-name', 'prime256v1'])
145
146 subprocess.call(['openssl', 'req', '-x509', '-new',
147 '-config', self.testdir + '/openssl.conf',
148 '-key', self.testdir + '/ec.key', '-subj', '/CN=ec/',
149 '-out', self.testdir + '/ec.crt'])
150
151 self.certificate_load('ec')
152
153 self.assertEqual(self.conf_get('/certificates/ec/key'), 'ECDH',
154 'certificate key ec')
155
156 def test_tls_certificate_chain_options(self):
157 self.load('empty')
158
159 self.certificate()
160
161 chain = self.conf_get('/certificates/default/chain')
162
163 self.assertEqual(len(chain), 1, 'certificate chain length')
164
165 cert = chain[0]
166
167 self.assertEqual(cert['subject']['common_name'], 'default',
168 'certificate subject common name')
169 self.assertEqual(cert['issuer']['common_name'], 'default',
170 'certificate issuer common name')
171
172 self.assertLess(abs(self.sec_epoch() -
173 self.openssl_date_to_sec_epoch(cert['validity']['since'])), 5,
174 'certificate validity since')
175 self.assertEqual(
176 self.openssl_date_to_sec_epoch(cert['validity']['until']) -
177 self.openssl_date_to_sec_epoch(cert['validity']['since']), 2592000,
178 'certificate validity until')
179
180 def test_tls_certificate_chain(self):
181 self.load('empty')
182
183 self.certificate('root', False)
184
185 subprocess.call(['openssl', 'req', '-new', '-config',
186 self.testdir + '/openssl.conf', '-subj', '/CN=int/',
187 '-out', self.testdir + '/int.csr',
188 '-keyout', self.testdir + '/int.key'])
189
190 subprocess.call(['openssl', 'req', '-new', '-config',
191 self.testdir + '/openssl.conf', '-subj', '/CN=end/',
192 '-out', self.testdir + '/end.csr',
193 '-keyout', self.testdir + '/end.key'])
194
195 with open(self.testdir + '/ca.conf', 'w') as f:
196 f.write("""[ ca ]
197default_ca = myca
198
199[ myca ]
200new_certs_dir = %(dir)s
201database = %(database)s
202default_md = sha1
203policy = myca_policy
204serial = %(certserial)s
205default_days = 1
206x509_extensions = myca_extensions
207
208[ myca_policy ]
209commonName = supplied
210
211[ myca_extensions ]
212basicConstraints = critical,CA:TRUE""" % {
213 'dir': self.testdir,
214 'database': self.testdir + '/certindex',
215 'certserial': self.testdir + '/certserial'
216 })
217
218 with open(self.testdir + '/certserial', 'w') as f:
219 f.write('1000')
220
221 with open(self.testdir + '/certindex', 'w') as f:
222 f.write('')
223
224 subprocess.call(['openssl', 'ca', '-batch',
225 '-config', self.testdir + '/ca.conf',
226 '-keyfile', self.testdir + '/root.key',
227 '-cert', self.testdir + '/root.crt',
228 '-subj', '/CN=int/',
229 '-in', self.testdir + '/int.csr',
230 '-out', self.testdir + '/int.crt'])
231
232 subprocess.call(['openssl', 'ca', '-batch',
233 '-config', self.testdir + '/ca.conf',
234 '-keyfile', self.testdir + '/int.key',
235 '-cert', self.testdir + '/int.crt',
236 '-subj', '/CN=end/',
237 '-in', self.testdir + '/end.csr',
238 '-out', self.testdir + '/end.crt'])
239
240 with open(self.testdir + '/end-int.crt', 'wb') as crt, \
241 open(self.testdir + '/end.crt', 'rb') as end, \
242 open(self.testdir + '/int.crt', 'rb') as int:
243 crt.write(end.read() + int.read())
244
245 self.context = ssl.create_default_context()
246 self.context.check_hostname = False
247 self.context.verify_mode = ssl.CERT_REQUIRED
248 self.context.load_verify_locations(self.testdir + '/root.crt')
249
250 # incomplete chain
251
252 self.assertIn('success', self.certificate_load('end', 'end'),
253 'certificate chain end upload')
254
255 chain = self.conf_get('/certificates/end/chain')
256 self.assertEqual(len(chain), 1, 'certificate chain end length')
257 self.assertEqual(chain[0]['subject']['common_name'], 'end',
258 'certificate chain end subject common name')
259 self.assertEqual(chain[0]['issuer']['common_name'], 'int',
260 'certificate chain end issuer common name')
261
262 self.add_tls(cert='end')
263
264 try:
265 resp = self.get_ssl()
266 except ssl.SSLError:
267 resp = None
268
269 self.assertEqual(resp, None, 'certificate chain incomplete chain')
270
271 # intermediate
272
273 self.assertIn('success', self.certificate_load('int', 'int'),
274 'certificate chain int upload')
275
276 chain = self.conf_get('/certificates/int/chain')
277 self.assertEqual(len(chain), 1, 'certificate chain int length')
278 self.assertEqual(chain[0]['subject']['common_name'], 'int',
279 'certificate chain int subject common name')
280 self.assertEqual(chain[0]['issuer']['common_name'], 'root',
281 'certificate chain int issuer common name')
282
283 self.add_tls(cert='int')
284
285 self.assertEqual(self.get_ssl()['status'], 200,
286 'certificate chain intermediate')
287
288 # intermediate server
289
290 self.assertIn('success', self.certificate_load('end-int', 'end'),
291 'certificate chain end-int upload')
292
293 chain = self.conf_get('/certificates/end-int/chain')
294 self.assertEqual(len(chain), 2, 'certificate chain end-int length')
295 self.assertEqual(chain[0]['subject']['common_name'], 'end',
296 'certificate chain end-int int subject common name')
297 self.assertEqual(chain[0]['issuer']['common_name'], 'int',
298 'certificate chain end-int int issuer common name')
299 self.assertEqual(chain[1]['subject']['common_name'], 'int',
300 'certificate chain end-int end subject common name')
301 self.assertEqual(chain[1]['issuer']['common_name'], 'root',
302 'certificate chain end-int end issuer common name')
303
304 self.add_tls(cert='end-int')
305
306 self.assertEqual(self.get_ssl()['status'], 200,
307 'certificate chain intermediate server')
308
309 @unittest.expectedFailure
310 def test_tls_reconfigure(self):
311 self.load('empty')
312
313 self.certificate()
314
315 (resp, sock) = self.get(headers={
316 'Connection': 'keep-alive',
317 'Host': 'localhost'
316 'Host': 'localhost',
317 'Connection': 'keep-alive'
318 }, start=True)
319
320 self.assertEqual(resp['status'], 200, 'initial status')
321
322 self.add_tls()
323
324 self.assertEqual(self.get(sock=sock)['status'], 200,
325 'reconfigure status')
326 self.assertEqual(self.get_ssl()['status'], 200,
327 'reconfigure tls status')
328
329 def test_tls_keepalive(self):
330 self.load('mirror')
331
332 self.certificate()
333
334 self.add_tls(application='mirror')
335
336 (resp, sock) = self.post_ssl(headers={
318 }, start=True)
319
320 self.assertEqual(resp['status'], 200, 'initial status')
321
322 self.add_tls()
323
324 self.assertEqual(self.get(sock=sock)['status'], 200,
325 'reconfigure status')
326 self.assertEqual(self.get_ssl()['status'], 200,
327 'reconfigure tls status')
328
329 def test_tls_keepalive(self):
330 self.load('mirror')
331
332 self.certificate()
333
334 self.add_tls(application='mirror')
335
336 (resp, sock) = self.post_ssl(headers={
337 'Host': 'localhost',
337 'Connection': 'keep-alive',
338 'Connection': 'keep-alive',
338 'Content-Type': 'text/html',
339 'Host': 'localhost'
339 'Content-Type': 'text/html'
340 }, start=True, body='0123456789')
341
342 self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
343
344 resp = self.post_ssl(headers={
340 }, start=True, body='0123456789')
341
342 self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
343
344 resp = self.post_ssl(headers={
345 'Host': 'localhost',
345 'Connection': 'close',
346 'Connection': 'close',
346 'Content-Type': 'text/html',
347 'Host': 'localhost'
347 'Content-Type': 'text/html'
348 }, sock=sock, body='0123456789')
349
350 self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
351
352 @unittest.expectedFailure
353 def test_tls_keepalive_certificate_remove(self):
354 self.load('empty')
355
356 self.certificate()
357
358 self.add_tls()
359
360 (resp, sock) = self.get_ssl(headers={
348 }, sock=sock, body='0123456789')
349
350 self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
351
352 @unittest.expectedFailure
353 def test_tls_keepalive_certificate_remove(self):
354 self.load('empty')
355
356 self.certificate()
357
358 self.add_tls()
359
360 (resp, sock) = self.get_ssl(headers={
361 'Connection': 'keep-alive',
362 'Host': 'localhost'
361 'Host': 'localhost',
362 'Connection': 'keep-alive'
363 }, start=True)
364
365 self.conf({
366 "application": "empty"
367 }, 'listeners/*:7080')
368 self.conf_delete('/certificates/default')
369
370 try:
371 resp = self.get_ssl(headers={
363 }, start=True)
364
365 self.conf({
366 "application": "empty"
367 }, 'listeners/*:7080')
368 self.conf_delete('/certificates/default')
369
370 try:
371 resp = self.get_ssl(headers={
372 'Connection': 'close',
373 'Host': 'localhost'
372 'Host': 'localhost',
373 'Connection': 'close'
374 }, sock=sock)
375 except:
376 resp = None
377
378 self.assertEqual(resp, None, 'keepalive remove certificate')
379
380 @unittest.expectedFailure
381 def test_tls_certificates_remove_all(self):
382 self.load('empty')
383
384 self.certificate()
385
386 self.assertIn('success', self.conf_delete('/certificates'),
387 'remove all certificates')
388
389 def test_tls_application_respawn(self):
390 self.skip_alerts.append(r'process \d+ exited on signal 9')
391 self.load('mirror')
392
393 self.certificate()
394
395 self.conf('1', 'applications/mirror/processes')
396
397 self.add_tls(application='mirror')
398
399 (resp, sock) = self.post_ssl(headers={
374 }, sock=sock)
375 except:
376 resp = None
377
378 self.assertEqual(resp, None, 'keepalive remove certificate')
379
380 @unittest.expectedFailure
381 def test_tls_certificates_remove_all(self):
382 self.load('empty')
383
384 self.certificate()
385
386 self.assertIn('success', self.conf_delete('/certificates'),
387 'remove all certificates')
388
389 def test_tls_application_respawn(self):
390 self.skip_alerts.append(r'process \d+ exited on signal 9')
391 self.load('mirror')
392
393 self.certificate()
394
395 self.conf('1', 'applications/mirror/processes')
396
397 self.add_tls(application='mirror')
398
399 (resp, sock) = self.post_ssl(headers={
400 'Host': 'localhost',
400 'Connection': 'keep-alive',
401 'Connection': 'keep-alive',
401 'Content-Type': 'text/html',
402 'Host': 'localhost'
402 'Content-Type': 'text/html'
403 }, start=True, body='0123456789')
404
405 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
406
407 subprocess.call(['kill', '-9', app_id])
408
409 self.wait_for_record(re.compile(' (?!' + app_id +
410 '#)(\d+)#\d+ "mirror" application started'))
411
412 resp = self.post_ssl(headers={
403 }, start=True, body='0123456789')
404
405 app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
406
407 subprocess.call(['kill', '-9', app_id])
408
409 self.wait_for_record(re.compile(' (?!' + app_id +
410 '#)(\d+)#\d+ "mirror" application started'))
411
412 resp = self.post_ssl(headers={
413 'Host': 'localhost',
413 'Connection': 'close',
414 'Connection': 'close',
414 'Content-Type': 'text/html',
415 'Host': 'localhost'
415 'Content-Type': 'text/html'
416 }, sock=sock, body='0123456789')
417
418 self.assertEqual(resp['status'], 200, 'application respawn status')
419 self.assertEqual(resp['body'], '0123456789', 'application respawn body')
420
421if __name__ == '__main__':
422 TestUnitTLS.main()
416 }, sock=sock, body='0123456789')
417
418 self.assertEqual(resp['status'], 200, 'application respawn status')
419 self.assertEqual(resp['body'], '0123456789', 'application respawn body')
420
421if __name__ == '__main__':
422 TestUnitTLS.main()